import redis import logging from asyncua import Client from datetime import datetime from get_config import get_conf # 连接redis服务 def get_redis_conn(): # 获取配置 config = get_conf() # 创建redis对象 try: redis_conf = config['redis_conf'] redis_ip = redis_conf['ip'] pool = redis.ConnectionPool(host=redis_ip, password='') redis_conn = redis.Redis(connection_pool=pool) except BaseException as e: print(e) print("异常,redis连接错误!") return redis_conn # 根据服务、和点位获取OPC数据 async def get_opc_data(sys_url, point_arr, ns=2): if sys_url is None or point_arr is None: return # 获取配置 # config = get_conf() # 获取redis redis_conn = get_redis_conn() # 连接opc服务 try: print("正在连接OPCUA服务...") async with Client(url=sys_url) as client: i = 0 result_arr = {} print(datetime.now().strftime('Start_time:%Y-%m-%d %H:%M:%S.%f')) while i < len(point_arr): try: node = f"ns={ns};s={point_arr[i]}" tag = client.get_node(node) value = await tag.read_value() # redis值 redis_conn.set(str(node), str(value)) arr_key = node.split("=") result_arr[arr_key[2]] = value finally: i += 1 continue print(datetime.now().strftime('End_time:%Y-%m-%d %H:%M:%S.%f')) return result_arr except KeyError as e: # logging.warning(e.args) logging.warning("服务连接错误!") return "服务连接错误!" except BaseException as e: print(e) return "服务连接错误!" # 将点位值赋予json(递归,适配多级json) def point_data_to_arr(point_data, data_arr=[]): # 未获取到值返回空数组 if len(data_arr) == 0: return point_data point_dict = {} if isinstance(point_data, dict): for key, val in point_data.items(): data = point_data[key] if 'val' == key: point_key = point_dict['key'] if point_key in data_arr: point_dict['val'] = data_arr[point_key] else: point_dict['val'] = "" elif isinstance(val, str): point_dict[key] = point_data[key] else: test = point_data_to_arr(data, data_arr) point_dict[key] = test elif isinstance(point_data, list): point_arr = [] for i in range(len(point_data)): data = point_data[i] data_dic = point_data_to_arr(data, data_arr) point_arr.append(data_dic) # point_arr[i] = return point_arr return point_dict