main.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. import os
  2. import re
  3. import json
  4. import redis
  5. import logging
  6. import asyncio
  7. from flask import Flask, request, render_template
  8. from asyncua import Client
  9. from datetime import datetime
  10. class SubHandler(object):
  11. r = None
  12. pool = None
  13. config = None
  14. def __init__(self):
  15. self.config = get_conf()
  16. redis_conf = self.config['redis_conf']
  17. ip = redis_conf['ip']
  18. self.pool = redis.ConnectionPool(host=ip, password='')
  19. self.r = redis.Redis(connection_pool=self.pool)
  20. async def datachange_notification(self, node, val, data):
  21. print("Python: New data change event", node, val)
  22. def event_notification(self, event):
  23. print("Python: New event", event)
  24. app = Flask(__name__)
  25. # route
  26. @app.route("/get/", methods=['GET', 'POST'])
  27. def get():
  28. # 矿名key
  29. sys_key = request.values.get('sys_key')
  30. # 自动化系统key
  31. sys_name = request.values.get('sys_name')
  32. # 参数判断
  33. if sys_key is None or len(sys_key) == 0 or sys_name is None or len(sys_name) == 0:
  34. return "参数错误"
  35. # 获取对应配置
  36. base_conf = get_conf()
  37. # 获取opc服务地址
  38. sys_url = base_conf["server_url"]
  39. # 获取配置文件名
  40. sys_file_path = base_conf['sys_conf'][sys_key][sys_name]['path']
  41. sys_file_name = base_conf['sys_conf'][sys_key][sys_name]['file_name']
  42. # 获取点位数组
  43. point_conf = get_conf(sys_file_name, sys_file_path)
  44. # 取到配置
  45. point_base_dic = point_conf['sys_point']
  46. point_arr = []
  47. # 取点位数据
  48. # print(point_base_dic)
  49. for group in point_base_dic:
  50. for item in point_base_dic[group]:
  51. point_arr.append(item['key'])
  52. # 根据服务地址获取Opc数据
  53. data = asyncio.run(get_opc_data(sys_url, point_arr, 2))
  54. # 显示请求地址IP
  55. get_request_ip()
  56. for group in point_base_dic:
  57. for item in point_base_dic[group]:
  58. try:
  59. key = item['key']
  60. point_val = data[key]
  61. item['val'] = point_val
  62. finally:
  63. continue
  64. return point_base_dic
  65. def get_conf(file_name="config.json", file_path="./config"):
  66. path = f"{file_path}/{file_name}"
  67. with open(path, "r", encoding="utf-8") as f:
  68. content = json.load(f)
  69. f.close()
  70. return content
  71. async def get_opc_data(sys_url, point_arr, ns=2):
  72. if sys_url is None or point_arr is None:
  73. return
  74. url = sys_url
  75. try:
  76. async with Client(url=url) as client:
  77. handler = SubHandler()
  78. r = handler.r
  79. i = 0
  80. result_arr = {}
  81. print(datetime.now().strftime('Start_time:%Y-%m-%d %H:%M:%S.%f'))
  82. while i < len(point_arr):
  83. node = f"ns={ns};s={point_arr[i]}"
  84. tag = client.get_node(node)
  85. sub = await client.create_subscription(500, handler)
  86. value = None
  87. try:
  88. value = await tag.read_value()
  89. # redis值
  90. r.set(str(node), str(value))
  91. # 获取点位key
  92. arr_key = node.split("=")
  93. result_arr[arr_key[2]] = value
  94. # print(f"tag1 is: {tag} with value {value} ")
  95. i += 1
  96. if i == len(point_arr):
  97. print("Get Data Success")
  98. finally:
  99. i += 1
  100. continue
  101. print(datetime.now().strftime('End_time:%Y-%m-%d %H:%M:%S.%f'))
  102. return result_arr
  103. finally:
  104. # 网络连通性测试
  105. sys_ip = sys_url[10: sys_url.index(":", 10)]
  106. os.system(f'ping {sys_ip} -n 1')
  107. def get_request_ip():
  108. ip = request.remote_addr
  109. logging.debug(ip)
  110. def api_get_error(msg):
  111. return '{"code":-1, "msg":"' + msg + '"}'
  112. if __name__ == '__main__':
  113. app.run()