"""Flask service that reads OPC UA point values for a configured system.

A request to ``/get/`` supplies ``sys_key`` and ``sys_name``. The matching
point list is loaded from JSON configuration files, each point is read from
the OPC UA server named in the base configuration, every value is mirrored
into redis, and the point configuration is returned with the values filled in.
"""

import asyncio
import json
import logging
import os
import re
import subprocess
from datetime import datetime
from urllib.parse import urlsplit

import redis
from asyncua import Client
from flask import Flask, request, render_template


class SubHandler(object):
    """Handler object passed to ``client.create_subscription``.

    It also owns the redis connection that ``get_opc_data`` uses to mirror
    point values. The redis host is read from the ``redis_conf.ip`` entry of
    the base configuration file.
    """

    # Class-level defaults; the real values are assigned per instance.
    r = None
    pool = None
    config = None

    def __init__(self):
        self.config = get_conf()
        redis_conf = self.config['redis_conf']
        self.pool = redis.ConnectionPool(host=redis_conf['ip'], password='')
        self.r = redis.Redis(connection_pool=self.pool)

    async def datachange_notification(self, node, val, data):
        """Print a data-change notification for ``node``."""
        print("Python: New data change event", node, val)

    def event_notification(self, event):
        """Print an event notification."""
        print("Python: New event", event)


app = Flask(__name__)


# route
@app.route("/get/", methods=['GET', 'POST'])
def get():
    """Return the configured points of one system with live values attached.

    Request parameters (query string or form):
        sys_key: mine name key.
        sys_name: automation system key.

    Returns:
        The ``sys_point`` mapping from the system's point configuration, with
        a ``val`` entry added to every point that was read successfully, or
        the string "参数错误" when a parameter is missing or does not match
        any configured system.
    """
    sys_key = request.values.get('sys_key')
    sys_name = request.values.get('sys_name')
    # Reject missing or empty parameters.
    if not sys_key or not sys_name:
        return "参数错误"

    base_conf = get_conf()
    # Address of the OPC server.
    sys_url = base_conf["server_url"]

    # Locate the point configuration file for the requested system. An
    # unknown key is a caller error, so answer the same way as for a missing
    # parameter instead of failing with an unhandled KeyError.
    try:
        sys_entry = base_conf['sys_conf'][sys_key][sys_name]
    except KeyError:
        return "参数错误"
    sys_file_path = sys_entry['path']
    sys_file_name = sys_entry['file_name']

    point_conf = get_conf(sys_file_name, sys_file_path)
    point_base_dic = point_conf['sys_point']

    # Flatten every group's points into one list of keys to read.
    point_arr = [
        item['key']
        for group_items in point_base_dic.values()
        for item in group_items
    ]

    # get_opc_data returns None when it has nothing to query; treat that as
    # "no values" rather than relying on a swallowed TypeError.
    data = asyncio.run(get_opc_data(sys_url, point_arr, 2)) or {}

    # Log the IP address of the requester.
    get_request_ip()

    # Attach values to the points that were read; points whose read failed
    # are returned without a 'val' entry.
    for group_items in point_base_dic.values():
        for item in group_items:
            key = item['key']
            if key in data:
                item['val'] = data[key]
    return point_base_dic


def get_conf(file_name="config.json", file_path="./config"):
    """Load and return a JSON configuration file.

    Args:
        file_name: name of the JSON file.
        file_path: directory that contains the file.

    Returns:
        The parsed JSON content.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    path = f"{file_path}/{file_name}"
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


async def get_opc_data(sys_url, point_arr, ns=2):
    """Read the current value of every point from the OPC UA server.

    Each value is also written to redis under its node id string
    (``ns=<ns>;s=<point>``). Reading is best-effort per point: a point whose
    read or redis write fails is logged and left out of the result, and the
    remaining points are still processed.

    Args:
        sys_url: OPC UA endpoint URL.
        point_arr: point keys to read.
        ns: namespace index used to build the node ids.

    Returns:
        A dict mapping each successfully read point key to its value, or
        ``None`` when ``sys_url`` or ``point_arr`` is ``None``.
    """
    if sys_url is None or point_arr is None:
        return None
    try:
        async with Client(url=sys_url) as client:
            handler = SubHandler()
            r = handler.r
            result_arr = {}
            print(datetime.now().strftime('Start_time:%Y-%m-%d %H:%M:%S.%f'))

            # One subscription per call is enough; it used to be re-created
            # for every point.
            # NOTE(review): nothing is ever subscribed on it, so it may be
            # removable altogether - confirm before deleting.
            if point_arr:
                await client.create_subscription(500, handler)

            last_index = len(point_arr) - 1
            for index, point_key in enumerate(point_arr):
                node = f"ns={ns};s={point_key}"
                tag = client.get_node(node)
                try:
                    value = await tag.read_value()
                    # Mirror the value into redis.
                    r.set(str(node), str(value))
                    # Key the result by the point key itself so that it
                    # always matches the lookup done by the caller, even for
                    # keys that contain "=".
                    result_arr[point_key] = value
                except Exception:
                    # Best-effort: record the failure and move on to the
                    # next point.
                    logging.exception("Failed to read OPC point %s", node)
                    continue
                if index == last_index:
                    print("Get Data Success")

            print(datetime.now().strftime('End_time:%Y-%m-%d %H:%M:%S.%f'))
            return result_arr
    finally:
        # Network connectivity test; runs after every attempt.
        _ping_server(sys_url)


def _ping_server(sys_url):
    """Ping the host of ``sys_url`` once; never raises for a bad URL or a missing ping."""
    try:
        host = urlsplit(sys_url).hostname
    except ValueError:
        host = None
    if not host:
        logging.warning("Cannot extract a host to ping from %r", sys_url)
        return
    try:
        # Argument list instead of a shell string, so the host is never
        # interpreted by a shell.
        # NOTE(review): "-n 1" is the Windows form of "send one echo
        # request"; other platforms use "-c 1".
        subprocess.run(["ping", host, "-n", "1"], check=False)
    except OSError:
        logging.exception("Unable to run ping for %s", host)


def get_request_ip():
    """Log the remote address of the current request at debug level."""
    ip = request.remote_addr
    logging.debug(ip)


def api_get_error(msg):
    """Return a JSON error payload ``{"code":-1, "msg":...}`` for ``msg``.

    The message is JSON-encoded, so quotes and backslashes in it no longer
    produce invalid JSON.
    """
    return '{"code":-1, "msg":' + json.dumps(msg, ensure_ascii=False) + '}'


if __name__ == '__main__':
    app.run()