import os from sys import path currentPath = os.getcwd().replace('\\','/') # 获取当前路径 path.append(currentPath) import paho.mqtt.client as mqtt import mqtt_client as mc import config_info_model as md import redis import time, datetime import json class jf_power_center(): sys_key = 'jinfeng' sys_name = 'power_center' sys_path = '1/1/1' point_dict = {} jdata = None cf = md.configInfoModel(sys_key, sys_name) client = None def __init__(self): self.init_mqtt() def init_mqtt(self): # print(self.cf.ip) self.client = mqtt.Client(f"{self.sys_key}_{self.sys_name}") self.client.username_pw_set(self.cf.username, password=self.cf.password) # self.client.on_connect = self.on_connect #连接broker时broker响应的回调 self.client.on_message = self.on_message #接收到订阅消息时的回调 self.subscribe_update_topic() def subscribe_update_topic(self): self.client.connect(self.cf.ip, 1883, 600) #连接到broker self.client.subscribe(self.sys_path, 0) self.client.loop_forever() # 回调数据 def on_message(self, client, userdata, msg): data = msg.payload jsonStr = data.decode('utf-8') jsonObj = json.loads(jsonStr) data = jsonObj['values'] data_dict = {} for i in range(len(data)): v = data[i] data_dict[v['code']] = v['v'] self.point_dict = data_dict self.run() timeStamp = jsonObj['timestamp'] / 1000 times = datetime.datetime.fromtimestamp(timeStamp) print(times) # 获取点位 def get_base_data(self): # 读取最终json格式数据 self.jdata = self.cf.get_json(self.sys_key, self.sys_name) jdata_s = self.jdata['sys_point'] return jdata_s # 数据处理 def data_handle(self, data): for key, val in data.items(): for i in range(len(val)): ck = val[i]['key'] if ck in self.point_dict.keys(): data[key][i]['value'] = self.point_dict[ck] if 'data' in val[i].keys(): p = val[i]['data'] for j in range(len(p)): kk = p[j]['key'] if kk in self.point_dict.keys(): data[key][i]['data'][j]['value'] = self.point_dict[kk] data[key] = val self.jdata['sys_point'] = data # 存入redis def save_redis(self): config = self.cf.get_conf() try: redis_conf = config['redis_conf'] redis_ip = redis_conf['ip'] pool = redis.ConnectionPool(host=redis_ip, password='') redis_conn = redis.Redis(connection_pool=pool) # 接口数据存入redis redis_key = f"{self.sys_key}_{self.sys_name}" json_str = json.dumps(self.jdata) redis_conn.set(str(redis_key), json_str) except BaseException as e: print(e) print("异常,redis连接错误!") def run(self): data = self.get_base_data() self.data_handle(data) self.save_redis() if __name__ == "__main__": obj = jf_power_center()