api_server.py 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. import asyncio
  2. import get_config
  3. from get_opc_data import get_opc_data, point_data_to_arr
  4. from flask import Flask, request, render_template
  5. app = Flask(__name__)
  6. # route
  7. @app.route("/", methods=['GET', 'POST'])
  8. @app.route("/get/", methods=['GET', 'POST'])
  9. def get():
  10. try:
  11. # 1、获取请求参数
  12. # 矿名key
  13. sys_key = request.values.get('sys_key')
  14. # 自动化系统key
  15. sys_name = request.values.get('sys_name')
  16. # 2、参数判断
  17. if sys_key is None or len(sys_key) == 0 or sys_name is None or len(sys_name) == 0:
  18. return api_get_error("参数错误")
  19. except BaseException as e:
  20. print(f"异常,参数错误!")
  21. return api_get_error(e)
  22. # 3、开启服务
  23. # 3.1 获取opc点位arr
  24. opc_point_arr = get_config.get_opc_point_arr_conf(sys_key, sys_name)
  25. # 3.2 根据点位获取数据
  26. opc_sys_conf = get_config.get_opc_sys_conf(sys_key, sys_name)
  27. opc_data = asyncio.run(get_opc_data(opc_sys_conf.opc_server_url, opc_point_arr))
  28. # opc_data = get_opc_data(opc_sys_conf.opc_server_url, opc_point_arr)
  29. # 3.3 数据处理
  30. opc_json_data = get_config.get_opc_point_json_conf(sys_key, sys_name)
  31. if opc_data is None or isinstance(opc_data, str):
  32. return api_get_error("数据读取错误!请检查点位或服务配置!")
  33. else:
  34. json_data = point_data_to_arr(opc_json_data, opc_data)
  35. # 数据处理
  36. return json_data
  37. @app.route("/get_base_json/", methods=['GET', 'POST'])
  38. def get_base_json():
  39. try:
  40. # 1、获取请求参数
  41. # 矿名key
  42. sys_key = request.values.get('sys_key')
  43. # 自动化系统key
  44. sys_name = request.values.get('sys_name')
  45. # 2、参数判断
  46. if sys_key is None or len(sys_key) == 0 or sys_name is None or len(sys_name) == 0:
  47. return api_get_error("参数错误")
  48. except BaseException as e:
  49. print(f"异常,参数错误!")
  50. return api_get_error(e)
  51. opc_json_data = get_config.get_opc_point_json_conf(sys_key, sys_name)
  52. return opc_json_data
  53. @app.route("/get_point_arr/", methods=['GET', 'POST'])
  54. def get_point_arr():
  55. try:
  56. # 1、获取请求参数
  57. # 矿名key
  58. sys_key = request.values.get('sys_key')
  59. # 自动化系统key
  60. sys_name = request.values.get('sys_name')
  61. # 2、参数判断
  62. if sys_key is None or len(sys_key) == 0 or sys_name is None or len(sys_name) == 0:
  63. return api_get_error("参数错误")
  64. except BaseException as e:
  65. print(f"异常,参数错误!")
  66. return api_get_error(e)
  67. opc_point_arr = get_config.get_opc_point_arr_conf(sys_key, sys_name)
  68. return opc_point_arr
  69. def api_get_error(msg):
  70. return '{"code":-1, "msg":"' + msg + '"}'
  71. # 开启接口服务
  72. def run_api_server():
  73. config = get_config.get_base_conf()
  74. port = config.server_port
  75. app.run(
  76. host='0.0.0.0',
  77. port=port
  78. )
  79. if __name__ == "__main__":
  80. run_api_server()