欢迎常来串门~!

MENU

“大周”的物联网DIY--家庭温湿度监控(二)

March 15, 2019 • Read: 654 • IOT

书接上回,今天来说下监控的软件部分。

4.软件篇

软件开发主要涉及到两部分,一个是TYPBOARD主板的开发,另外一部分为APP_server应用服务器的开发。
本监控所有涉及到的开发软件如下

开发语言及软件作用
Micropython控制传感器并上传数据
Flask编写服务器端web应用程序,负责处理接收到的数据
Mysql数据本地化,存储所有监控数据
Redis本地数据库的缓存,加快数据的存储和读取
Nginx网站代理,用于转发所有服务器请求
CloudMQTTMQTT服务代理,推送转发MQTT协议请求
Grafana数据可视化工具
IoT MQTT PanelMQTT的手机APP(暂只支持Android系统)

花开两头,各表一支。我们从传感器开发说起。

4.1 TYPBOARD主板的开发

主板中的应用结构如下图

图中的boot.py和main.py为系统要求的文件,不可改名。
该程序的功能就是读取DHT11传感器的气温湿度数据,然后通过wifi网络,使用MQTT协议向MQTT BROKER发布相关数据。

主板程序请从我的GitHub下载

(1)boot.py为启动文件
microPython使用该文件确定启动方式,类似于系统引导文件。在这里我们把wifi的连接写在boot.py文件里,主板通电后,能立即连接网络。
这里我们通过配置文件读取wifi名和密码,如果wifi连接失败,会提示手动添加wifi名和密码(该功能主要是调试时用)
下面为boot.py中wifi连接核心代码:

#以下为正常的WIFI连接流程
import network
import sys

wifi = network.WLAN(network.STA_IF)
if not wifi.isconnected():
    print('connecting to network...')
    wifi.active(True)
    wifi.connect(config['essid'], config['password'])
    import utime

    for i in range(200):
        print('第{}次尝试连接WIFI热点'.format(i))
        if wifi.isconnected():
            break
        utime.sleep_ms(100) #一般5-10秒,应该绰绰有余

    if not wifi.isconnected():
        wifi.active(False) #关掉连接,免得repl死循环输出
        print('wifi connection error, please reconnect')
        import os
        # 连续输错essid和password会导致wifi_config.json不存在
        try:
            os.remove('wifi_config.json') # 删除配置文件
        except:
            pass
        do_connect() # 重新连接
    else:
        print('network config:', wifi.ifconfig())

(2)main.py为主程序文件
micropython启动时,会先运行boot.py,然后紧接着运行main.py,主要代码都需要写在main.py这个文件里。
main.py通过无限循环的方式定时读取传感器数据,并把数据打包成json格式。
main.py里的核心代码如下:

try:
  while (1): 
    (year, month, mday, hour, minute, second, weekday, yearday)=utime.localtime()
    data_time='{}{:0>02d}{:0>02d}{:0>02d}{:0>02d}{:0>02d}'.format(year,month,mday,hour,minute,second)
    print (year,'-','%02d'%month,'-','%02d'%mday, ' ','%2d'% hour,':','%02d'% minute,':','%02d'% second,sep = '')
    dht.measure()
    data_temperature=dht.temperature() 
    data_humidity=dht.humidity() 
    data_dic={'value':msg,'time':data_time}
    result_str=pubdata(c_mqtt,data_dic)
    print(result_str)
    sleep(60)
finally:
  c_mqtt.disconnect()
  Pin(2,Pin.OUT,value=1)

(3)mqtt.py文件
mqtt.py文件主要负责和MQTT服务器通讯,并发送数据。
c_mqtt.publish('weather',j_d) 这里的weather为MQTT中“PUBLISH”中自定义的topic名字,我们可以按照需要自行修改。如要订阅该topic的数据,服务器端的“SUBSCRIBE”只使用相同的topic名字就可以了。
下面为mqtt.py的核心代码。

from simple import MQTTClient
import json

def pubdata(c_mqtt,data):
    j_d=json.dumps(data)
    print(j_d)
    c_mqtt.publish('weather',j_d)
    return "ok"
    
def connect(client_id,server,port,username,password):
    c_mqtt = MQTTClient(client_id, server,port,username,password)
    c_mqtt.connect()
    print("Connected to %s" % (server))
    return c_mqtt

(4)wifi_config.json配文件
该文件为连接wifi的配置文件,以json格式保存数据。
essid保存wifi名,password保存wifi密码。

{
    "essid": "xxxx", 
    "password": "xxx"
}

(5)lib文件夹
micropython中这个lib是必须存在的,
lib这个文件夹里存放的是程序使用的相关类。

4.2 API_Server端开发

由于API_Server端代码,暂时只是演示代码。并未完善日志,错误判断等功能。
API_Server主要作用为订阅MQTT BORKER的数据。并把数据转化为相应的json格式后保存在本地MySQL数据库。
应用结构如下图:

