123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- import os
- import re
- import json
- import redis
- import logging
- import asyncio
- from flask import Flask, request, render_template
- from asyncua import Client
- from datetime import datetime
- class SubHandler(object):
- r = None
- pool = None
- config = None
- def __init__(self):
- self.config = get_conf()
- redis_conf = self.config['redis_conf']
- ip = redis_conf['ip']
- self.pool = redis.ConnectionPool(host=ip, password='')
- self.r = redis.Redis(connection_pool=self.pool)
- async def datachange_notification(self, node, val, data):
- print("Python: New data change event", node, val)
- def event_notification(self, event):
- print("Python: New event", event)
- app = Flask(__name__)
- # route
- @app.route("/get/", methods=['GET', 'POST'])
- def get():
- # 矿名key
- sys_key = request.values.get('sys_key')
- # 自动化系统key
- sys_name = request.values.get('sys_name')
- # 参数判断
- if sys_key is None or len(sys_key) == 0 or sys_name is None or len(sys_name) == 0:
- return "参数错误"
- # 获取对应配置
- base_conf = get_conf()
- # 获取opc服务地址
- sys_url = base_conf["opc_server_url"]
- # 获取配置文件名
- sys_file_path = base_conf['sys_conf'][sys_key][sys_name]['path']
- sys_file_name = base_conf['sys_conf'][sys_key][sys_name]['file_name']
- # 获取点位数组
- point_conf = get_conf(sys_file_name, sys_file_path)
- # 取到配置
- point_base_dic = point_conf['sys_point']
- point_arr = []
- # 取点位数据
- # print(point_base_dic)
- for group in point_base_dic:
- for item in point_base_dic[group]:
- point_arr.append(item['key'])
- # 根据服务地址获取Opc数据
- data = asyncio.run(get_opc_data(sys_url, point_arr, 2))
- # 显示请求地址IP
- get_request_ip()
- for group in point_base_dic:
- for item in point_base_dic[group]:
- try:
- key = item['key']
- point_val = data[key]
- item['val'] = point_val
- finally:
- continue
- return point_base_dic
- def get_conf(file_name="config.json", file_path="./config"):
- path = f"{file_path}/{file_name}"
- with open(path, "r", encoding="utf-8") as f:
- content = json.load(f)
- f.close()
- return content
- async def get_opc_data(sys_url, point_arr, ns=2):
- if sys_url is None or point_arr is None:
- return
- url = sys_url
- try:
- async with Client(url=url) as client:
- handler = SubHandler()
- r = handler.r
- i = 0
- result_arr = {}
- print(datetime.now().strftime('Start_time:%Y-%m-%d %H:%M:%S.%f'))
- while i < len(point_arr):
- node = f"ns={ns};s={point_arr[i]}"
- tag = client.get_node(node)
- sub = await client.create_subscription(500, handler)
- value = None
- try:
- value = await tag.read_value()
- # redis值
- r.set(str(node), str(value))
- # 获取点位key
- arr_key = node.split("=")
- result_arr[arr_key[2]] = value
- # print(f"tag1 is: {tag} with value {value} ")
- i += 1
- if i == len(point_arr):
- print("Get Data Success")
- finally:
- i += 1
- continue
- print(datetime.now().strftime('End_time:%Y-%m-%d %H:%M:%S.%f'))
- return result_arr
- finally:
- # 网络连通性测试
- sys_ip = sys_url[10: sys_url.index(":", 10)]
- os.system(f'ping {sys_ip} -n 1')
- def get_request_ip():
- ip = request.remote_addr
- logging.debug(ip)
- def api_get_error(msg):
- return '{"code":-1, "msg":"' + msg + '"}'
- if __name__ == '__main__':
- config = get_conf()
- port = config['server_port']
- app.run(
- host='0.0.0.0',
- port=config['server_port']
- )
|