get_opc_data.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. import redis
  2. import logging
  3. import asyncio
  4. import json
  5. from asyncua import Client
  6. from datetime import datetime
  7. from get_config import get_conf, get_opc_point_arr_conf, get_opc_sys_conf, get_opc_point_json_conf
  def get_redis_conn():
      """Build a Redis client from the ``redis_conf`` section of the project config.

      The host is read from ``get_conf()['redis_conf']['ip']``; the password is
      an empty string.  A new connection pool is created on every call.

      Returns:
          A ``redis.Redis`` client bound to a freshly created connection pool.

      Raises:
          Exception: Whatever the config lookup or client construction raised.
              The error is logged and re-raised so the caller sees the real
              cause instead of an ``UnboundLocalError`` from a missing local.
      """
      try:
          redis_conf = get_conf()['redis_conf']
          # NOTE(review): redis-py is understood to connect lazily, so failures
          # here are most likely configuration lookups -- confirm.
          pool = redis.ConnectionPool(host=redis_conf['ip'], password='')
          return redis.Redis(connection_pool=pool)
      except Exception:
          logging.exception("异常,redis连接错误!")
          raise
  async def get_opc_data(sys_url, point_arr, ns=2):
      """Read each OPC UA point once and mirror its value into Redis.

      Every point is addressed as the node id ``ns=<ns>;s=<point>``.  The value
      read is stored in Redis under ``str(point)`` as ``str(value)`` and is also
      collected into the returned dict.

      Args:
          sys_url: Endpoint URL handed to ``Client(url=...)``.
          point_arr: Iterable of point identifiers.
          ns: Namespace index used when building node ids.

      Returns:
          ``None`` when ``sys_url`` or ``point_arr`` is ``None``; otherwise a
          dict mapping ``str(point)`` to the value read, containing only the
          points that were both read and written to Redis successfully; or the
          string ``"服务连接错误!"`` when the client session itself fails.
      """
      if sys_url is None or point_arr is None:
          return None

      redis_conn = get_redis_conn()
      try:
          print("正在连接OPCUA服务...")
          async with Client(url=sys_url) as client:
              result_arr = {}
              print(datetime.now().strftime('Start_time:%Y-%m-%d %H:%M:%S.%f'))
              for point in point_arr:
                  point_name = str(point)
                  try:
                      tag = client.get_node(f"ns={ns};s={point}")
                      value = await tag.read_value()
                      redis_conn.set(point_name, str(value))
                  except Exception as exc:
                      # One bad point must not abort the whole scan, but the
                      # failure is reported instead of being silently dropped.
                      logging.warning(
                          "Failed to read OPC point %s: %s", point_name, exc
                      )
                      continue
                  # Key by the full point name; splitting the node id on "="
                  # would truncate names that themselves contain "=".
                  result_arr[point_name] = value
              print(datetime.now().strftime('End_time:%Y-%m-%d %H:%M:%S.%f'))
              return result_arr
      except Exception:
          # Only ordinary errors are converted to the error string; task
          # cancellation and KeyboardInterrupt are allowed to propagate.
          logging.warning("服务连接错误!", exc_info=True)
          return "服务连接错误!"
  def point_data_to_arr(point_data, data_arr=None):
      """Fill the ``"val"`` slots of a nested JSON template with point values.

      The template is walked recursively.  In every dict, a ``"val"`` entry is
      replaced by ``data_arr[<that dict's "key" entry>]``, or by ``""`` when the
      point is not present in ``data_arr`` or the dict has no ``"key"``.  Lists
      are processed element by element.  Any other value (str, number, bool,
      ``None``) is returned unchanged.  The input template is not mutated.

      Args:
          point_data: Template made of dicts, lists and scalar leaves.
          data_arr: Mapping of point name to value.  When it is ``None`` or
              empty the template is returned as-is.

      Returns:
          A new structure with the same shape as ``point_data`` and values
          filled in, or ``point_data`` itself when there is nothing to fill.
      """
      if not data_arr:
          return point_data

      if isinstance(point_data, dict):
          filled = {}
          for key, val in point_data.items():
              if key == 'val':
                  # Read the point name from the template itself so the result
                  # does not depend on "key" appearing before "val".
                  point_key = point_data.get('key')
                  filled['val'] = data_arr[point_key] if point_key in data_arr else ""
              else:
                  filled[key] = point_data_to_arr(val, data_arr)
          return filled

      if isinstance(point_data, list):
          return [point_data_to_arr(item, data_arr) for item in point_data]

      # Scalar leaf: keep it rather than collapsing it to an empty dict.
      return point_data
  def json_data_to_redis(sys_key, sys_name):
      """Read one system's OPC points and store the filled JSON template in Redis.

      Steps: load the point list and server settings for ``(sys_key, sys_name)``,
      read the points with ``get_opc_data``, merge the values into the JSON
      template with ``point_data_to_arr``, and write the serialized result to
      Redis under the key ``"<sys_key>_<sys_name>"``.

      Args:
          sys_key: First identifier passed to the configuration getters.
          sys_name: Second identifier passed to the configuration getters.

      Returns:
          ``None`` on success, or the message
          ``"数据读取错误!请检查点位或服务配置!"`` when ``get_opc_data``
          returned ``None`` or an error string.
      """
      opc_point_arr = get_opc_point_arr_conf(sys_key, sys_name)
      opc_sys_conf = get_opc_sys_conf(sys_key, sys_name)
      opc_data = asyncio.run(get_opc_data(opc_sys_conf.opc_server_url, opc_point_arr))

      # get_opc_data signals failure with None (missing input) or an error string.
      if opc_data is None or isinstance(opc_data, str):
          return "数据读取错误!请检查点位或服务配置!"

      # The template is only needed once there is data to merge into it.
      opc_json_data = get_opc_point_json_conf(sys_key, sys_name)
      json_data = point_data_to_arr(opc_json_data, opc_data)

      redis_conn = get_redis_conn()
      redis_key = f"{sys_key}_{sys_name}"
      # default=str is deliberate: values json cannot encode natively are stored
      # as str(value), the same form used for the per-point Redis entries,
      # instead of aborting the whole snapshot with a TypeError.
      redis_conn.set(redis_key, json.dumps(json_data, default=str))
      return None