123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- import os
- from sys import path
- currentPath = os.getcwd().replace('\\','/') # 获取当前路径
- path.append(currentPath)
- import paho.mqtt.client as mqtt
- import mqtt_client as mc
- import config_info_model as md
- import redis
- import time, datetime
- import json
class jf_power_center():
    """Bridge MQTT point updates for the power-center page into Redis.

    For every message received on ``sys_path`` the service:

    1. stores the message's point values in ``point_dict`` (code -> value),
    2. loads the JSON template through the config model (``cf.get_json``),
    3. fills the template's ``sys_point`` section from ``point_dict``,
    4. writes the whole template to Redis as a JSON string under the key
       ``"<sys_key>_<sys_name>"``.

    NOTE: constructing an instance connects to the broker and then blocks in
    the MQTT network loop (``loop_forever``).
    """

    sys_key = 'jinfeng'
    sys_name = 'power_center'
    # MQTT topic the service subscribes to.
    sys_path = '1/1/1'
    # Latest point values keyed by point code; replaced on every message.
    point_dict = {}
    # Template most recently returned by cf.get_json(); set by get_base_data().
    jdata = None
    # Project config model; this class reads .ip, .username, .password and
    # calls .get_json() / .get_conf() on it.
    cf = md.configInfoModel(sys_key, sys_name)
    # paho MQTT client, created in init_mqtt().
    client = None

    def __init__(self):
        """Create the MQTT client and start the (blocking) network loop."""
        self.init_mqtt()

    def init_mqtt(self):
        """Build the MQTT client, set credentials and callbacks, then connect.

        Does not return while the network loop is running, because it ends by
        calling ``subscribe_update_topic``.
        """
        self.client = mqtt.Client(f"{self.sys_key}_{self.sys_name}")
        self.client.username_pw_set(self.cf.username, password=self.cf.password)
        # Callback invoked for every message on a subscribed topic.
        self.client.on_message = self.on_message
        self.subscribe_update_topic()

    def on_connect(self, client, userdata, flags, rc, properties=None):
        """Subscribe to ``sys_path`` each time the broker accepts a connection.

        Subscribing here rather than once after ``connect()`` means the
        subscription is renewed when ``loop_forever`` reconnects after a
        dropped connection.

        ``properties`` is accepted (and ignored) so the callback also matches
        the five-argument callback form.
        """
        if rc != 0:
            print(f"MQTT connection refused, rc={rc}")
            return
        client.subscribe(self.sys_path, 0)

    def subscribe_update_topic(self):
        """Connect to the broker on port 1883 and run the network loop.

        The subscription itself is issued from ``on_connect``. This call
        blocks in ``loop_forever``.
        """
        self.client.on_connect = self.on_connect
        self.client.connect(self.cf.ip, 1883, 600)  # host, port, keepalive
        self.client.loop_forever()

    def on_message(self, client, userdata, msg):
        """Handle one MQTT message: refresh the points, rebuild, and save.

        The payload is UTF-8 JSON containing ``values`` (a list of objects
        with ``code`` and ``v``) and ``timestamp``.
        """
        payload = json.loads(msg.payload.decode('utf-8'))
        self.point_dict = {item['code']: item['v'] for item in payload['values']}
        self.run()
        # Presumably epoch milliseconds, hence the division -- TODO confirm.
        received_at = datetime.datetime.fromtimestamp(payload['timestamp'] / 1000)
        print(received_at)

    def get_base_data(self):
        """Load the JSON template and return its ``sys_point`` section.

        The full template is kept in ``self.jdata`` so ``data_handle`` and
        ``save_redis`` can use it.
        """
        self.jdata = self.cf.get_json(self.sys_key, self.sys_name)
        return self.jdata['sys_point']

    def data_handle(self, data):
        """Fill ``data`` (the ``sys_point`` section) from ``point_dict``.

        * ``switch``: each entry whose ``key`` is a known point gets that
          point's value as ``value``; other entries are left untouched.
        * ``power_parameter_new``: each entry gets its own ``data`` list with
          one value per item of its ``keys``; a point missing from the current
          message yields ``None`` so positions stay aligned with ``keys``.
        * ``power_parameter``: for entries that have a ``data`` list, each
          item whose ``key`` is a known point gets that value as ``value``.

        ``data`` is modified in place and stored back into
        ``self.jdata['sys_point']``.
        """
        points = self.point_dict

        for switch in data['switch']:
            key = switch['key']
            if key in points:
                switch['value'] = points[key]

        for entry in data['power_parameter_new']:
            # A fresh list per entry: sharing one list across entries would
            # make every entry show the values of all entries combined.
            entry['data'] = [points.get(key) for key in entry['keys']]

        for group in data['power_parameter']:
            if 'data' not in group:
                continue
            for item in group['data']:
                key = item['key']
                if key in points:
                    item['value'] = points[key]

        self.jdata['sys_point'] = data

    def save_redis(self):
        """Serialize ``self.jdata`` to JSON and store it in Redis.

        The Redis host comes from ``cf.get_conf()['redis_conf']['ip']``.
        Failures are printed and swallowed so that one failed write does not
        stop message processing.
        """
        config = self.cf.get_conf()
        pool = None
        try:
            redis_ip = config['redis_conf']['ip']
            pool = redis.ConnectionPool(host=redis_ip, password='')
            redis_conn = redis.Redis(connection_pool=pool)
            # Store the page data under "<sys_key>_<sys_name>".
            redis_key = f"{self.sys_key}_{self.sys_name}"
            redis_conn.set(redis_key, json.dumps(self.jdata))
        except Exception as e:
            # Exception rather than BaseException, so Ctrl-C and SystemExit
            # still stop the process.
            print(e)
            print("异常,redis连接错误!")
        finally:
            # A pool is created per call, so release its connections here.
            if pool is not None:
                pool.disconnect()

    def run(self):
        """Load the template, fill it from the current points, and save it."""
        data = self.get_base_data()
        self.data_handle(data)
        self.save_redis()
if __name__ == "__main__":
    # Script entry point: the constructor connects to the MQTT broker and
    # enters the blocking network loop.
    obj = jf_power_center()
|