import asyncio import json import redis import logging from flask import Flask, request, render_template from asyncua import Client from datetime import datetime class SubHandler(object): r = None pool = None config = None def __init__(self): self.config = get_conf() redis_conf = self.config['redis_conf'] ip = redis_conf['ip'] self.pool = redis.ConnectionPool(host=ip, password='') self.r = redis.Redis(connection_pool=self.pool) async def datachange_notification(self, node, val, data): print("Python: New data change event", node, val) def event_notification(self, event): print("Python: New event", event) app = Flask(__name__) # route @app.route("/get/", methods=['GET', 'POST']) def get(): # 矿名key sys_key = request.values.get('sys_key') # 自动化系统key sys_name = request.values.get('sys_name') # 参数判断 if sys_key is None or len(sys_key) == 0 or sys_name is None or len(sys_name) == 0: return "参数错误" # 获取对应配置 base_conf = get_conf() # 获取opc服务地址 sys_url = base_conf["server_url"] # 获取配置文件名 sys_file_path = base_conf['sys_conf'][sys_key]['path'] sys_file_name = base_conf['sys_conf'][sys_key]['file_name'] # 获取点位数组 point_conf = get_conf(sys_file_name, sys_file_path) # 取到配置 point_base_dic = point_conf['sys_point'] point_arr = [] # 取点位数据 # print(point_base_dic) for group in point_base_dic: for item in point_base_dic[group]: point_arr.append(item['key']) # 根据服务地址获取Opc数据 data = asyncio.run(get_opc_data(sys_url, point_arr, 2)) get_request_ip() for group in point_base_dic: for item in point_base_dic[group]: try: key = item['key'] point_val = data[key] item['val'] = point_val finally: continue return point_base_dic def get_conf(file_name="config.json", file_path="./config"): path = f"{file_path}/{file_name}" with open(path, "r", encoding="utf-8") as f: content = json.load(f) f.close() return content async def get_opc_data(sys_url, point_arr, ns=2): if sys_url is None or point_arr is None: return url = sys_url async with Client(url=url) as client: handler = SubHandler() r = handler.r i = 0 result_arr = {} print(datetime.now().strftime('Start_time:%Y-%m-%d %H:%M:%S.%f')) while i < len(point_arr): node = f"ns={ns};s={point_arr[i]}" tag = client.get_node(node) sub = await client.create_subscription(500, handler) value = None try: value = await tag.read_value() #redis值 r.set(str(node), str(value)) #获取点位key arr_key = node.split("=") result_arr[arr_key[2]] = value # print(f"tag1 is: {tag} with value {value} ") i += 1 if i == len(point_arr): print("Get Data Success") finally: i += 1 continue print(datetime.now().strftime('End_time:%Y-%m-%d %H:%M:%S.%f')) return result_arr def get_request_ip(): ip = request.remote_addr logging.debug(ip) if __name__ == '__main__': app.run()