123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525 |
- import os
- import time
- from typing import Optional
- import cv2
- import numpy as np
- from flask import Flask, request, jsonify
- from flask_cors import CORS
- from program_core import ProgramCore
- from program_public_tools import ProgramPublicTools
- class Server:
- def __init__(self, server_ip: str = '0.0.0.0', server_port: Optional[int] = None):
- self.__pubtools: ProgramPublicTools = ProgramPublicTools()
- self.__server_ip: str = server_ip
- self.__server_port: Optional[int] = server_port
- self.__server_ip_init()
- self.__core: ProgramCore = ProgramCore(self.__pubtools)
- self.__core.start_core()
- self.__app = Flask(__name__)
- CORS(self.__app)
- self.__setup_routes()
- self.__send_server_completed_flag()
- @staticmethod
- def __send_server_completed_flag():
- print("[Server] Server program is ready......")
- def __server_ip_init(self):
- if self.__server_port is None:
- self.__server_port = self.__pubtools.find_available_server_port(number=1, address=self.__server_ip)[0]
- print(f"Server is running on port [{self.__server_port}].")
- print(f"Server is running on port [{self.__server_port}].")
- print(f"Server is running on port [{self.__server_port}].")
- def start(self):
- self.__app.run(host=self.__server_ip, port=self.__server_port, debug=False)
- self.__server_debug_output()
- def __server_debug_output(self):
- _, _, url = self.__core.instrument_controller.services.digital_oscilloscope.keep_listening()
- print(f"[Server Debug Output]: OSC Stream Server is running on <{url}>")
- print(f"[Server Debug Output]: OSC Stream Server is running on <{url}>")
- print(f"[Server Debug Output]: OSC Stream Server is running on <{url}>")
- print(f"[Server Debug Output]: OSC Stream Server is running on <{url}>")
- print(f"[Server Debug Output]: OSC Stream Server is running on <{url}>")
- @staticmethod
- def __instrument_get_not_connect_string(not_connect_list: list[str]) -> str:
- response_string: str = "not_connect_list:"
- for not_connect_item in not_connect_list:
- response_string = response_string + " " + not_connect_item
- return response_string
- def __setup_routes(self):
- @self.__app.route("/")
- def home():
- return "Server is running."
- @self.__app.route("/instrument/not_connect_list", methods=['GET'])
- def instrument_not_connect_list():
- not_connect_list: list[str] = self.__core.instrument_controller.services.check()
- return self.__instrument_get_not_connect_string(not_connect_list)
- @self.__app.route("/instrument/digital_multimeter/connection_status", methods=['GET'])
- def instrument_digital_multimeter_connection_status():
- connection_status: bool = (self.__core.instrument_controller.services.digital_multimeter is not None)
- return f"connection_status: {connection_status}"
- @self.__app.route("/instrument/digital_oscilloscope/connection_status", methods=['GET'])
- def instrument_digital_oscilloscope_connection_status():
- connection_status: bool = (self.__core.instrument_controller.services.digital_oscilloscope is not None)
- return f"connection_status: {connection_status}"
- @self.__app.route("/instrument/waveform_generator/connection_status", methods=['GET'])
- def instrument_waveform_generator_connection_status():
- connection_status: bool = (self.__core.instrument_controller.services.waveform_generator is not None)
- return f"connection_status: {connection_status}"
- @self.__app.route("/instrument/analog_electronic_load/connection_status", methods=['GET'])
- def instrument_analog_electronic_load_connection_status():
- connection_status: bool = (self.__core.instrument_controller.services.analog_electronic_load is not None)
- return f"connection_status: {connection_status}"
- @self.__app.route("/instrument/digital_multimeter/keep_listening", methods=['GET'])
- def instrument_digital_multimeter_keep_listening():
- if self.__core.instrument_controller.services.digital_multimeter is not None:
- self.__core.instrument_controller.services.digital_multimeter.keep_listening()
- return f"keep_listening"
- else:
- return "None"
- @self.__app.route("/instrument/digital_multimeter/get_range", methods=['GET'])
- def instrument_digital_multimeter_get_range():
- if self.__core.instrument_controller.services.digital_multimeter is not None:
- range_list: list[str] = self.__core.instrument_controller.services.digital_multimeter.get_range()
- response_string: str = "get_range:"
- for rang_type_item in range_list:
- response_string = response_string + " " + rang_type_item
- return response_string
- else:
- return "None"
- @self.__app.route("/instrument/digital_multimeter/set_range", methods=['GET'])
- def instrument_digital_multimeter_set_range():
- if self.__core.instrument_controller.services.digital_multimeter is not None:
- range_string = request.args.get('range')
- range_type = self.__core.instrument_controller.services.digital_multimeter.solve_range_string(
- range_string)
- self.__core.instrument_controller.services.digital_multimeter.set_range(range_type)
- return f"set_range: {range_string}"
- else:
- return "None"
- @self.__app.route("/instrument/digital_multimeter/get_value", methods=['GET'])
- def instrument_digital_multimeter_get_value():
- if self.__core.instrument_controller.services.digital_multimeter is not None:
- get_value = self.__core.instrument_controller.services.digital_multimeter.get_value_latest()
- return f"get_value: {get_value}"
- else:
- return "None"
- @self.__app.route("/instrument/digital_oscilloscope/keep_listening", methods=['GET'])
- def instrument_digital_oscilloscope_keep_listening():
- if self.__core.instrument_controller.services.digital_oscilloscope is not None:
- _, v_port, v_url = self.__core.instrument_controller.services.digital_oscilloscope.keep_listening()
- return f"keep_listening: {v_port} {v_url}"
- else:
- return "None"
- @self.__app.route("/instrument/digital_oscilloscope/export_image", methods=['GET'])
- def instrument_digital_oscilloscope_export_image():
- if self.__core.instrument_controller.services.digital_oscilloscope is not None:
- get_time: str = self.__core.instrument_controller.services.digital_oscilloscope.save_img()
- return get_time
- else:
- return "None"
- @self.__app.route("/instrument/waveform_generator/output_start", methods=['GET'])
- def instrument_waveform_generator_output_start():
- if self.__core.instrument_controller.services.waveform_generator is not None:
- self.__core.instrument_controller.services.waveform_generator.output_start()
- return f"output_start"
- else:
- return "None"
- @self.__app.route("/instrument/waveform_generator/output_restart", methods=['GET'])
- def instrument_waveform_generator_output_restart():
- if self.__core.instrument_controller.services.waveform_generator is not None:
- self.__core.instrument_controller.services.waveform_generator.output_restart()
- return f"output_restart"
- else:
- return "None"
- @self.__app.route("/instrument/waveform_generator/output_pause", methods=['GET'])
- def instrument_waveform_generator_output_pause():
- if self.__core.instrument_controller.services.waveform_generator is not None:
- self.__core.instrument_controller.services.waveform_generator.output_pause()
- return f"output_pause"
- else:
- return "None"
- @self.__app.route("/instrument/waveform_generator/config/apply_config", methods=['GET'])
- def instrument_waveform_generator_config_apply_config():
- if self.__core.instrument_controller.services.waveform_generator is not None:
- self.__core.instrument_controller.services.waveform_generator.apply_config(True)
- return "apply_config"
- else:
- return "None"
- @self.__app.route("/instrument/waveform_generator/config/set_enable", methods=['GET'])
- def instrument_waveform_generator_config_set_enable():
- channel_string: str = request.args.get('channel')
- enable_string: str = request.args.get('enable').lower()
- if enable_string == "true":
- enable: bool = True
- else:
- enable: bool = False
- if self.__core.instrument_controller.services.waveform_generator is not None:
- channel = self.__core.instrument_controller.services.waveform_generator.set_enable(channel_string,
- enable)
- return f"set_enable: {channel} {enable}"
- else:
- return f"set_enable: None None"
- @self.__app.route("/instrument/waveform_generator/config/get_type", methods=['GET'])
- def instrument_waveform_generator_config_get_type():
- if self.__core.instrument_controller.services.waveform_generator is not None:
- res: str = self.__core.instrument_controller.services.waveform_generator.config_channel_1.get_type_string()
- return f"get_type: " + res
- else:
- return f"get_type: " + "None"
- @self.__app.route("/instrument/waveform_generator/config/set_type", methods=['GET'])
- def instrument_waveform_generator_config_set_type():
- channel_string: str = request.args.get('channel')
- type_string: str = request.args.get('type')
- if self.__core.instrument_controller.services.waveform_generator is not None:
- channel, config = self.__core.instrument_controller.services.waveform_generator.get_config(
- channel_string)
- config.type = config.match_type_string(type_string)
- return f"set_type: {channel} {type_string}"
- else:
- return f"set_type: None None"
- @self.__app.route("/instrument/waveform_generator/config/set_freq", methods=['GET'])
- def instrument_waveform_generator_config_set_freq():
- channel_string: str = request.args.get('channel')
- freq: float = float(request.args.get('freq'))
- if self.__core.instrument_controller.services.waveform_generator is not None:
- channel, config = self.__core.instrument_controller.services.waveform_generator.get_config(
- channel_string)
- config.freq = freq
- return f"set_freq: {channel} {freq}"
- else:
- return f"set_freq: None None"
- @self.__app.route("/instrument/waveform_generator/config/get_freq_unit", methods=['GET'])
- def instrument_waveform_generator_config_get_freq_unit():
- if self.__core.instrument_controller.services.waveform_generator is not None:
- _, config = self.__core.instrument_controller.services.waveform_generator.get_config("1")
- unit_type_string: str = config.get_freq_unit_type_string()
- return f"get_freq_unit: {unit_type_string}"
- else:
- return f"get_freq_unit: None"
- @self.__app.route("/instrument/waveform_generator/config/set_freq_unit", methods=['GET'])
- def instrument_waveform_generator_config_set_freq_unit():
- channel_string: str = request.args.get('channel')
- freq_unit_string: str = request.args.get('freq_unit')
- if self.__core.instrument_controller.services.waveform_generator is not None:
- channel, config = self.__core.instrument_controller.services.waveform_generator.get_config(
- channel_string)
- config.freq_unit = config.match_freq_unit_type_string(freq_unit_string)
- return f"set_freq_unit: {channel} {freq_unit_string}"
- else:
- return f"set_freq_unit: None None"
- @self.__app.route("/instrument/waveform_generator/config/set_high_level", methods=['GET'])
- def instrument_waveform_generator_config_set_high_level():
- channel_string: str = request.args.get('channel')
- high_level: float = float(request.args.get('high_level'))
- if self.__core.instrument_controller.services.waveform_generator is not None:
- channel, config = self.__core.instrument_controller.services.waveform_generator.get_config(
- channel_string)
- config.high_level = high_level
- return f"set_high_level: {channel} {high_level}"
- else:
- return f"set_high_level: None None"
- @self.__app.route("/instrument/waveform_generator/config/set_low_level", methods=['GET'])
- def instrument_waveform_generator_config_set_low_level():
- channel_string: str = request.args.get('channel')
- low_level: float = float(request.args.get('low_level'))
- if self.__core.instrument_controller.services.waveform_generator is not None:
- channel, config = self.__core.instrument_controller.services.waveform_generator.get_config(
- channel_string)
- config.low_level = low_level
- return f"set_low_level: {channel} {low_level}"
- else:
- return f"set_low_level: None None"
- @self.__app.route("/instrument/waveform_generator/config/get_level_unit", methods=['GET'])
- def instrument_waveform_generator_config_get_level_unit():
- if self.__core.instrument_controller.services.waveform_generator is not None:
- _, config = self.__core.instrument_controller.services.waveform_generator.get_config("1")
- unit_type_string: str = config.get_freq_unit_type_string()
- return f"get_freq_unit: {unit_type_string}"
- else:
- return f"get_freq_unit: None"
- @self.__app.route("/instrument/waveform_generator/config/set_high_level_unit", methods=['GET'])
- def instrument_waveform_generator_config_set_high_level_unit():
- channel_string: str = request.args.get('channel')
- high_level_unit_string: str = request.args.get('high_level_unit')
- if self.__core.instrument_controller.services.waveform_generator is not None:
- channel, config = self.__core.instrument_controller.services.waveform_generator.get_config(
- channel_string)
- config.high_level_unit = config.match_level_unit_type_string(high_level_unit_string)
- return f"set_high_level_unit: {channel} {high_level_unit_string}"
- else:
- return f"set_high_level_unit: None {high_level_unit_string}"
- @self.__app.route("/instrument/waveform_generator/config/set_low_level_unit", methods=['GET'])
- def instrument_waveform_generator_config_set_low_level_unit():
- channel_string: str = request.args.get('channel')
- low_level_unit_string: str = request.args.get('low_level_unit')
- if self.__core.instrument_controller.services.waveform_generator is not None:
- channel, config = self.__core.instrument_controller.services.waveform_generator.get_config(
- channel_string)
- config.low_level_unit = config.match_level_unit_type_string(low_level_unit_string)
- return f"set_low_level_unit: {channel} {low_level_unit_string}"
- else:
- return f"set_low_level_unit: None {low_level_unit_string}"
- @self.__app.route("/instrument/waveform_generator/config/apply_all_setting", methods=['GET'])
- def instrument_waveform_generator_config_apply_all_setting():
- if self.__core.instrument_controller.services.waveform_generator is not None:
- waveform_type: str = request.args.get('type')
- waveform_freq: float = float(request.args.get('freq'))
- waveform_freq_unit: str = request.args.get('freq_unit')
- waveform_voltage: float = float(request.args.get('voltage'))
- waveform_voltage_unit: str = request.args.get('voltage_unit')
- enable_ch1: bool = bool(request.args.get('ch1'))
- enable_ch2: bool = bool(request.args.get('ch2'))
- generator = self.__core.instrument_controller.services.waveform_generator
- _, config_ch1 = generator.get_config("1")
- _, config_ch2 = generator.get_config("2")
- for config_chx in [config_ch1, config_ch2]:
- config_chx.type = config_chx.match_type_string(waveform_type)
- config_chx.type_string = waveform_type
- config_chx.freq = waveform_freq
- config_chx.freq_unit = config_chx.match_freq_unit_type_string(waveform_freq_unit)
- config_chx.high_level = waveform_voltage
- config_chx.low_level = - waveform_voltage
- config_chx.high_level_unit = config_chx.match_level_unit_type_string(waveform_voltage_unit)
- config_chx.low_level_unit = config_chx.match_level_unit_type_string(waveform_voltage_unit)
- generator.set_enable("1", enable_ch1)
- generator.set_enable("2", enable_ch2)
- generator.apply_config(immediate_start=False)
- generator.output_start()
- return "True"
- else:
- return "None"
- @self.__app.route("/instrument/analog_electronic_load/set_resistance", methods=['GET'])
- def instrument_analog_electronic_load_set_resistance():
- if self.__core.instrument_controller.services.analog_electronic_load is not None:
- resistance: str = request.args.get('set_resistance')
- self.__core.instrument_controller.services.analog_electronic_load.set_resistance(resistance)
- return "True"
- else:
- return "False"
- @self.__app.route("/serial_device/port_list", methods=['GET'])
- def serial_device_port_list():
- port_list: list[str] = self.__core.serial_controller.get_port_list()
- response_string: str = "port_list:"
- if len(port_list) == 0:
- response_string = response_string + " " + "None"
- else:
- for port_name in port_list:
- response_string = response_string + " " + port_name
- return response_string
- @self.__app.route("/serial_device/auto_connect", methods=['GET'])
- def serial_device_auto_connect():
- matched_port_list, not_matched_port_list, cnc_matched = self.__core.serial_controller.auto_connect()
- response_data = {
- 'method': 'auto_connect',
- 'cnc_matched': cnc_matched,
- 'port_list': {
- 'matched': matched_port_list,
- 'not_matched': not_matched_port_list
- }
- }
- return jsonify(response_data), 200
- @self.__app.route("/serial_device/cnc/config_connection", methods=['GET'])
- def serial_device_cnc_config_connection():
- port_name: str = request.args.get('port_name')
- com_connection = self.__core.serial_controller.try_build_connection(port_name, if_add_source_to_lib=True)
- if_success = self.__core.serial_controller.services.cnc_attenuator.match_connection(com_connection)
- return f"{if_success}"
- @self.__app.route("/serial_device/cnc/set", methods=['GET'])
- def serial_device_cnc_set():
- set_value: str = request.args.get('value')
- if_success = self.__core.serial_controller.services.cnc_attenuator.set(float(set_value))
- return f"{if_success}"
- @self.__app.route("/serial_device/plc/config_connection", methods=['GET'])
- def serial_device_plc_connect_sender():
- port_name: str = request.args.get('port_name')
- plc_com_role: str = request.args.get('plc_com_role').lower()
- if_success: bool = False
- if (port_name is not None) and (port_name not in ["None", "", " "]):
- com_connection = self.__core.serial_controller.try_build_connection(port_name,
- if_add_source_to_lib=True)
- if com_connection is not None:
- if plc_com_role == "sender":
- if_success = self.__core.serial_controller.services.plc_sender.match_connection(com_connection)
- elif plc_com_role == "receiver":
- if_success = self.__core.serial_controller.services.plc_receiver.match_connection(
- com_connection)
- elif plc_com_role == "receiver2":
- if_success = self.__core.serial_controller.services.plc_receiver2.match_connection(
- com_connection)
- if if_success is True:
- self.__core.serial_controller.services.enable_multi_receiver = True
- else:
- if_success = False
- else:
- if_success: bool = True
- # if_success = True
- # ports = self.__core.serial_controller.get_port_list()
- # self.__core.serial_controller.services.plc_sender.match_connection()
- get_sender_port_name: str = "None"
- if self.__core.serial_controller.services.plc_sender.com is not None:
- get_sender_port_name = self.__core.serial_controller.services.plc_sender.com.connection_port_name
- get_receiver_port_name: str = "None"
- if self.__core.serial_controller.services.plc_receiver.com is not None:
- get_receiver_port_name = self.__core.serial_controller.services.plc_receiver.com.connection_port_name
- self.__core.serial_controller.services.com_usr_config_success = if_success
- response_data = {
- 'method': 'config_connection',
- 'plc_com_role': plc_com_role,
- 'port_name': port_name,
- 'if_success': if_success,
- 'new_config': {
- 'sender_port': f"{get_sender_port_name}",
- 'receiver_port': f"{get_receiver_port_name}",
- },
- 'received_config': {
- 'role': f"{plc_com_role}",
- 'port': f"{port_name}",
- }
- }
- return jsonify(response_data), 200
- @self.__app.route("/serial_device/plc/check_connection", methods=['GET'])
- def serial_device_plc_check_connection():
- sender_port: Optional[str] = self.__core.serial_controller.services.plc_sender.com.connection_port_name
- receiver_port: Optional[str] = self.__core.serial_controller.services.plc_receiver.com.connection_port_name
- sender_connection_flag: bool = (sender_port is not None)
- receiver_connection_flag: bool = (receiver_port is not None)
- connection_flag: bool = (sender_connection_flag is True) and (receiver_connection_flag is True)
- response_data = {
- 'method': 'check_connection',
- 'connection_status': connection_flag,
- 'config': {
- 'sender_port': f"{self.__core.serial_controller.services.plc_sender.com.connection_port_name}",
- 'receiver_port': f"{self.__core.serial_controller.services.plc_receiver.com.connection_port_name}",
- }
- }
- return jsonify(response_data), 200
- @self.__app.route("/serial_device/plc/start", methods=['GET'])
- def serial_device_plc_start():
- if self.__core.serial_controller.services.com_usr_config_success is True:
- package_config: Optional[str] = request.args.get('package_config')
- sender = self.__core.serial_controller.services.plc_sender
- receiver = self.__core.serial_controller.services.plc_receiver
- sender_port, sender_url = sender.keep_send()
- receiver_port, receiver_url = receiver.keep_receive()
- receiver2 = None
- if self.__core.serial_controller.services.enable_multi_receiver is True:
- receiver2 = self.__core.serial_controller.services.plc_receiver2
- receiver2.keep_receive()
- tester = self.__core.serial_controller.get_tester()
- tester.start_test_service(sender, receiver, receiver2)
- if package_config == "单包1kbit":
- tester_frame_new: bytes = tester.send_package_1kbit
- elif package_config == "单包16kbit":
- tester_frame_new: bytes = tester.send_package_16kbit
- else:
- tester_frame_new: bytes = package_config.encode()
- if len(tester_frame_new) % 3 == 0:
- tester_frame_new += b'12\n'
- elif len(tester_frame_new) % 3 == 1: # b'1'
- tester_frame_new += b'1\n'
- elif len(tester_frame_new) % 3 == 2: # b'12'
- tester_frame_new += b'\n'
- else:
- tester_frame_new = b'12\n'
- tester.reset_frame_std_and_restart(tester_frame_new)
- else:
- sender_port, sender_url, receiver_port, receiver_url = 0, 0, 0, 0
- response_data = {
- 'method': 'start',
- 'sender': {
- 'port': f"{sender_port}",
- 'url': sender_url,
- },
- 'receiver': {
- 'port': f"{receiver_port}",
- 'url': receiver_url,
- }
- }
- print(f"=== @self.__app.route('/serial_device/plc/start', methods=['GET']) ===")
- return jsonify(response_data), 200
- @self.__app.route("/serial_device/plc/pause", methods=['GET'])
- def serial_device_plc_pause():
- # self.__core.serial_controller.services.plc_sender.pause()
- # self.__core.serial_controller.services.plc_receiver.pause()
- tester = self.__core.serial_controller.get_tester()
- tester.pause_test_service()
- print(f"=== @self.__app.route('/serial_device/plc/pause', methods=['GET']) ===")
- return "pause"
- @self.__app.route("/serial_device/plc/clear", methods=['GET'])
- def serial_device_plc_clear():
- self.__core.serial_controller.get_tester().task_add_reset_error_rate()
- print(f"=== @self.__app.route('/serial_device/plc/clear', methods=['GET']) ===")
- return "clear"
- @self.__app.route("/serial_device/plc/realtime_performance", methods=['GET'])
- def serial_device_plc_realtime_performance():
- tester = self.__core.serial_controller.get_tester()
- error_rate_percent: float = tester.error_rate_percent
- error_rate_percent2: float = tester.error_rate_percent2
- print(f"=== @self.__app.route('/serial_device/plc/realtime_performance', methods=['GET']) ===")
- return f"realtime_performance: error_rate_percent={error_rate_percent} error_rate_percent2={error_rate_percent2}"
- @self.__app.route("/serial_device/plc/realtime_performance_speed", methods=['GET'])
- def serial_device_plc_realtime_performance_speed():
- tester = self.__core.serial_controller.get_tester()
- speed_kbps: float = tester.speed_kbps
- speed_kbps2: float = tester.speed_kbps2
- print(f"=== @self.__app.route('/serial_device/plc/realtime_performance_speed', methods=['GET']) ===")
- return f"realtime_performance_speed: speed_kbps={speed_kbps} speed_kbps2={speed_kbps2}"
- @self.__app.route("/files/documents/start", methods=['GET'])
- def files_documents_start():
- host, port = self.__core.local_file_service.start()
- return f"documents: host={host} port={port}"
- if __name__ == "__main__":
- server: Server = Server()
- server.start()
|