12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- import os
- from sys import path
- currentPath = os.getcwd().replace('\\','/') # 获取当前路径
- path.append(currentPath)
- import paho.mqtt.client as mqtt
- import mqtt_client as mc
- import config_info_model as md
- import redis
- import time, datetime
- import json
class jf_pump_18m():
    """Copy MQTT point updates for the 'jinfeng' / 'pump_18m' system into Redis.

    Constructing an instance creates the MQTT client, connects to the broker
    described by ``cf`` and then blocks inside the client's network loop
    (``loop_forever``).  For every message received on ``topic`` the reported
    point values are written into the JSON document returned by
    ``cf.get_json`` and that document is stored in Redis under the key
    ``"<sys_key>_<sys_name>"``.
    """

    sys_key = 'jinfeng'
    sys_name = 'pump_18m'
    # MQTT connection / subscription parameters (override in a subclass or on
    # the instance before connecting if a different endpoint is needed).
    topic = "1/1/11"
    qos = 0
    port = 1883
    keepalive = 600
    # Latest {point code: value} mapping.  It is rebound to a fresh dict for
    # every message and never mutated in place, so this shared class-level
    # default is only ever read.
    point_dict = {}
    # JSON document most recently loaded by get_base_data().
    jdata = None
    # NOTE: evaluated once, when the class body is executed (i.e. at import).
    cf = md.configInfoModel(sys_key, sys_name)
    client = None
    # Cached Redis client and the host it was created for (see save_redis).
    _redis_ip = None
    _redis_conn = None

    def __init__(self):
        # NOTE: init_mqtt() ends in loop_forever(), so the constructor does
        # not return while the MQTT loop is running.
        self.init_mqtt()

    def init_mqtt(self):
        """Create the MQTT client, register the callbacks and start listening."""
        # NOTE(review): the broker IP is passed as the MQTT client id.  Confirm
        # this is intended; clients sharing one id displace each other on the
        # broker.
        self.client = mqtt.Client(self.cf.ip)
        self.client.username_pw_set(self.cf.username, password=self.cf.password)
        # Called when the broker answers a connection attempt.
        self.client.on_connect = self.on_connect
        # Called for every message received on a subscribed topic.
        self.client.on_message = self.on_message
        self.subscribe_update_topic()

    def on_connect(self, client, userdata, flags, rc):
        """Subscribe to ``topic`` each time the broker accepts a connection.

        Subscribing here rather than once after ``connect()`` renews the
        subscription whenever ``loop_forever`` re-establishes a dropped
        connection.
        """
        if rc != 0:
            print(f"MQTT connection refused, rc={rc}")
            return
        client.subscribe(self.topic, self.qos)

    def subscribe_update_topic(self):
        """Connect to the broker and run the network loop (blocking).

        The subscription itself is issued by ``on_connect``.
        """
        self.client.connect(self.cf.ip, self.port, self.keepalive)
        self.client.loop_forever()

    def on_message(self, client, userdata, msg):
        """Handle one MQTT message: refresh the point values and publish them.

        The payload is decoded as UTF-8 JSON and read as
        ``{"values": [{"code": ..., "v": ...}, ...], "timestamp": ...}``.
        The timestamp is divided by 1000 before conversion, i.e. it is
        treated as milliseconds.  Payloads that cannot be decoded or lack
        the expected keys are reported and skipped instead of raising into
        the MQTT loop.
        """
        try:
            message = json.loads(msg.payload.decode('utf-8'))
            points = {item['code']: item['v'] for item in message['values']}
        except (ValueError, KeyError, TypeError) as e:
            # ValueError also covers UnicodeDecodeError and JSONDecodeError.
            print(f"Ignoring malformed MQTT payload: {e!r}")
            return

        self.point_dict = points
        self.run()

        timestamp_ms = message.get('timestamp')
        if isinstance(timestamp_ms, (int, float)):
            print(datetime.datetime.fromtimestamp(timestamp_ms / 1000))

    def get_base_data(self):
        """Load the system's JSON document and return its ``sys_point`` part.

        The full document is kept in ``self.jdata`` for ``data_handle`` and
        ``save_redis``.
        """
        self.jdata = self.cf.get_json(self.sys_key, self.sys_name)
        return self.jdata['sys_point']

    def data_handle(self, data):
        """Write the latest point values into ``data``.

        ``data`` maps a group name to a sequence of point entries.  Every
        entry whose ``'key'`` appears in ``self.point_dict`` gets its
        ``'val'`` replaced by the received value.  ``data`` is updated in
        place and stored back as ``self.jdata['sys_point']``.
        """
        points = self.point_dict
        for entries in data.values():
            for entry in entries:
                code = entry['key']
                if code in points:
                    entry['val'] = points[code]
        self.jdata['sys_point'] = data

    def _redis_client(self, host):
        """Return a Redis client for ``host``, reusing the cached one if possible."""
        if self._redis_conn is None or self._redis_ip != host:
            pool = redis.ConnectionPool(host=host, password='')
            self._redis_conn = redis.Redis(connection_pool=pool)
            self._redis_ip = host
        return self._redis_conn

    def save_redis(self):
        """Serialise ``self.jdata`` to JSON and store it in Redis.

        The Redis host is re-read from the configuration on every call, but
        the client is only rebuilt when that host changes.  Failures are
        reported on stdout and do not propagate (best effort).
        """
        config = self.cf.get_conf()
        try:
            redis_ip = config['redis_conf']['ip']
            redis_conn = self._redis_client(redis_ip)
            # Store the interface data in Redis.
            redis_key = f"{self.sys_key}_{self.sys_name}"
            redis_conn.set(redis_key, json.dumps(self.jdata))
        except Exception as e:
            # Exception, not BaseException: Ctrl-C / SystemExit must still be
            # able to stop the process.
            print(e)
            print("异常,redis连接错误!")
            # Drop the cached client so the next call builds a new one.
            self._redis_conn = None
            self._redis_ip = None

    def run(self):
        """Load the base document, apply the latest values and save to Redis."""
        data = self.get_base_data()
        self.data_handle(data)
        self.save_redis()
# Script entry point.  Constructing jf_pump_18m connects to the MQTT broker and
# enters the client's network loop, so this call does not return while the
# loop is running.
if __name__ == "__main__":
    obj = jf_pump_18m()
|