"""HTTP bridge that reads OPC UA point values and returns them as JSON.

A Flask endpoint looks up an OPC UA server URL and a point-definition file in
the JSON configuration, reads every configured point from the server, mirrors
each value into Redis, and returns the point definition with its ``val``
fields filled in.
"""
import os
import re
import json
import redis
import logging
import asyncio
from flask import Flask, request, render_template
from asyncua import Client
from datetime import datetime

logger = logging.getLogger(__name__)


class SubHandler(object):
    """OPC UA subscription handler that also owns a Redis client.

    The Redis host is read from ``redis_conf.ip`` in the main configuration
    file (see ``get_conf``).
    """

    # Redis client, its connection pool, and the loaded configuration;
    # all three are populated by __init__.
    r = None
    pool = None
    config = None

    def __init__(self):
        """Load the configuration and build the Redis client.

        Raises:
            Exception: if the configuration cannot be read or the Redis
                client cannot be constructed; the original error is chained
                as ``__cause__``.
        """
        try:
            self.config = get_conf()
            redis_conf = self.config['redis_conf']
            ip = redis_conf['ip']
            self.pool = redis.ConnectionPool(host=ip, password='')
            self.r = redis.Redis(connection_pool=self.pool)
        except Exception as e:
            raise Exception("redis连接错误!") from e

    async def datachange_notification(self, node, val, data):
        """Print a data-change notification."""
        print("Python: New data change event", node, val)

    def event_notification(self, event):
        """Print an event notification."""
        print("Python: New event", event)


app = Flask(__name__)


# route
@app.route("/get/", methods=['GET', 'POST'])
def get():
    """Return the point definition of one system with current OPC values.

    Request parameters (query string or form):
        sys_key: first-level key under ``sys_conf`` in the configuration.
        sys_name: second-level key under ``sys_conf[sys_key]``.

    Returns:
        The point definition with every ``val`` filled in, or the string
        "参数错误" when a parameter is missing or does not match a
        configured system.

    Raises:
        Exception: if collecting or reading the points fails; the original
            error is chained as ``__cause__``.
    """
    sys_key = request.values.get('sys_key')
    sys_name = request.values.get('sys_name')
    if not sys_key or not sys_name:
        return "参数错误"

    base_conf = get_conf()
    systems = base_conf['sys_conf']
    try:
        system = systems[sys_key][sys_name]
    except (KeyError, TypeError):
        # Unknown system requested: a caller error, not a server failure.
        return "参数错误"

    sys_url = system['opc_server_url']
    sys_file_path = system['path']
    sys_file_name = system['file_name']
    point_conf = get_conf(sys_file_name, sys_file_path)

    try:
        # Keys of the points to read.
        point_arr = point_to_arr(point_conf)
        # Current values, keyed by point key.
        data = asyncio.run(get_opc_data(sys_url, point_arr, 2))
        # Point definition with the values merged in.
        return point_data_to_arr(point_conf, data)
    except Exception as e:
        logger.exception("failed to read points for %s/%s", sys_key, sys_name)
        raise Exception("点位获取错误!") from e


def point_to_arr(point_data):
    """Collect every point key found in a nested point definition.

    Walks dicts and lists recursively. Each dict whose ``'key'`` entry is a
    string contributes that string; all container values are searched too.

    Args:
        point_data: parsed JSON (dict, list, or scalar).

    Returns:
        A flat list of key strings in document order; empty when none exist.
    """
    keys = []
    if isinstance(point_data, dict):
        point_key = point_data.get('key')
        if isinstance(point_key, str):
            keys.append(point_key)
        # Strings and other scalars yield [], so only containers add keys.
        for child in point_data.values():
            keys.extend(point_to_arr(child))
    elif isinstance(point_data, list):
        for item in point_data:
            keys.extend(point_to_arr(item))
    return keys


def point_data_to_arr(point_data, data_arr=None):
    """Return a copy of a point definition with ``val`` entries filled in.

    Recurses through nested dicts and lists. In every dict that has a
    ``'val'`` entry, the value is replaced by ``data_arr[<that dict's 'key'>]``
    or by ``""`` when the key has no value. Everything else is copied as is.

    Args:
        point_data: parsed JSON point definition.
        data_arr: mapping of point key to value; ``None`` or empty means
            "nothing was read".

    Returns:
        ``point_data`` itself when ``data_arr`` is empty, otherwise a new
        structure of the same shape.
    """
    # Nothing was read: hand the definition back untouched.
    if not data_arr:
        return point_data

    if isinstance(point_data, list):
        return [point_data_to_arr(item, data_arr) for item in point_data]
    if not isinstance(point_data, dict):
        # Scalars (str, numbers, bool, None) carry no points.
        return point_data

    # Read the key from the source dict so the result does not depend on
    # whether 'key' or 'val' is listed first.
    point_key = point_data.get('key')
    filled = {}
    for name, child in point_data.items():
        if name == 'val':
            if isinstance(point_key, str) and point_key in data_arr:
                filled['val'] = data_arr[point_key]
            else:
                filled['val'] = ""
        else:
            filled[name] = point_data_to_arr(child, data_arr)
    return filled


async def get_opc_data(sys_url, point_arr, ns=2):
    """Read the current value of every point from an OPC UA server.

    Each value is also written to Redis under the node id string
    ``ns=<ns>;s=<point>``. A point whose read or Redis write fails is logged
    and left out of the result; the remaining points are still processed.

    Args:
        sys_url: OPC UA endpoint URL.
        point_arr: iterable of point keys (string node identifiers).
        ns: namespace index used to build the node ids.

    Returns:
        A dict mapping ``str(point)`` to the value read, or ``None`` when
        ``sys_url`` or ``point_arr`` is ``None``.

    Raises:
        Exception: "设置监听错误!" if the subscription cannot be created.
    """
    if sys_url is None or point_arr is None:
        return None

    async with Client(url=sys_url) as client:
        handler = SubHandler()
        cache = handler.r
        values = {}
        print(datetime.now().strftime('Start_time:%Y-%m-%d %H:%M:%S.%f'))

        if point_arr:
            # One subscription per call is enough; it does not depend on
            # the individual point.
            try:
                await client.create_subscription(500, handler)
            except Exception as e:
                raise Exception("设置监听错误!") from e

        last_index = len(point_arr) - 1
        for index, point in enumerate(point_arr):
            node = f"ns={ns};s={point}"
            tag = client.get_node(node)
            try:
                value = await tag.read_value()
                cache.set(node, str(value))
                values[str(point)] = value
            except Exception:
                # Best effort: skip this point, keep reading the others.
                logger.exception("failed to read point %s", node)
                continue
            if index == last_index:
                print("Get Data Success")

        print(datetime.now().strftime('End_time:%Y-%m-%d %H:%M:%S.%f'))
        return values


def get_conf(file_name="config.json", file_path="./config"):
    """Load and return a JSON configuration file.

    Args:
        file_name: name of the JSON file.
        file_path: directory that contains the file.

    Returns:
        The parsed JSON content.
    """
    path = f"{file_path}/{file_name}"
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def get_request_ip():
    """Return the remote address of the current Flask request."""
    return request.remote_addr


def api_get_error(msg):
    """Return a JSON error body with code -1 and the given message.

    The message is JSON-escaped so quotes or backslashes in it cannot
    produce an invalid document.
    """
    return '{"code":-1, "msg":' + json.dumps(msg, ensure_ascii=False) + '}'


if __name__ == '__main__':
    config = get_conf()
    port = config['server_port']
    app.run(
        host='0.0.0.0',
        port=port
    )