# jf_belt_101.py (3.2 KB)
  1. import os
  2. from sys import path
  3. currentPath = os.getcwd().replace('\\','/') # 获取当前路径
  4. path.append(currentPath)
  5. import paho.mqtt.client as mqtt
  6. import mqtt_client as mc
  7. import config_info_model as md
  8. import redis
  9. import time, datetime
  10. import json
  11. class jf_belt_101():
  12. sys_key = 'jinfeng'
  13. sys_name = 'belt_101.json'
  14. point_dict = {}
  15. jdata = None
  16. cf = md.configInfoModel(sys_key, sys_name)
  17. client = None
  18. def __init__(self):
  19. self.init_mqtt()
  20. def init_mqtt(self):
  21. # print(self.cf.ip)
  22. self.client = mqtt.Client(f"{self.sys_key}_{self.sys_name}")
  23. self.client.username_pw_set(self.cf.username, password=self.cf.password)
  24. # self.client.on_connect = self.on_connect #连接broker时broker响应的回调
  25. self.client.on_message = self.on_message #接收到订阅消息时的回调
  26. self.subscribe_update_topic()
  27. def subscribe_update_topic(self):
  28. self.client.connect(self.cf.ip, 1883, 600) #连接到broker
  29. self.client.subscribe("1/1/10", 0)
  30. self.client.loop_forever()
  31. # 回调数据
  32. def on_message(self, client, userdata, msg):
  33. data = msg.payload
  34. jsonStr = data.decode('utf-8')
  35. jsonObj = json.loads(jsonStr)
  36. data = jsonObj['values']
  37. data_dict = {}
  38. for i in range(len(data)):
  39. v = data[i]
  40. data_dict[v['code']] = v['v']
  41. self.point_dict = data_dict
  42. self.run()
  43. timeStamp = jsonObj['timestamp'] / 1000
  44. times = datetime.datetime.fromtimestamp(timeStamp)
  45. print(times)
  46. # 获取点位
  47. def get_base_data(self):
  48. # 读取最终json格式数据
  49. self.jdata = self.cf.get_json(self.sys_key, self.sys_name)
  50. jdata_s = self.jdata['sys_point']
  51. return jdata_s
  52. # 数据处理
  53. def data_handle(self, data):
  54. comm = data['common']
  55. elec = data['electric_parameter']
  56. data['state'] = self.point_dict[data['key']]
  57. for i in range(len(comm)):
  58. ck = comm[i]
  59. k = ck[i]['key']
  60. comm[i]['value'] = self.point_dict[k]
  61. data['common'][i] = ck
  62. for i in range(len(elec)):
  63. vl = elec[i]['value_list']
  64. for j in range(len(vl)):
  65. k = vl[j]['key']
  66. vl[j]['value'] = self.point_dict[k]
  67. data['electric_parameter'][i]['value_list'] = vl
  68. self.jdata['sys_point'] = data
  69. # 存入redis
  70. def save_redis(self):
  71. config = self.cf.get_conf()
  72. try:
  73. redis_conf = config['redis_conf']
  74. redis_ip = redis_conf['ip']
  75. pool = redis.ConnectionPool(host=redis_ip, password='')
  76. redis_conn = redis.Redis(connection_pool=pool)
  77. # 接口数据存入redis
  78. redis_key = f"{self.sys_key}_{self.sys_name}"
  79. json_str = json.dumps(self.jdata)
  80. redis_conn.set(str(redis_key), json_str)
  81. except BaseException as e:
  82. print(e)
  83. print("异常,redis连接错误!")
  84. def run(self):
  85. data = self.get_base_data()
  86. self.data_handle(data)
  87. self.save_redis()
  88. if __name__ == "__main__":
  89. obj = jf_belt_101()