123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- import os
- from sys import path
- currentPath = os.getcwd().replace('\\','/') # 获取当前路径
- path.append(currentPath)
- import paho.mqtt.client as mqtt
- import mqtt_client as mc
- import config_info_model as md
- import redis
- import time, datetime
- import json
- class jf_wind_north():
- sys_key = 'jinfeng'
- sys_name = 'wind_north'
- sys_path = '1/1/5'
- point_dict = {}
- jdata = None
- cf = md.configInfoModel(sys_key, sys_name)
- client = None
-
- def __init__(self):
- self.init_mqtt()
- def init_mqtt(self):
- # print(self.cf.ip)
- self.client = mqtt.Client(f"{self.sys_key}_{self.sys_name}")
- self.client.username_pw_set(self.cf.username, password=self.cf.password)
- # self.client.on_connect = self.on_connect #连接broker时broker响应的回调
- self.client.on_message = self.on_message #接收到订阅消息时的回调
- self.subscribe_update_topic()
- def subscribe_update_topic(self):
- self.client.connect(self.cf.ip, 1883, 600) #连接到broker
- self.client.subscribe(self.sys_path, 0)
- self.client.loop_forever()
- # 回调数据
- def on_message(self, client, userdata, msg):
- data = msg.payload
- jsonStr = data.decode('utf-8')
- jsonObj = json.loads(jsonStr)
- data = jsonObj['values']
- data_dict = {}
- for i in range(len(data)):
- v = data[i]
- data_dict[v['code']] = v['v']
- self.point_dict = data_dict
- self.run()
- timeStamp = jsonObj['timestamp'] / 1000
- times = datetime.datetime.fromtimestamp(timeStamp)
- print(times)
- # 获取点位
- def get_base_data(self):
-
- # 读取最终json格式数据
- self.jdata = self.cf.get_json(self.sys_key, self.sys_name)
- jdata_s = self.jdata['sys_point']
- return jdata_s
- # 数据处理
- def data_handle(self, data):
- for i in range(len(data)):
- val = data[i]
- sl = val['run_parameter']
- for j in range(len(sl)):
- ck = sl[j]['key']
- if ck in self.point_dict.keys():
- data[i]['run_parameter'][j]['value'] = self.point_dict[ck]
- ep1 = val['electric_parameter_1']
- for j in range(len(ep1)):
- ck = ep1[j]['key']
- if ck in self.point_dict.keys():
- data[i]['electric_parameter_1'][j]['value'] = self.point_dict[ck]
- ep2 = val['electric_parameter_2']
- for j in range(len(ep2)):
- ck = ep2[j]['key']
- if ck in self.point_dict.keys():
- data[i]['electric_parameter_2'][j]['value'] = self.point_dict[ck]
- self.jdata['sys_point'] = data
- # 存入redis
- def save_redis(self):
- config = self.cf.get_conf()
- try:
- redis_conf = config['redis_conf']
- redis_ip = redis_conf['ip']
- pool = redis.ConnectionPool(host=redis_ip, password='')
- redis_conn = redis.Redis(connection_pool=pool)
- # 接口数据存入redis
- redis_key = f"{self.sys_key}_{self.sys_name}"
- json_str = json.dumps(self.jdata)
- redis_conn.set(str(redis_key), json_str)
- except BaseException as e:
- print(e)
- print("异常,redis连接错误!")
- def run(self):
- data = self.get_base_data()
- self.data_handle(data)
- self.save_redis()
- if __name__ == "__main__":
- obj = jf_wind_north()
|