import redis import logging import asyncio import json from asyncua import Client from datetime import datetime from get_config import get_conf, get_opc_point_arr_conf, get_opc_sys_conf, get_opc_point_json_conf # 连接redis服务 def get_redis_conn(): # 获取配置 config = get_conf() # 创建redis对象 try: redis_conf = config['redis_conf'] redis_ip = redis_conf['ip'] pool = redis.ConnectionPool(host=redis_ip, password='') redis_conn = redis.Redis(connection_pool=pool) except BaseException as e: print(e) print("异常,redis连接错误!") return redis_conn # 根据服务、和点位获取OPC数据 async def get_opc_data(sys_url, point_arr, ns=2): if sys_url is None or point_arr is None: return # 获取配置 # config = get_conf() # 获取redis redis_conn = get_redis_conn() # 连接opc服务 try: print("正在连接OPCUA服务...") async with Client(url=sys_url) as client: i = 0 result_arr = {} print(datetime.now().strftime('Start_time:%Y-%m-%d %H:%M:%S.%f')) while i < len(point_arr): try: node = f"ns={ns};s={point_arr[i]}" tag = client.get_node(node) value = await tag.read_value() # redis值 redis_conn.set(str(point_arr[i]), str(value)) arr_key = node.split("=") result_arr[arr_key[2]] = value finally: i += 1 continue print(datetime.now().strftime('End_time:%Y-%m-%d %H:%M:%S.%f')) return result_arr except KeyError as e: # logging.warning(e.args) logging.warning("服务连接错误!") return "服务连接错误!" except BaseException as e: print(e) return "服务连接错误!" # 将点位值赋予json(递归,适配多级json) def point_data_to_arr(point_data, data_arr=[]): # 未获取到值返回空数组 if len(data_arr) == 0: return point_data point_dict = {} if isinstance(point_data, dict): for key, val in point_data.items(): data = point_data[key] if 'val' == key: point_key = point_dict['key'] if point_key in data_arr: point_dict['val'] = data_arr[point_key] else: point_dict['val'] = "" elif isinstance(val, str): point_dict[key] = point_data[key] else: test = point_data_to_arr(data, data_arr) point_dict[key] = test elif isinstance(point_data, list): point_arr = [] for i in range(len(point_data)): data = point_data[i] data_dic = point_data_to_arr(data, data_arr) point_arr.append(data_dic) # point_arr[i] = return point_arr return point_dict def json_data_to_redis(sys_key, sys_name): # 3、开启服务 # 3.1 获取opc点位arr opc_point_arr = get_opc_point_arr_conf(sys_key, sys_name) # 3.2 根据点位获取opc数据 opc_sys_conf = get_opc_sys_conf(sys_key, sys_name) opc_data = asyncio.run(get_opc_data(opc_sys_conf.opc_server_url, opc_point_arr)) # 3.3 获取json格式 opc_json_data = get_opc_point_json_conf(sys_key, sys_name) if opc_data is None or isinstance(opc_data, str): msg = "数据读取错误!请检查点位或服务配置!" tips = '{"code":-1, "msg":"' + msg + '"}' return msg else: # 将数据结合到json格式 json_data = point_data_to_arr(opc_json_data, opc_data) # 获取redis redis_conn = get_redis_conn() # 接口数据存入redis redis_key = f"{sys_key}_{sys_name}" json_str = json.dumps(json_data) redis_conn.set(str(redis_key), json_str)