(1)iothub.py为主程序
iothub.py为API_Server端的主程序,通过运行该文件,启动程序。
iothub.py主要负责启动多进程,并订阅MQTT BORKER推行的相关topic。
mqtt_client为自定义函数,主要负责连接处理与MQTT BROKER的通讯,并处理相关业务逻辑。
该函数有四个参数,
web_url:MQTT BROKER的ip地址或是url,需按实际修改
port_num:MQTT BROKER的端口,需按实际修改
uesrname: MQTT BROKER的用户名,需按实际修改
password:MQTT BROKER的密码,需按实际修改
data_process:为一个自定义的回调函数,用于数据本地化存储。

import time
from multiprocessing import Process
from DBAPI.DATABASE import mqtt_client
from PROCEDURES.LOGIC import data_process

if __name__ == '__main__':
    my_mqtt=mqtt_client("web_url", port_num,"username", "password",data_process)
    #启动一个后台进程用于执行mqtt-client
    p=Process(target=my_mqtt.connect)
    p.start()
    print("process started")
    while 1:
        time.sleep(1)
        print(time.ctime(time.time()))

(2)LOGIC.py
该文件包含了Python的paho模块用于处理业务逻辑的回调函数,本项目里主要是把接受到的json数据转化为相对应的sql语句,并插入数据库。
下面的代码中需要注意的是inst_mysql=db_mysql('dbusername','dbpassword','dbip','dbname') ,该函数用于连接数据库,其中四个参数分别是:
dbusername:数据库用户名,需按实际修改
dbpassword:数据库密码,需按实际修改
dbip:数据库IP地址,需按实际修改
dbname: 数据库名字,需按实际修改

from DBAPI.DATABASE import db_mysql
from PROCEDURES.DATAOP import json_to_sql
import json

def  __data_to_db(data_dict,tab_name,op_flag):
    sql_str=json_to_sql(data_dict,tab_name,op_flag)
    print(sql_str)
    ##连接mysql数据库
    inst_mysql=db_mysql('dbusername','dbpassword','dbip','dbname')
    print("ok")
    with inst_mysql.connect() as q:
        try:
            q.execute(sql_str)
            print("operation on table {} is completed! ".format(tab_name))
            return 0
        except Exception as e:
            print(str(e))
            return 1

##该函数用于mqtt-clent里的on_message里的回调
def data_process(msg):
    '''
    msg为MQTT订阅返回的 MQTTMessage
    '''
    payload_json=msg.payload
    payload_dict=json.loads(payload_json)
    data_dict={}
    data_dict['topic_name']=msg.topic
    data_dict['data_value']=payload_dict['value']
    data_dict['data_time']=payload_dict['time']
    __data_to_db(data_dict,'sensor_data','insert')

(3)DATAOP.py
该文件存放json转化为sql的逻辑代码。

def json_to_sql(data_dict,tab_name,op_flag):
    '''
    json数据转转成sql语句
    json: json格式的数据源
    tab_name: 操作的表名
    op_flag:"insert" 为insert操作,
    '''
    len_dict=len(data_dict)
    print(data_dict)
    if op_flag=="insert":
        column_name=""
        data_value=""
        n=1  #用于判读数据字典
        
        for key in data_dict.keys():
            if n==len_dict:
                column_name += key
                data_value  += "\""+data_dict[key]+"\""
                n += 1
            else:
                column_name += key+","
                data_value  += "\""+str(data_dict[key])+"\","
                n += 1

(4)DATABASE.py
所有的外部连接代码存放在该文件里,这里包含了数据库连接,MQTT BROKER的连接。
需要额外注意的是
client.subscribe([("weather",1),("test",1)]
这里的weather和test其实就是我们前面所指的topic,我们可以按照实际情况修改。
后面的数字“1”为MQTT的Qos(Quality of Service),完整的Qos如下:
0:MQTT BROKER 只发一次包,是否收到完全不管,适合那些不是很重要的数据。
1:当client没收到service的puback或者service没有收到client的puback,那么就会一直发送publisher
只一次。
2:对于1而言,2可以实现仅仅接受一次message。

def connect(self):
    def on_connect(client, userdata, flags, rc):
        print("Connected with result code "+str(rc))
        client.subscribe([("weather",1),("test",1)])
    
    def on_message(client, userdata, msg):
        print(str(msg.timestamp))
        mythread=threading.Thread(target=self.func,args=(msg,))
        mythread.start()
    
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    client.username_pw_set(self.username, self.password) 
    client.connect(self.host,self.port, 60)
    client.loop_forever()

表面为数据库表结构的创建语句:

保存传感器的相关信息表

create table sensor_info
(
    id              int  UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT,
    topic_name        varchar(100),
    sensor_location                    varchar(100)
)

保存传感器数据表

create table sensor_data
(
    id              int  UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT,
    topic_name        varchar(100),
    data_value          varchar(100),
    data_time                varchar(14),
    received_time        varchar(14)
)

服务端代码请从我的GitHub下载

至此传感和服务器的应用程序已搭建完毕,
关于如何配置grafana 和 Iot MQTT Plane 以后有机会再开贴了。
如有问题欢迎随时与我联系。
邮箱:service@kgkzone.com

欢迎关注公众号:大周小周小小周

Tags: None
Archives QR Code
QR Code for this page
Tipping QR Code