import datetime
import json
import time

import paho.mqtt.client as mqtt
import redis


class EstimationWeightUpdate(object):
    """Subscribe to an MQTT topic and copy each received value into Redis.

    Every message is expected to be a UTF-8 JSON object with a ``values``
    list (each entry carrying ``code`` and ``v``) and a numeric
    ``timestamp``. Each ``code`` becomes a Redis key whose value is the
    string form of ``v``.
    """

    jdata = None
    client = None
    # Redis client, created on first use and then reused for every message.
    redis_conn = None
    # Topic filter subscribed to on every (re)connect.
    sys_path = '1/1/#'
    redis_ip = '127.0.0.1'
    mqtt_ip = '10.71.146.182'
    # NOTE(review): broker credentials are hard-coded in source; consider
    # loading them from configuration or the environment instead.
    mqtt_us = 'admin'
    mqtt_pd = 'longruan123'

    def __init__(self):
        """Create and configure the MQTT client (no network traffic yet)."""
        self.init_mqtt()

    def on_connect(self, client, userdata, flags, rc):
        """Handle the broker's CONNACK and (re)subscribe to ``sys_path``.

        Subscribing here rather than once after ``connect()`` means the
        subscription is renewed whenever the client reconnects.

        Args:
            client: The paho client that connected.
            userdata: User data attached to the client (unused).
            flags: Response flags sent by the broker (unused).
            rc: Connection result code; 0 means the connection succeeded.
        """
        if rc != 0:
            print("MQTT connection failed with result code " + str(rc))
            return
        client.subscribe(self.sys_path, 0)

    def _get_redis(self):
        """Return the shared Redis client, creating it on first use."""
        if self.redis_conn is None:
            pool = redis.ConnectionPool(host=self.redis_ip, password='', db=1)
            self.redis_conn = redis.Redis(connection_pool=pool)
        return self.redis_conn

    def on_message(self, client, userdata, msg):
        """Store the values of one received message in Redis.

        Args:
            client: The paho client that received the message (unused).
            userdata: User data attached to the client (unused).
            msg: The received message; ``msg.payload`` holds the raw bytes.

        Raises:
            KeyError: If the decoded object lacks ``values``/``timestamp``
                or an entry lacks ``code``/``v``.
        """
        try:
            # UnicodeDecodeError and json.JSONDecodeError both derive from
            # ValueError, so one clause covers both kinds of bad payload.
            json_obj = json.loads(msg.payload.decode('utf-8'))
        except ValueError as err:
            print(f'Ignoring undecodable message on {msg.topic}: {err}')
            return

        redis_conn = self._get_redis()
        for item in json_obj['values']:
            redis_conn.set(str(item['code']), str(item['v']))

        # The timestamp is divided by 1000 before conversion, i.e. it is
        # presumably epoch milliseconds -- TODO confirm with the publisher.
        time_stamp = json_obj['timestamp'] / 1000
        times = datetime.datetime.fromtimestamp(time_stamp)
        print(times)

    def init_mqtt(self):
        """Build the MQTT client, set its credentials and register callbacks."""
        self.client = mqtt.Client("ANING-SUB")
        self.client.username_pw_set(self.mqtt_us, password=self.mqtt_pd)
        # Called when the broker answers a connection attempt.
        self.client.on_connect = self.on_connect
        # Called for every message received on a subscribed topic.
        self.client.on_message = self.on_message

    def subscribe_update_topic(self):
        """Connect to the broker and process messages until interrupted.

        The subscription to ``sys_path`` is issued from ``on_connect``.
        This call blocks inside the client's network loop.
        """
        self.client.connect(self.mqtt_ip, 1883, 60)
        self.client.loop_forever()


if __name__ == "__main__":
    ew = EstimationWeightUpdate()
    ew.subscribe_update_topic()