# main.py
import asyncio
import json
import logging
import os
import re
from datetime import datetime

import redis
from asyncua import Client
from flask import Flask, request, render_template
  10. class SubHandler(object):
  11. r = None
  12. pool = None
  13. config = None
  14. def __init__(self):
  15. try:
  16. self.config = get_conf()
  17. redis_conf = self.config['redis_conf']
  18. ip = redis_conf['ip']
  19. self.pool = redis.ConnectionPool(host=ip, password='')
  20. self.r = redis.Redis(connection_pool=self.pool)
  21. except Exception as e:
  22. raise Exception("redis连接错误!")
  23. async def datachange_notification(self, node, val, data):
  24. print("Python: New data change event", node, val)
  25. def event_notification(self, event):
  26. print("Python: New event", event)
  27. app = Flask(__name__)
  28. # route
  29. @app.route("/get/", methods=['GET', 'POST'])
  30. def get():
  31. # 矿名key
  32. sys_key = request.values.get('sys_key')
  33. # 自动化系统key
  34. sys_name = request.values.get('sys_name')
  35. # 参数判断
  36. if sys_key is None or len(sys_key) == 0 or sys_name is None or len(sys_name) == 0:
  37. return "参数错误"
  38. # 获取程序配置
  39. base_conf = get_conf()
  40. # 获取opc服务地址
  41. sys_url = base_conf['sys_conf'][sys_key][sys_name]['opc_server_url']
  42. # 获取配置文件名
  43. sys_file_path = base_conf['sys_conf'][sys_key][sys_name]['path']
  44. sys_file_name = base_conf['sys_conf'][sys_key][sys_name]['file_name']
  45. # 获取点位数组
  46. point_conf = get_conf(sys_file_name, sys_file_path)
  47. try:
  48. # 取需要监听的点位key数组
  49. point_arr = point_to_arr(point_conf)
  50. # 根据服务地址获取Opc数据
  51. data = asyncio.run(get_opc_data(sys_url, point_arr, 2))
  52. # 根据获取值整理成json
  53. dict_data = point_data_to_arr(point_conf, data)
  54. # 显示请求地址IP
  55. # print(f"接收到请求,来自{get_request_ip()} {sys_key} {sys_name}")
  56. return dict_data
  57. except Exception as e:
  58. raise Exception("点位获取错误!")
  59. # 将点位整理数据,用于获取点位值
  60. def point_to_arr(point_data):
  61. if isinstance(point_data, dict):
  62. point_arr = []
  63. for key, val in point_data.items():
  64. data = point_data[key]
  65. if 'key' in point_data.keys():
  66. return point_data['key']
  67. if isinstance(data, str):
  68. continue
  69. return point_to_arr(data)
  70. elif isinstance(point_data, list):
  71. point_arr = []
  72. for i in range(len(point_data)):
  73. data = point_to_arr(point_data[i])
  74. point_arr.append(data)
  75. return point_arr
  76. else:
  77. return
  78. # 将点位值赋予json(递归,适配多级json)
  79. def point_data_to_arr(point_data, data_arr=[]):
  80. # 未获取到值返回空数组
  81. if len(data_arr) == 0:
  82. return point_data
  83. point_dict = {}
  84. if isinstance(point_data, dict):
  85. for key, val in point_data.items():
  86. data = point_data[key]
  87. if 'val' == key:
  88. point_key = point_dict['key']
  89. if point_key in data_arr:
  90. point_dict['val'] = data_arr[point_key]
  91. else:
  92. point_dict['val'] = ""
  93. elif isinstance(val, str):
  94. point_dict[key] = point_data[key]
  95. else:
  96. test = point_data_to_arr(data, data_arr)
  97. point_dict[key] = test
  98. elif isinstance(point_data, list):
  99. point_arr = []
  100. for i in range(len(point_data)):
  101. data = point_data[i]
  102. data_dic = point_data_to_arr(data, data_arr)
  103. point_arr.append(data_dic)
  104. # point_arr[i] =
  105. return point_arr
  106. return point_dict
  107. # 获取点位值
  108. async def get_opc_data(sys_url, point_arr, ns=2):
  109. if sys_url is None or point_arr is None:
  110. return
  111. url = sys_url
  112. async with Client(url=url) as client:
  113. handler = SubHandler()
  114. r = handler.r
  115. i = 0
  116. result_arr = {}
  117. print(datetime.now().strftime('Start_time:%Y-%m-%d %H:%M:%S.%f'))
  118. while i < len(point_arr):
  119. node = f"ns={ns};s={point_arr[i]}"
  120. tag = client.get_node(node)
  121. try:
  122. # 监听点位
  123. await client.create_subscription(500, handler)
  124. except Exception as e:
  125. print(e.args)
  126. raise Exception("设置监听错误!")
  127. try:
  128. value = await tag.read_value()
  129. # redis值
  130. r.set(str(node), str(value))
  131. # 获取点位key
  132. arr_key = node.split("=")
  133. result_arr[arr_key[2]] = value
  134. # print(f"tag1 is: {tag} with value {value} ")
  135. i += 1
  136. if i == len(point_arr):
  137. print("Get Data Success")
  138. finally:
  139. # print(f"node={node} 点位值获取错误")
  140. i += 1
  141. continue
  142. print(datetime.now().strftime('End_time:%Y-%m-%d %H:%M:%S.%f'))
  143. return result_arr
  144. # 获取文件配置
  145. def get_conf(file_name="config.json", file_path="./config"):
  146. path = f"{file_path}/{file_name}"
  147. with open(path, "r", encoding="utf-8") as f:
  148. content = json.load(f)
  149. f.close()
  150. return content
  151. # 获取请求Ip
  152. def get_request_ip():
  153. return request.remote_addr
  154. # 返回错误信息
  155. def api_get_error(msg):
  156. return '{"code":-1, "msg":"' + msg + '"}'
  157. if __name__ == '__main__':
  158. config = get_conf()
  159. port = config['server_port']
  160. app.run(
  161. host='0.0.0.0',
  162. port=config['server_port']
  163. )