# jf_power_center.py
  1. import os
  2. from sys import path
  3. currentPath = os.getcwd().replace('\\','/') # 获取当前路径
  4. path.append(currentPath)
  5. import paho.mqtt.client as mqtt
  6. import mqtt_client as mc
  7. import config_info_model as md
  8. import redis
  9. import time, datetime
  10. import json
  11. class jf_power_center():
  12. sys_key = 'jinfeng'
  13. sys_name = 'power_center'
  14. sys_path = '1/1/1'
  15. point_dict = {}
  16. jdata = None
  17. cf = md.configInfoModel(sys_key, sys_name)
  18. client = None
  19. def __init__(self):
  20. self.init_mqtt()
  21. def init_mqtt(self):
  22. # print(self.cf.ip)
  23. self.client = mqtt.Client(f"{self.sys_key}_{self.sys_name}")
  24. self.client.username_pw_set(self.cf.username, password=self.cf.password)
  25. # self.client.on_connect = self.on_connect #连接broker时broker响应的回调
  26. self.client.on_message = self.on_message #接收到订阅消息时的回调
  27. self.subscribe_update_topic()
  28. def subscribe_update_topic(self):
  29. self.client.connect(self.cf.ip, 1883, 600) #连接到broker
  30. self.client.subscribe(self.sys_path, 0)
  31. self.client.loop_forever()
  32. # 回调数据
  33. def on_message(self, client, userdata, msg):
  34. data = msg.payload
  35. jsonStr = data.decode('utf-8')
  36. jsonObj = json.loads(jsonStr)
  37. data = jsonObj['values']
  38. data_dict = {}
  39. for i in range(len(data)):
  40. v = data[i]
  41. data_dict[v['code']] = v['v']
  42. self.point_dict = data_dict
  43. self.run()
  44. timeStamp = jsonObj['timestamp'] / 1000
  45. times = datetime.datetime.fromtimestamp(timeStamp)
  46. print(times)
  47. # 获取点位
  48. def get_base_data(self):
  49. # 读取最终json格式数据
  50. self.jdata = self.cf.get_json(self.sys_key, self.sys_name)
  51. jdata_s = self.jdata['sys_point']
  52. return jdata_s
  53. # 数据处理
  54. def data_handle(self, data):
  55. switch_arr = data['switch']
  56. for i in range(len(switch_arr)):
  57. # 取每一个值的key
  58. sk = switch_arr[i]['key']
  59. # 根据key取值赋值
  60. if sk in self.point_dict.keys():
  61. data['switch'][i]['value'] = self.point_dict[sk]
  62. power_parameter_arr = data['power_parameter_new']
  63. i=1
  64. for i in range(len(power_parameter_arr)):
  65. keys_arr = power_parameter_arr[i]['keys']
  66. j=0
  67. pp_arr = [];
  68. for j in range(len(keys_arr)):
  69. k = keys_arr[j]
  70. v = self.point_dict[k]
  71. pp_arr.append(v)
  72. data['power_parameter_new'][i]['data']=pp_arr
  73. power_parameter = data['power_parameter']
  74. i=0
  75. for i in range(len(power_parameter)):
  76. if 'data' in power_parameter[i].keys():
  77. p = power_parameter[i]['data']
  78. j=0
  79. for j in range(len(p)):
  80. kk = p[j]['key']
  81. if kk in self.point_dict.keys():
  82. data['power_parameter'][i]['data'][j]['value'] = self.point_dict[kk]
  83. # for key, val in data.items():
  84. # for i in range(len(val)):
  85. # ck = val[i]['key']
  86. # if ck in self.point_dict.keys():
  87. # data[key][i]['value'] = self.point_dict[ck]
  88. # if 'data' in val[i].keys():
  89. # p = val[i]['data']
  90. # for j in range(len(p)):
  91. # kk = p[j]['key']
  92. # if kk in self.point_dict.keys():
  93. # data[key][i]['data'][j]['value'] = self.point_dict[kk]
  94. # data[key] = val
  95. self.jdata['sys_point'] = data
  96. # 存入redis
  97. def save_redis(self):
  98. config = self.cf.get_conf()
  99. try:
  100. redis_conf = config['redis_conf']
  101. redis_ip = redis_conf['ip']
  102. pool = redis.ConnectionPool(host=redis_ip, password='')
  103. redis_conn = redis.Redis(connection_pool=pool)
  104. # 接口数据存入redis
  105. redis_key = f"{self.sys_key}_{self.sys_name}"
  106. json_str = json.dumps(self.jdata)
  107. redis_conn.set(str(redis_key), json_str)
  108. except BaseException as e:
  109. print(e)
  110. print("异常,redis连接错误!")
  111. def run(self):
  112. data = self.get_base_data()
  113. self.data_handle(data)
  114. self.save_redis()
  115. if __name__ == "__main__":
  116. obj = jf_power_center()