123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- import redis
- import logging
- import asyncio
- import json
- from asyncua import Client
- from datetime import datetime
- from get_config import get_conf, get_opc_point_arr_conf, get_opc_sys_conf, get_opc_point_json_conf
- # 连接redis服务
- def get_redis_conn():
- # 获取配置
- config = get_conf()
- # 创建redis对象
- try:
- redis_conf = config['redis_conf']
- redis_ip = redis_conf['ip']
- pool = redis.ConnectionPool(host=redis_ip, password='')
- redis_conn = redis.Redis(connection_pool=pool)
- except BaseException as e:
- print(e)
- print("异常,redis连接错误!")
- return redis_conn
- # 根据服务、和点位获取OPC数据
- async def get_opc_data(sys_url, point_arr, ns=2):
- if sys_url is None or point_arr is None:
- return
- # 获取配置
- # config = get_conf()
- # 获取redis
- redis_conn = get_redis_conn()
- # 连接opc服务
- try:
- print("正在连接OPCUA服务...")
- async with Client(url=sys_url) as client:
- i = 0
- result_arr = {}
- print(datetime.now().strftime('Start_time:%Y-%m-%d %H:%M:%S.%f'))
- while i < len(point_arr):
- try:
- node = f"ns={ns};s={point_arr[i]}"
- tag = client.get_node(node)
- value = await tag.read_value()
- # redis值
- redis_conn.set(str(point_arr[i]), str(value))
- arr_key = node.split("=")
- result_arr[arr_key[2]] = value
- finally:
- i += 1
- continue
- print(datetime.now().strftime('End_time:%Y-%m-%d %H:%M:%S.%f'))
- return result_arr
- except KeyError as e:
- # logging.warning(e.args)
- logging.warning("服务连接错误!")
- return "服务连接错误!"
- except BaseException as e:
- print(e)
- return "服务连接错误!"
- # 将点位值赋予json(递归,适配多级json)
- def point_data_to_arr(point_data, data_arr=[]):
- # 未获取到值返回空数组
- if len(data_arr) == 0:
- return point_data
- point_dict = {}
- if isinstance(point_data, dict):
- for key, val in point_data.items():
- data = point_data[key]
- if 'val' == key:
- point_key = point_dict['key']
- if point_key in data_arr:
- point_dict['val'] = data_arr[point_key]
- else:
- point_dict['val'] = ""
- elif isinstance(val, str):
- point_dict[key] = point_data[key]
- else:
- test = point_data_to_arr(data, data_arr)
- point_dict[key] = test
- elif isinstance(point_data, list):
- point_arr = []
- for i in range(len(point_data)):
- data = point_data[i]
- data_dic = point_data_to_arr(data, data_arr)
- point_arr.append(data_dic)
- # point_arr[i] =
- return point_arr
- return point_dict
- def json_data_to_redis(sys_key, sys_name):
- # 3、开启服务
- # 3.1 获取opc点位arr
- opc_point_arr = get_opc_point_arr_conf(sys_key, sys_name)
- # 3.2 根据点位获取opc数据
- opc_sys_conf = get_opc_sys_conf(sys_key, sys_name)
- opc_data = asyncio.run(get_opc_data(opc_sys_conf.opc_server_url, opc_point_arr))
- # 3.3 获取json格式
- opc_json_data = get_opc_point_json_conf(sys_key, sys_name)
- if opc_data is None or isinstance(opc_data, str):
- msg = "数据读取错误!请检查点位或服务配置!"
- tips = '{"code":-1, "msg":"' + msg + '"}'
- return msg
- else:
- # 将数据结合到json格式
- json_data = point_data_to_arr(opc_json_data, opc_data)
- # 获取redis
- redis_conn = get_redis_conn()
- # 接口数据存入redis
- redis_key = f"{sys_key}_{sys_name}"
- json_str = json.dumps(json_data)
- redis_conn.set(str(redis_key), json_str)
|