# jf_run_mqtt.py (2.1 KB)

  1. import paho.mqtt.client as mqtt
  2. import json
  3. import redis
  4. import time, datetime
  5. class EstimationWeightUpdate(object):
  6. jdata = None
  7. client = None
  8. sys_path = '1/1/#'
  9. redis_ip = '127.0.0.1'
  10. mqtt_ip = '10.71.146.182'
  11. mqtt_us = 'admin'
  12. mqtt_pd = 'longruan123'
  13. def __init__(self):
  14. self.init_mqtt()
  15. # The callback for when the client receives a CONNACK response from the server.
  16. # def on_connect(self, client, userdata, flags, rc):
  17. # print("Connected with result code " + str(rc)) # rc的值很重要,为0代表连接成功。
  18. #
  19. # # Subscribing in on_connect() means that if we lose the connection and
  20. # # reconnect then subscriptions will be renewed.
  21. # self.client.subscribe("$SYS/#") # 订阅$SYS/下的所有主题
  22. # The callback for when a PUBLISH message is received from the server.
  23. def on_message(self, client, userdata, msg):
  24. # print(msg.topic+" "+str(msg.payload))
  25. data = msg.payload
  26. json_str = data.decode('utf-8')
  27. json_obj = json.loads(jsonStr)
  28. data = json_obj['values']
  29. redis_ip = '127.0.0.1'
  30. pool = redis.ConnectionPool(host=redis_ip, password='', db=1)
  31. redis_conn = redis.Redis(connection_pool=pool)
  32. for i in range(len(data)):
  33. redis_key = f'{data[i]["code"]}'
  34. redis_val = f'{data[i]["v"]}'
  35. redis_conn.set(redis_key, redis_val)
  36. time_stamp = json_obj['timestamp'] / 1000
  37. times = datetime.datetime.fromtimestamp(time_stamp)
  38. print(times)
  39. def init_mqtt(self):
  40. self.client = mqtt.Client("ANING-SUB")
  41. self.client.username_pw_set(self.mqtt_us, password=self.mqtt_pd)
  42. # self.client.on_connect = self.on_connect # 连接broker时broker响应的回调
  43. self.client.on_message = self.on_message # 接收到订阅消息时的回调
  44. def subscribe_update_topic(self):
  45. self.client.connect("10.71.146.182", 1883, 60) # 连接到broker
  46. self.client.subscribe("1/1/#", 0)
  47. self.client.loop_forever()
  48. if __name__ == "__main__":
  49. ew = EstimationWeightUpdate()
  50. ew.subscribe_update_topic()