|
@@ -0,0 +1,64 @@
|
|
|
+import paho.mqtt.client as mqtt
|
|
|
+import json
|
|
|
+import redis
|
|
|
+import time, datetime
|
|
|
+
|
|
|
+
|
|
|
+class EstimationWeightUpdate(object):
|
|
|
+ jdata = None
|
|
|
+
|
|
|
+ client = None
|
|
|
+ sys_path = '1/1/#'
|
|
|
+ redis_ip = '127.0.0.1'
|
|
|
+ mqtt_ip = '10.71.146.182'
|
|
|
+ mqtt_us = 'admin'
|
|
|
+ mqtt_pd = 'longruan123'
|
|
|
+
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ self.init_mqtt()
|
|
|
+
|
|
|
+ # The callback for when the client receives a CONNACK response from the server.
|
|
|
+ # def on_connect(self, client, userdata, flags, rc):
|
|
|
+ # print("Connected with result code " + str(rc)) # rc的值很重要,为0代表连接成功。
|
|
|
+ #
|
|
|
+ # # Subscribing in on_connect() means that if we lose the connection and
|
|
|
+ # # reconnect then subscriptions will be renewed.
|
|
|
+ # self.client.subscribe("$SYS/#") # 订阅$SYS/下的所有主题
|
|
|
+
|
|
|
+ # The callback for when a PUBLISH message is received from the server.
|
|
|
+ def on_message(self, client, userdata, msg):
|
|
|
+ # print(msg.topic+" "+str(msg.payload))
|
|
|
+
|
|
|
+ data = msg.payload
|
|
|
+ json_str = data.decode('utf-8')
|
|
|
+ json_obj = json.loads(jsonStr)
|
|
|
+ data = json_obj['values']
|
|
|
+
|
|
|
+ redis_ip = '127.0.0.1'
|
|
|
+ pool = redis.ConnectionPool(host=redis_ip, password='', db=1)
|
|
|
+ redis_conn = redis.Redis(connection_pool=pool)
|
|
|
+
|
|
|
+ for i in range(len(data)):
|
|
|
+ redis_key = f'{data[i]["code"]}'
|
|
|
+ redis_val = f'{data[i]["v"]}'
|
|
|
+ redis_conn.set(redis_key, redis_val)
|
|
|
+ time_stamp = json_obj['timestamp'] / 1000
|
|
|
+ times = datetime.datetime.fromtimestamp(time_stamp)
|
|
|
+ print(times)
|
|
|
+
|
|
|
+ def init_mqtt(self):
|
|
|
+ self.client = mqtt.Client("ANING-SUB")
|
|
|
+ self.client.username_pw_set(self.mqtt_us, password=self.mqtt_pd)
|
|
|
+ # self.client.on_connect = self.on_connect # 连接broker时broker响应的回调
|
|
|
+ self.client.on_message = self.on_message # 接收到订阅消息时的回调
|
|
|
+
|
|
|
+ def subscribe_update_topic(self):
|
|
|
+ self.client.connect("10.71.146.182", 1883, 60) # 连接到broker
|
|
|
+ self.client.subscribe("1/1/#", 0)
|
|
|
+ self.client.loop_forever()
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ ew = EstimationWeightUpdate()
|
|
|
+ ew.subscribe_update_topic()
|