|
@@ -0,0 +1,107 @@
|
|
|
+import os
|
|
|
+from sys import path
|
|
|
+currentPath = os.getcwd().replace('\\','/') # 获取当前路径
|
|
|
+path.append(currentPath)
|
|
|
+
|
|
|
+import paho.mqtt.client as mqtt
|
|
|
+import mqtt_client as mc
|
|
|
+import config_info_model as md
|
|
|
+import redis
|
|
|
+import time, datetime
|
|
|
+import json
|
|
|
+
|
|
|
+class jf_belt_101():
|
|
|
+
|
|
|
+ sys_key = 'jinfeng'
|
|
|
+ sys_name = 'belt_101.json'
|
|
|
+ point_dict = {}
|
|
|
+ jdata = None
|
|
|
+ cf = md.configInfoModel(sys_key, sys_name)
|
|
|
+ client = None
|
|
|
+
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ self.init_mqtt()
|
|
|
+
|
|
|
+ def init_mqtt(self):
|
|
|
+ # print(self.cf.ip)
|
|
|
+ self.client = mqtt.Client(f"{self.sys_key}_{self.sys_name}")
|
|
|
+ self.client.username_pw_set(self.cf.username, password=self.cf.password)
|
|
|
+ # self.client.on_connect = self.on_connect #连接broker时broker响应的回调
|
|
|
+ self.client.on_message = self.on_message #接收到订阅消息时的回调
|
|
|
+ self.subscribe_update_topic()
|
|
|
+
|
|
|
+ def subscribe_update_topic(self):
|
|
|
+ self.client.connect(self.cf.ip, 1883, 600) #连接到broker
|
|
|
+ self.client.subscribe("1/1/10", 0)
|
|
|
+ self.client.loop_forever()
|
|
|
+
|
|
|
+ # 回调数据
|
|
|
+ def on_message(self, client, userdata, msg):
|
|
|
+ data = msg.payload
|
|
|
+ jsonStr = data.decode('utf-8')
|
|
|
+ jsonObj = json.loads(jsonStr)
|
|
|
+ data = jsonObj['values']
|
|
|
+ data_dict = {}
|
|
|
+ for i in range(len(data)):
|
|
|
+ v = data[i]
|
|
|
+ data_dict[v['code']] = v['v']
|
|
|
+ self.point_dict = data_dict
|
|
|
+ self.run()
|
|
|
+
|
|
|
+ timeStamp = jsonObj['timestamp'] / 1000
|
|
|
+ times = datetime.datetime.fromtimestamp(timeStamp)
|
|
|
+ print(times)
|
|
|
+
|
|
|
+ # 获取点位
|
|
|
+ def get_base_data(self):
|
|
|
+
|
|
|
+ # 读取最终json格式数据
|
|
|
+ self.jdata = self.cf.get_json(self.sys_key, self.sys_name)
|
|
|
+ jdata_s = self.jdata['sys_point']
|
|
|
+ return jdata_s
|
|
|
+
|
|
|
+ # 数据处理
|
|
|
+ def data_handle(self, data):
|
|
|
+ comm = data['common']
|
|
|
+ elec = data['electric_parameter']
|
|
|
+ data['state'] = self.point_dict[data['key']]
|
|
|
+
|
|
|
+ for i in range(len(comm)):
|
|
|
+ ck = comm[i]
|
|
|
+ k = ck[i]['key']
|
|
|
+ comm[i]['val'] = self.point_dict[k]
|
|
|
+ data['common'][i] = ck
|
|
|
+
|
|
|
+ for i in range(len(elec)):
|
|
|
+ vl = elec[i]['value_list']
|
|
|
+ for j in range(len(vl)):
|
|
|
+ k = vl[j]['key']
|
|
|
+ vl[j]['val'] = self.point_dict[k]
|
|
|
+ data['electric_parameter'][i]['value_list'] = vl
|
|
|
+
|
|
|
+ self.jdata['sys_point'] = data
|
|
|
+
|
|
|
+ # 存入redis
|
|
|
+ def save_redis(self):
|
|
|
+ config = self.cf.get_conf()
|
|
|
+ try:
|
|
|
+ redis_conf = config['redis_conf']
|
|
|
+ redis_ip = redis_conf['ip']
|
|
|
+ pool = redis.ConnectionPool(host=redis_ip, password='')
|
|
|
+ redis_conn = redis.Redis(connection_pool=pool)
|
|
|
+ # 接口数据存入redis
|
|
|
+ redis_key = f"{self.sys_key}_{self.sys_name}"
|
|
|
+ json_str = json.dumps(self.jdata)
|
|
|
+ redis_conn.set(str(redis_key), json_str)
|
|
|
+ except BaseException as e:
|
|
|
+ print(e)
|
|
|
+ print("异常,redis连接错误!")
|
|
|
+
|
|
|
+ def run(self):
|
|
|
+ data = self.get_base_data()
|
|
|
+ self.data_handle(data)
|
|
|
+ self.save_redis()
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ obj = jf_belt_101()
|