12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- import redis
- import logging
- from asyncua import Client
- from datetime import datetime
- from get_config import get_conf
- # 连接redis服务
def get_redis_conn():
    """Build a Redis client from the project configuration.

    Reads ``config['redis_conf']['ip']`` from ``get_conf()`` and wraps a
    ``redis.ConnectionPool`` for that host (empty password) in a
    ``redis.Redis`` client.

    Returns:
        The ``redis.Redis`` client bound to the new connection pool.

    Raises:
        Exception: whatever the configuration lookup or client construction
            raised. The error is printed first and then re-raised, so the
            caller sees the real cause instead of an ``UnboundLocalError``
            from returning a name that was never assigned.
    """
    # Load the configuration.
    config = get_conf()
    # Create the Redis client.
    try:
        redis_conf = config['redis_conf']
        redis_ip = redis_conf['ip']
        pool = redis.ConnectionPool(host=redis_ip, password='')
        # NOTE(review): redis-py pools appear to connect lazily, so network
        # failures likely surface on the first command rather than here;
        # confirm against the redis-py documentation.
        return redis.Redis(connection_pool=pool)
    except Exception as e:
        print(e)
        print("异常,redis连接错误!")
        raise
- # 根据服务、和点位获取OPC数据
async def get_opc_data(sys_url, point_arr, ns=2):
    """Read a list of OPC UA points and mirror each value into Redis.

    Each point is read as the node ``ns=<ns>;s=<point>``. A successfully read
    value is written to Redis under ``str(point)`` (as ``str(value)``) and
    stored in the result dict under the same key.

    Args:
        sys_url: Endpoint URL passed to the asyncua ``Client``.
        point_arr: Iterable of point identifiers to read.
        ns: Namespace index used when building node ids (default 2).

    Returns:
        ``None`` when ``sys_url`` or ``point_arr`` is ``None``.
        Otherwise a dict mapping ``str(point)`` to the value read, containing
        only the points that succeeded.
        If the OPC UA session itself fails, the string ``"服务连接错误!"``.
    """
    if sys_url is None or point_arr is None:
        return None
    # Obtain the Redis client used to cache the point values.
    # NOTE(review): redis_conn.set() below is a blocking call inside a
    # coroutine; consider an async Redis client if latency matters.
    redis_conn = get_redis_conn()
    # Connect to the OPC UA server.
    try:
        print("正在连接OPCUA服务...")
        async with Client(url=sys_url) as client:
            result_arr = {}
            print(datetime.now().strftime('Start_time:%Y-%m-%d %H:%M:%S.%f'))
            for point in point_arr:
                # Use the full point name as the key; splitting the node id
                # on "=" would truncate names that themselves contain "=".
                point_name = str(point)
                try:
                    tag = client.get_node(f"ns={ns};s={point}")
                    value = await tag.read_value()
                    # Cache the value in Redis.
                    if redis_conn is not None:
                        redis_conn.set(point_name, str(value))
                    result_arr[point_name] = value
                except Exception as e:
                    # Best effort: one bad point must not abort the batch.
                    # Catching Exception (not BaseException) lets task
                    # cancellation and interrupts propagate.
                    logging.warning("point %s skipped: %s", point_name, e)
            print(datetime.now().strftime('End_time:%Y-%m-%d %H:%M:%S.%f'))
            return result_arr
    except KeyError:
        logging.warning("服务连接错误!")
        return "服务连接错误!"
    except Exception as e:
        print(e)
        return "服务连接错误!"
- # 将点位值赋予json(递归,适配多级json)
def point_data_to_arr(point_data, data_arr=None):
    """Fill the ``'val'`` fields of a nested JSON-like template with point values.

    Walks ``point_data`` recursively. In every dict that has a ``'key'``
    entry, ``'val'`` is replaced by ``data_arr[<that key>]`` when the key is
    present in ``data_arr`` and by ``""`` otherwise. All other content is
    copied through; the input structure is not mutated.

    Args:
        point_data: Template made of dicts, lists and leaf values.
        data_arr: Mapping from point key to value. ``None`` or an empty
            container means "no values available".

    Returns:
        ``point_data`` itself when ``data_arr`` is ``None`` or empty;
        otherwise a new structure of the same shape with ``'val'`` filled in.
    """
    # Nothing was fetched: hand the template back untouched.
    if data_arr is None or len(data_arr) == 0:
        return point_data

    if isinstance(point_data, list):
        return [point_data_to_arr(item, data_arr) for item in point_data]

    # Leaf values (str, numbers, bool, None, ...) are returned as-is;
    # previously non-string leaves were replaced by an empty dict.
    if not isinstance(point_data, dict):
        return point_data

    # Read 'key' from the input dict so the result does not depend on
    # whether 'key' or 'val' comes first.
    has_point_key = 'key' in point_data
    point_dict = {}
    for key, val in point_data.items():
        if key == 'val' and has_point_key:
            point_key = point_data['key']
            point_dict['val'] = data_arr[point_key] if point_key in data_arr else ""
        else:
            point_dict[key] = point_data_to_arr(val, data_arr)
    return point_dict
|