import os import time from typing import Optional import cv2 import numpy as np from flask import Flask, request, jsonify from flask_cors import CORS from program_core import ProgramCore from program_public_tools import ProgramPublicTools class Server: def __init__(self, server_ip: str = '0.0.0.0', server_port: Optional[int] = None): self.__pubtools: ProgramPublicTools = ProgramPublicTools() self.__server_ip: str = server_ip self.__server_port: Optional[int] = server_port self.__server_ip_init() self.__core: ProgramCore = ProgramCore(self.__pubtools) self.__core.start_core() self.__app = Flask(__name__) CORS(self.__app) self.__setup_routes() self.__send_server_completed_flag() @staticmethod def __send_server_completed_flag(): print("[Server] Server program is ready......") def __server_ip_init(self): if self.__server_port is None: self.__server_port = self.__pubtools.find_available_server_port(number=1, address=self.__server_ip)[0] print(f"Server is running on port [{self.__server_port}].") print(f"Server is running on port [{self.__server_port}].") print(f"Server is running on port [{self.__server_port}].") def start(self): self.__app.run(host=self.__server_ip, port=self.__server_port, debug=False) self.__server_debug_output() def __server_debug_output(self): _, _, url = self.__core.instrument_controller.services.digital_oscilloscope.keep_listening() print(f"[Server Debug Output]: OSC Stream Server is running on <{url}>") print(f"[Server Debug Output]: OSC Stream Server is running on <{url}>") print(f"[Server Debug Output]: OSC Stream Server is running on <{url}>") print(f"[Server Debug Output]: OSC Stream Server is running on <{url}>") print(f"[Server Debug Output]: OSC Stream Server is running on <{url}>") @staticmethod def __instrument_get_not_connect_string(not_connect_list: list[str]) -> str: response_string: str = "not_connect_list:" for not_connect_item in not_connect_list: response_string = response_string + " " + not_connect_item return response_string def __setup_routes(self): @self.__app.route("/") def home(): return "Server is running." @self.__app.route("/instrument/not_connect_list", methods=['GET']) def instrument_not_connect_list(): not_connect_list: list[str] = self.__core.instrument_controller.services.check() return self.__instrument_get_not_connect_string(not_connect_list) @self.__app.route("/instrument/digital_multimeter/connection_status", methods=['GET']) def instrument_digital_multimeter_connection_status(): connection_status: bool = (self.__core.instrument_controller.services.digital_multimeter is not None) return f"connection_status: {connection_status}" @self.__app.route("/instrument/digital_oscilloscope/connection_status", methods=['GET']) def instrument_digital_oscilloscope_connection_status(): connection_status: bool = (self.__core.instrument_controller.services.digital_oscilloscope is not None) return f"connection_status: {connection_status}" @self.__app.route("/instrument/waveform_generator/connection_status", methods=['GET']) def instrument_waveform_generator_connection_status(): connection_status: bool = (self.__core.instrument_controller.services.waveform_generator is not None) return f"connection_status: {connection_status}" @self.__app.route("/instrument/analog_electronic_load/connection_status", methods=['GET']) def instrument_analog_electronic_load_connection_status(): connection_status: bool = (self.__core.instrument_controller.services.analog_electronic_load is not None) return f"connection_status: {connection_status}" @self.__app.route("/instrument/digital_multimeter/keep_listening", methods=['GET']) def instrument_digital_multimeter_keep_listening(): if self.__core.instrument_controller.services.digital_multimeter is not None: self.__core.instrument_controller.services.digital_multimeter.keep_listening() return f"keep_listening" else: return "None" @self.__app.route("/instrument/digital_multimeter/get_range", methods=['GET']) def instrument_digital_multimeter_get_range(): if self.__core.instrument_controller.services.digital_multimeter is not None: range_list: list[str] = self.__core.instrument_controller.services.digital_multimeter.get_range() response_string: str = "get_range:" for rang_type_item in range_list: response_string = response_string + " " + rang_type_item return response_string else: return "None" @self.__app.route("/instrument/digital_multimeter/set_range", methods=['GET']) def instrument_digital_multimeter_set_range(): if self.__core.instrument_controller.services.digital_multimeter is not None: range_string = request.args.get('range') range_type = self.__core.instrument_controller.services.digital_multimeter.solve_range_string( range_string) self.__core.instrument_controller.services.digital_multimeter.set_range(range_type) return f"set_range: {range_string}" else: return "None" @self.__app.route("/instrument/digital_multimeter/get_value", methods=['GET']) def instrument_digital_multimeter_get_value(): if self.__core.instrument_controller.services.digital_multimeter is not None: get_value = self.__core.instrument_controller.services.digital_multimeter.get_value_latest() return f"get_value: {get_value}" else: return "None" @self.__app.route("/instrument/digital_oscilloscope/keep_listening", methods=['GET']) def instrument_digital_oscilloscope_keep_listening(): if self.__core.instrument_controller.services.digital_oscilloscope is not None: _, v_port, v_url = self.__core.instrument_controller.services.digital_oscilloscope.keep_listening() return f"keep_listening: {v_port} {v_url}" else: return "None" @self.__app.route("/instrument/digital_oscilloscope/export_image", methods=['GET']) def instrument_digital_oscilloscope_export_image(): if self.__core.instrument_controller.services.digital_oscilloscope is not None: get_time: str = self.__core.instrument_controller.services.digital_oscilloscope.save_img() return get_time else: return "None" @self.__app.route("/instrument/waveform_generator/output_start", methods=['GET']) def instrument_waveform_generator_output_start(): if self.__core.instrument_controller.services.waveform_generator is not None: self.__core.instrument_controller.services.waveform_generator.output_start() return f"output_start" else: return "None" @self.__app.route("/instrument/waveform_generator/output_restart", methods=['GET']) def instrument_waveform_generator_output_restart(): if self.__core.instrument_controller.services.waveform_generator is not None: self.__core.instrument_controller.services.waveform_generator.output_restart() return f"output_restart" else: return "None" @self.__app.route("/instrument/waveform_generator/output_pause", methods=['GET']) def instrument_waveform_generator_output_pause(): if self.__core.instrument_controller.services.waveform_generator is not None: self.__core.instrument_controller.services.waveform_generator.output_pause() return f"output_pause" else: return "None" @self.__app.route("/instrument/waveform_generator/config/apply_config", methods=['GET']) def instrument_waveform_generator_config_apply_config(): if self.__core.instrument_controller.services.waveform_generator is not None: self.__core.instrument_controller.services.waveform_generator.apply_config(True) return "apply_config" else: return "None" @self.__app.route("/instrument/waveform_generator/config/set_enable", methods=['GET']) def instrument_waveform_generator_config_set_enable(): channel_string: str = request.args.get('channel') enable_string: str = request.args.get('enable').lower() if enable_string == "true": enable: bool = True else: enable: bool = False if self.__core.instrument_controller.services.waveform_generator is not None: channel = self.__core.instrument_controller.services.waveform_generator.set_enable(channel_string, enable) return f"set_enable: {channel} {enable}" else: return f"set_enable: None None" @self.__app.route("/instrument/waveform_generator/config/get_type", methods=['GET']) def instrument_waveform_generator_config_get_type(): if self.__core.instrument_controller.services.waveform_generator is not None: res: str = self.__core.instrument_controller.services.waveform_generator.config_channel_1.get_type_string() return f"get_type: " + res else: return f"get_type: " + "None" @self.__app.route("/instrument/waveform_generator/config/set_type", methods=['GET']) def instrument_waveform_generator_config_set_type(): channel_string: str = request.args.get('channel') type_string: str = request.args.get('type') if self.__core.instrument_controller.services.waveform_generator is not None: channel, config = self.__core.instrument_controller.services.waveform_generator.get_config( channel_string) config.type = config.match_type_string(type_string) return f"set_type: {channel} {type_string}" else: return f"set_type: None None" @self.__app.route("/instrument/waveform_generator/config/set_freq", methods=['GET']) def instrument_waveform_generator_config_set_freq(): channel_string: str = request.args.get('channel') freq: float = float(request.args.get('freq')) if self.__core.instrument_controller.services.waveform_generator is not None: channel, config = self.__core.instrument_controller.services.waveform_generator.get_config( channel_string) config.freq = freq return f"set_freq: {channel} {freq}" else: return f"set_freq: None None" @self.__app.route("/instrument/waveform_generator/config/get_freq_unit", methods=['GET']) def instrument_waveform_generator_config_get_freq_unit(): if self.__core.instrument_controller.services.waveform_generator is not None: _, config = self.__core.instrument_controller.services.waveform_generator.get_config("1") unit_type_string: str = config.get_freq_unit_type_string() return f"get_freq_unit: {unit_type_string}" else: return f"get_freq_unit: None" @self.__app.route("/instrument/waveform_generator/config/set_freq_unit", methods=['GET']) def instrument_waveform_generator_config_set_freq_unit(): channel_string: str = request.args.get('channel') freq_unit_string: str = request.args.get('freq_unit') if self.__core.instrument_controller.services.waveform_generator is not None: channel, config = self.__core.instrument_controller.services.waveform_generator.get_config( channel_string) config.freq_unit = config.match_freq_unit_type_string(freq_unit_string) return f"set_freq_unit: {channel} {freq_unit_string}" else: return f"set_freq_unit: None None" @self.__app.route("/instrument/waveform_generator/config/set_high_level", methods=['GET']) def instrument_waveform_generator_config_set_high_level(): channel_string: str = request.args.get('channel') high_level: float = float(request.args.get('high_level')) if self.__core.instrument_controller.services.waveform_generator is not None: channel, config = self.__core.instrument_controller.services.waveform_generator.get_config( channel_string) config.high_level = high_level return f"set_high_level: {channel} {high_level}" else: return f"set_high_level: None None" @self.__app.route("/instrument/waveform_generator/config/set_low_level", methods=['GET']) def instrument_waveform_generator_config_set_low_level(): channel_string: str = request.args.get('channel') low_level: float = float(request.args.get('low_level')) if self.__core.instrument_controller.services.waveform_generator is not None: channel, config = self.__core.instrument_controller.services.waveform_generator.get_config( channel_string) config.low_level = low_level return f"set_low_level: {channel} {low_level}" else: return f"set_low_level: None None" @self.__app.route("/instrument/waveform_generator/config/get_level_unit", methods=['GET']) def instrument_waveform_generator_config_get_level_unit(): if self.__core.instrument_controller.services.waveform_generator is not None: _, config = self.__core.instrument_controller.services.waveform_generator.get_config("1") unit_type_string: str = config.get_freq_unit_type_string() return f"get_freq_unit: {unit_type_string}" else: return f"get_freq_unit: None" @self.__app.route("/instrument/waveform_generator/config/set_high_level_unit", methods=['GET']) def instrument_waveform_generator_config_set_high_level_unit(): channel_string: str = request.args.get('channel') high_level_unit_string: str = request.args.get('high_level_unit') if self.__core.instrument_controller.services.waveform_generator is not None: channel, config = self.__core.instrument_controller.services.waveform_generator.get_config( channel_string) config.high_level_unit = config.match_level_unit_type_string(high_level_unit_string) return f"set_high_level_unit: {channel} {high_level_unit_string}" else: return f"set_high_level_unit: None {high_level_unit_string}" @self.__app.route("/instrument/waveform_generator/config/set_low_level_unit", methods=['GET']) def instrument_waveform_generator_config_set_low_level_unit(): channel_string: str = request.args.get('channel') low_level_unit_string: str = request.args.get('low_level_unit') if self.__core.instrument_controller.services.waveform_generator is not None: channel, config = self.__core.instrument_controller.services.waveform_generator.get_config( channel_string) config.low_level_unit = config.match_level_unit_type_string(low_level_unit_string) return f"set_low_level_unit: {channel} {low_level_unit_string}" else: return f"set_low_level_unit: None {low_level_unit_string}" @self.__app.route("/instrument/waveform_generator/config/apply_all_setting", methods=['GET']) def instrument_waveform_generator_config_apply_all_setting(): if self.__core.instrument_controller.services.waveform_generator is not None: waveform_type: str = request.args.get('type') waveform_freq: float = float(request.args.get('freq')) waveform_freq_unit: str = request.args.get('freq_unit') waveform_voltage: float = float(request.args.get('voltage')) waveform_voltage_unit: str = request.args.get('voltage_unit') enable_ch1: bool = bool(request.args.get('ch1')) enable_ch2: bool = bool(request.args.get('ch2')) generator = self.__core.instrument_controller.services.waveform_generator _, config_ch1 = generator.get_config("1") _, config_ch2 = generator.get_config("2") for config_chx in [config_ch1, config_ch2]: config_chx.type = config_chx.match_type_string(waveform_type) config_chx.type_string = waveform_type config_chx.freq = waveform_freq config_chx.freq_unit = config_chx.match_freq_unit_type_string(waveform_freq_unit) config_chx.high_level = waveform_voltage config_chx.low_level = - waveform_voltage config_chx.high_level_unit = config_chx.match_level_unit_type_string(waveform_voltage_unit) config_chx.low_level_unit = config_chx.match_level_unit_type_string(waveform_voltage_unit) generator.set_enable("1", enable_ch1) generator.set_enable("2", enable_ch2) generator.apply_config(immediate_start=False) generator.output_start() return "True" else: return "None" @self.__app.route("/instrument/analog_electronic_load/set_resistance", methods=['GET']) def instrument_analog_electronic_load_set_resistance(): if self.__core.instrument_controller.services.analog_electronic_load is not None: resistance: str = request.args.get('set_resistance') self.__core.instrument_controller.services.analog_electronic_load.set_resistance(resistance) return "True" else: return "False" @self.__app.route("/serial_device/port_list", methods=['GET']) def serial_device_port_list(): port_list: list[str] = self.__core.serial_controller.get_port_list() response_string: str = "port_list:" if len(port_list) == 0: response_string = response_string + " " + "None" else: for port_name in port_list: response_string = response_string + " " + port_name return response_string @self.__app.route("/serial_device/auto_connect", methods=['GET']) def serial_device_auto_connect(): matched_port_list, not_matched_port_list, cnc_matched = self.__core.serial_controller.auto_connect() response_data = { 'method': 'auto_connect', 'cnc_matched': cnc_matched, 'port_list': { 'matched': matched_port_list, 'not_matched': not_matched_port_list } } return jsonify(response_data), 200 @self.__app.route("/serial_device/cnc/config_connection", methods=['GET']) def serial_device_cnc_config_connection(): port_name: str = request.args.get('port_name') com_connection = self.__core.serial_controller.try_build_connection(port_name, if_add_source_to_lib=True) if_success = self.__core.serial_controller.services.cnc_attenuator.match_connection(com_connection) return f"{if_success}" @self.__app.route("/serial_device/cnc/set", methods=['GET']) def serial_device_cnc_set(): set_value: str = request.args.get('value') if_success = self.__core.serial_controller.services.cnc_attenuator.set(float(set_value)) return f"{if_success}" @self.__app.route("/serial_device/plc/config_connection", methods=['GET']) def serial_device_plc_connect_sender(): port_name: str = request.args.get('port_name') plc_com_role: str = request.args.get('plc_com_role').lower() if_success: bool = False if (port_name is not None) and (port_name not in ["None", "", " "]): com_connection = self.__core.serial_controller.try_build_connection(port_name, if_add_source_to_lib=True) if com_connection is not None: if plc_com_role == "sender": if_success = self.__core.serial_controller.services.plc_sender.match_connection(com_connection) elif plc_com_role == "receiver": if_success = self.__core.serial_controller.services.plc_receiver.match_connection( com_connection) elif plc_com_role == "receiver2": if_success = self.__core.serial_controller.services.plc_receiver2.match_connection( com_connection) if if_success is True: self.__core.serial_controller.services.enable_multi_receiver = True else: if_success = False else: if_success: bool = True # if_success = True # ports = self.__core.serial_controller.get_port_list() # self.__core.serial_controller.services.plc_sender.match_connection() get_sender_port_name: str = "None" if self.__core.serial_controller.services.plc_sender.com is not None: get_sender_port_name = self.__core.serial_controller.services.plc_sender.com.connection_port_name get_receiver_port_name: str = "None" if self.__core.serial_controller.services.plc_receiver.com is not None: get_receiver_port_name = self.__core.serial_controller.services.plc_receiver.com.connection_port_name self.__core.serial_controller.services.com_usr_config_success = if_success response_data = { 'method': 'config_connection', 'plc_com_role': plc_com_role, 'port_name': port_name, 'if_success': if_success, 'new_config': { 'sender_port': f"{get_sender_port_name}", 'receiver_port': f"{get_receiver_port_name}", }, 'received_config': { 'role': f"{plc_com_role}", 'port': f"{port_name}", } } return jsonify(response_data), 200 @self.__app.route("/serial_device/plc/check_connection", methods=['GET']) def serial_device_plc_check_connection(): sender_port: Optional[str] = self.__core.serial_controller.services.plc_sender.com.connection_port_name receiver_port: Optional[str] = self.__core.serial_controller.services.plc_receiver.com.connection_port_name sender_connection_flag: bool = (sender_port is not None) receiver_connection_flag: bool = (receiver_port is not None) connection_flag: bool = (sender_connection_flag is True) and (receiver_connection_flag is True) response_data = { 'method': 'check_connection', 'connection_status': connection_flag, 'config': { 'sender_port': f"{self.__core.serial_controller.services.plc_sender.com.connection_port_name}", 'receiver_port': f"{self.__core.serial_controller.services.plc_receiver.com.connection_port_name}", } } return jsonify(response_data), 200 @self.__app.route("/serial_device/plc/start", methods=['GET']) def serial_device_plc_start(): if self.__core.serial_controller.services.com_usr_config_success is True: package_config: Optional[str] = request.args.get('package_config') sender = self.__core.serial_controller.services.plc_sender receiver = self.__core.serial_controller.services.plc_receiver sender_port, sender_url = sender.keep_send() receiver_port, receiver_url = receiver.keep_receive() receiver2 = None if self.__core.serial_controller.services.enable_multi_receiver is True: receiver2 = self.__core.serial_controller.services.plc_receiver2 receiver2.keep_receive() tester = self.__core.serial_controller.get_tester() tester.start_test_service(sender, receiver, receiver2) if package_config == "单包1kbit": tester_frame_new: bytes = tester.send_package_1kbit elif package_config == "单包16kbit": tester_frame_new: bytes = tester.send_package_16kbit else: tester_frame_new: bytes = package_config.encode() if len(tester_frame_new) % 3 == 0: tester_frame_new += b'12\n' elif len(tester_frame_new) % 3 == 1: # b'1' tester_frame_new += b'1\n' elif len(tester_frame_new) % 3 == 2: # b'12' tester_frame_new += b'\n' else: tester_frame_new = b'12\n' tester.reset_frame_std_and_restart(tester_frame_new) else: sender_port, sender_url, receiver_port, receiver_url = 0, 0, 0, 0 response_data = { 'method': 'start', 'sender': { 'port': f"{sender_port}", 'url': sender_url, }, 'receiver': { 'port': f"{receiver_port}", 'url': receiver_url, } } print(f"=== @self.__app.route('/serial_device/plc/start', methods=['GET']) ===") return jsonify(response_data), 200 @self.__app.route("/serial_device/plc/pause", methods=['GET']) def serial_device_plc_pause(): # self.__core.serial_controller.services.plc_sender.pause() # self.__core.serial_controller.services.plc_receiver.pause() tester = self.__core.serial_controller.get_tester() tester.pause_test_service() print(f"=== @self.__app.route('/serial_device/plc/pause', methods=['GET']) ===") return "pause" @self.__app.route("/serial_device/plc/clear", methods=['GET']) def serial_device_plc_clear(): self.__core.serial_controller.get_tester().task_add_reset_error_rate() print(f"=== @self.__app.route('/serial_device/plc/clear', methods=['GET']) ===") return "clear" @self.__app.route("/serial_device/plc/realtime_performance", methods=['GET']) def serial_device_plc_realtime_performance(): tester = self.__core.serial_controller.get_tester() error_rate_percent: float = tester.error_rate_percent error_rate_percent2: float = tester.error_rate_percent2 print(f"=== @self.__app.route('/serial_device/plc/realtime_performance', methods=['GET']) ===") return f"realtime_performance: error_rate_percent={error_rate_percent} error_rate_percent2={error_rate_percent2}" @self.__app.route("/serial_device/plc/realtime_performance_speed", methods=['GET']) def serial_device_plc_realtime_performance_speed(): tester = self.__core.serial_controller.get_tester() speed_kbps: float = tester.speed_kbps speed_kbps2: float = tester.speed_kbps2 print(f"=== @self.__app.route('/serial_device/plc/realtime_performance_speed', methods=['GET']) ===") return f"realtime_performance_speed: speed_kbps={speed_kbps} speed_kbps2={speed_kbps2}" @self.__app.route("/serial_device/plc/motor_mode", methods=['GET']) def serial_device_plc_motor_mode(): get_type: Optional[str] = request.args.get('type') mode_number: Optional[str] = request.args.get('mode') if get_type is not None and mode_number is not None: tester = self.__core.serial_controller.get_tester() use_msg = None if get_type == "motor": use_msg_list = tester.motor_msg_list if mode_number in ["1", "2", "3", "4"]: try: index = int(mode_number) - 1 use_msg = use_msg_list[index] except ValueError: return "failed" except IndexError: return "failed" elif get_type == "relay": use_msg_list = tester.relay_msg_list if mode_number in ["1", "2"]: try: index = int(mode_number) - 1 use_msg = use_msg_list[index] except ValueError: return "failed" except IndexError: return "failed" else: return "failed" if use_msg is not None: tester.set_circle_hook_to_msg_once_mode(use_msg) return "success" return "failed" @self.__app.route("/files/documents/start", methods=['GET']) def files_documents_start(): host, port = self.__core.local_file_service.start() return f"documents: host={host} port={port}" if __name__ == "__main__": server: Server = Server() server.start()