# jf_pump_1125m.py (3.0 KB)
  import datetime
  import json
  import os
  import time
  from sys import path

  # Make the current working directory importable before the non-stdlib imports
  # below run. Presumably the local helper modules (mqtt_client,
  # config_info_model) live there - confirm against the deployment layout.
  currentPath = os.getcwd().replace('\\', '/')
  path.append(currentPath)

  import paho.mqtt.client as mqtt
  import redis

  import mqtt_client as mc
  import config_info_model as md
  class jf_pump_1125m:
      """Copy live point values received over MQTT into a Redis JSON document.

      Every message received on ``_TOPIC`` is decoded into a ``code -> value``
      mapping, merged into the JSON template returned by ``cf.get_json`` and
      stored in Redis under the key ``"<sys_key>_<sys_name>"``.

      Note: constructing an instance connects to the broker and then blocks
      inside the MQTT network loop (``__init__`` -> ``init_mqtt`` ->
      ``subscribe_update_topic`` -> ``loop_forever``).
      """

      sys_key = 'jinfeng'
      sys_name = 'pump_1125m'
      point_dict = {}  # latest ``code -> value`` mapping; rebound per message
      jdata = None  # JSON document most recently loaded by get_base_data()
      cf = md.configInfoModel(sys_key, sys_name)
      client = None

      # Topic carrying the point updates for this system.
      _TOPIC = "1/1/9"
      # Lazily created Redis client and the host it was created for.
      _redis_conn = None
      _redis_ip = None

      def __init__(self):
          """Create the MQTT client and start processing messages (blocking)."""
          self.init_mqtt()

      def init_mqtt(self):
          """Configure the MQTT client, register callbacks and start the loop."""
          # NOTE(review): the broker address is passed as the MQTT client id, so
          # two processes using the same config would share one id - confirm
          # that this is intended.
          self.client = mqtt.Client(self.cf.ip)
          self.client.username_pw_set(self.cf.username, password=self.cf.password)
          # Subscribing from on_connect renews the subscription after every
          # automatic reconnect performed by loop_forever().
          self.client.on_connect = self.on_connect
          self.client.on_message = self.on_message
          self.subscribe_update_topic()

      def on_connect(self, client, userdata, flags, rc, properties=None):
          """Subscribe to the update topic once the broker accepts the connection.

          ``rc`` is the connection result reported by the broker; ``0`` means
          the connection was accepted.
          """
          if rc == 0:
              client.subscribe(self._TOPIC, 0)
          else:
              print(f"MQTT connection refused, result code: {rc}")

      def subscribe_update_topic(self):
          """Connect to the broker and run the network loop until disconnected.

          The subscription itself is issued by ``on_connect``.
          """
          self.client.connect(self.cf.ip, 1883, 600)  # port 1883, keepalive 600 s
          self.client.loop_forever()

      def on_message(self, client, userdata, msg):
          """Handle one MQTT message: refresh the point values and update Redis.

          The payload is read as UTF-8 JSON with a ``values`` list whose items
          carry ``code`` and ``v``, plus a ``timestamp`` field. A payload that
          cannot be decoded is reported and dropped, so one bad message does
          not raise out of the callback and stop the network loop.
          """
          try:
              message = json.loads(msg.payload.decode('utf-8'))
              points = {item['code']: item['v'] for item in message['values']}
          except (ValueError, KeyError, TypeError) as err:
              print(f"Discarding malformed MQTT message: {err!r}")
              return

          self.point_dict = points
          self.run()

          # The timestamp is only echoed for diagnostics; it is divided by 1000
          # before conversion, so it is presumably epoch milliseconds.
          try:
              print(datetime.datetime.fromtimestamp(message['timestamp'] / 1000))
          except (KeyError, TypeError, ValueError, OverflowError, OSError) as err:
              print(f"MQTT message has no usable timestamp: {err!r}")

      def get_base_data(self):
          """Load the JSON template and return its ``sys_point`` section.

          The complete document is kept in ``self.jdata`` so that
          ``data_handle`` and ``save_redis`` can use it afterwards.
          """
          self.jdata = self.cf.get_json(self.sys_key, self.sys_name)
          return self.jdata['sys_point']

      def data_handle(self, data):
          """Fill ``val`` of every configured point that has a live value.

          ``data`` maps a group name to a list of point dicts, each with a
          ``key`` entry. Points whose ``key`` is absent from
          ``self.point_dict`` are left untouched. The entries are updated in
          place and ``data`` is stored back as ``self.jdata['sys_point']``.
          """
          latest = self.point_dict
          for group in data.values():
              for point in group:
                  code = point['key']
                  if code in latest:
                      point['val'] = latest[code]
          self.jdata['sys_point'] = data

      def _get_redis(self, host):
          """Return a Redis client for ``host``, reusing the cached one if possible."""
          if self._redis_conn is None or self._redis_ip != host:
              pool = redis.ConnectionPool(host=host, password='')
              self._redis_conn = redis.Redis(connection_pool=pool)
              self._redis_ip = host
          return self._redis_conn

      def save_redis(self):
          """Serialize ``self.jdata`` and store it under ``<sys_key>_<sys_name>``.

          Failures are reported on stdout and otherwise ignored, so that the
          next message can retry.
          """
          config = self.cf.get_conf()
          try:
              redis_ip = config['redis_conf']['ip']
              redis_conn = self._get_redis(redis_ip)
              redis_key = f"{self.sys_key}_{self.sys_name}"
              redis_conn.set(redis_key, json.dumps(self.jdata))
          except Exception as e:
              # Exception (not BaseException) keeps Ctrl-C and SystemExit working.
              # Drop the cached client so the next call builds a fresh one.
              self._redis_conn = None
              print(e)
              print("异常,redis连接错误!")

      def run(self):
          """Rebuild the JSON document from the latest values and store it."""
          data = self.get_base_data()
          self.data_handle(data)
          self.save_redis()
  if __name__ == "__main__":
      # Instantiating the class connects to the MQTT broker and blocks in its
      # network loop, so this call only returns once that loop ends.
      service = jf_pump_1125m()