import random import threading import time from typing import Optional import Levenshtein import numpy as np from flask import jsonify, Flask, request, Response from flask_cors import CORS from program_public_tools import ProgramPublicTools from serial_com_service import SerialComService class SerialDevice(object): def __init__(self, pubtools: ProgramPublicTools): self.__pubtools: ProgramPublicTools = pubtools self.cnc_attenuator: CNCAttenuator = CNCAttenuator(self.__pubtools) self.plc_sender: PLCDeviceSender = PLCDeviceSender(self.__pubtools, "PLC-Sender") self.plc_receiver: PLCDeviceReceiver = PLCDeviceReceiver(self.__pubtools, "PLC-Receiver") self.plc_receiver2: PLCDeviceReceiver = PLCDeviceReceiver(self.__pubtools, "PLC-Receiver2") self.enable_multi_receiver: bool = False self.__enable_match_cnc: bool = False self.com_usr_config_success = False def match_connection(self, com_service: SerialComService, enable_auto_match_plc_device: bool = False) \ -> tuple[bool, bool]: matched: bool = False cnc_matched: bool = False if (self.cnc_attenuator.com is None) and (self.__enable_match_cnc is True): matched = self.cnc_attenuator.match_connection(com_service) if matched: cnc_matched = True if enable_auto_match_plc_device is True: if matched is False: if self.plc_sender.com is None: matched = self.plc_sender.match_connection(com_service) elif self.plc_receiver.com is None: matched = self.plc_receiver.match_connection(com_service) else: matched = False return matched, cnc_matched def check(self): if self.cnc_attenuator.com is not None: serial_device_list = ["CNC-Attenuator"] not_connect_list = [] else: serial_device_list = [] not_connect_list = ["CNC-Attenuator"] return serial_device_list, not_connect_list class SerialDeviceController(object): def __init__(self, pubtools: ProgramPublicTools): self.__pubtools: ProgramPublicTools = pubtools self.__port_list = SerialComService(self.__pubtools).port_scan() self.services: SerialDevice = SerialDevice(self.__pubtools) self.__baud_rate: int = 115200 self.__connection_lib: dict = {} self.__tester: Optional[TestService] = None def release_inter_source(self): del self.__tester for com_service in self.__connection_lib: com_service.port_disconnect() def get_tester(self): if self.__tester is None: self.__tester = TestService() return self.__tester def auto_connect(self) -> tuple[list[str], list[str], bool]: return self.auto_match(self.get_port_list()) def get_port_list(self) -> list: return self.__port_list def auto_match(self, device_port_list: list[str]) -> tuple[list[str], list[str], bool]: not_matched_port_list: list[str] = [] matched_port_list: list[str] = [] cnc_match_result: bool = False for port_name in device_port_list: com_service = self.try_build_connection(port_name) match_flag: bool = False if com_service is not None: if com_service.connection is not None: match_result, cnc_match_result = self.services.match_connection(com_service) if match_result is True: match_flag = True else: com_service.port_disconnect() if match_flag is True: matched_port_list.append(port_name) else: not_matched_port_list.append(port_name) return matched_port_list, not_matched_port_list, cnc_match_result def try_build_connection(self, port_name: str, if_add_source_to_lib: bool = False) -> Optional[SerialComService]: if port_name in self.__connection_lib: return self.__connection_lib[port_name] else: com_service: SerialComService = SerialComService(self.__pubtools) connection_result = False try: connection_result: bool = com_service.port_connect(port_name, self.__baud_rate) except Exception as e: self.__pubtools.debug_output(f"Error when attempting to connect port [{port_name}]: {e}") if connection_result is True: if if_add_source_to_lib is True: self.__connection_lib[port_name] = com_service return com_service else: return None class PLCDevice(object): def __init__(self, pubtools: ProgramPublicTools, name: str): self.pubtools: ProgramPublicTools = pubtools self.com: Optional[SerialComService] = None self.name: str = name self.frame_std: bytes = 200 * b'#123456789#X' + b'123456789#\n' def match_connection(self, com: Optional[SerialComService]) -> bool: self.com = com return True class PLCDeviceSender(PLCDevice): def __init__(self, pubtools: ProgramPublicTools, name: str): super().__init__(pubtools, name) self.send_pkg_number: int = 0 self.send_pkg_count: int = 0 self.send_pkg_length: int = len(self.frame_std) self.__s_server: FloatServer = FloatServer("Sender") self.__is_running: bool = False self.server_port = self.pubtools.find_available_server_port(number=1, address="0.0.0.0", start_port=8393)[0] self.server_url = f"sender: http://localhost:{self.server_port}{self.__s_server.name}" self.__pause: bool = False self.sleep_sec_every_step: float = 0 self.sleep_sec_every_ten_step: float = 0.01 self.__ten_step_counter: int = 0 self.pull_up_signal_when_sending: bool = False self.__is_resetting: bool = False def clear(self): self.send_pkg_number = 0 self.send_pkg_count = 0 self.__ten_step_counter = 0 def keep_send(self): self.__pause = False if self.__is_running is not True: self.__start_server() self.__is_running = True return self.__return_server_info() def pause(self): self.__pause = True def pause_and_reset_frame_std_sender(self, frame_std_new): if self.__is_resetting is False: self.__is_resetting = True self.pause() self.frame_std = frame_std_new self.send_pkg_length = len(self.frame_std) self.__is_resetting = False @staticmethod def __start_server(): return None def __start_server_origin(self): threading.Thread(target=self.__send_thread_main).start() threading.Thread(target=self.__sender_server_threading_main, args=(self.server_port,)).start() print(self.server_url) return self.__return_server_info() @staticmethod def __return_server_info(): return None, None def __return_server_info_origin(self): return self.server_port, self.server_url def __sender_server_threading_main(self, port): self.__s_server.run("0.0.0.0", port) def __send_thread_main(self): if self.sleep_sec_every_step != 0: print(f"[Tester] Using config: Waiting [{self.sleep_sec_every_step}] seconds every message.") if self.sleep_sec_every_step != 0: print(f"[Tester] Using config: Waiting [{self.sleep_sec_every_step}] seconds every ten message.") while True: if self.__pause is False: time.sleep(self.sleep_sec_every_step) self.__ten_step_counter += 1 if self.__ten_step_counter == 10: self.__ten_step_counter = 0 time.sleep(self.sleep_sec_every_ten_step) self.__s_server.push(self.send_pkg_count) else: time.sleep(0.1) def send_once(self): print(f"WRITE {self.frame_std}") self.com.connection.write(self.frame_std) self.pull_up_signal_when_sending = True self.send_pkg_number += 1 self.send_pkg_count = self.send_pkg_number * self.send_pkg_length class PLCDeviceReceiver(PLCDevice): def __init__(self, pubtools: ProgramPublicTools, name: str): super().__init__(pubtools, name) self.error_count: int = 0 self.receive_pkg: int = 0 self.receive_bytes: int = 0 self.receive_bytes_this_circle: int = 0 self.__r_server: FloatServer = FloatServer("Receiver") self.server_port = self.pubtools.find_available_server_port(number=1, address="0.0.0.0", start_port=8353)[0] self.server_url = f"receive: http://localhost:{self.server_port}{self.__r_server.name}" self.__is_running: bool = False self.__pause: bool = False self.pull_up_signal_when_receiving: bool = False self.auto_pause_when_receiving = False self.__update_frame_std: bool = False self.__is_resetting: bool = False def clear(self): self.error_count = 0 self.receive_pkg = 0 self.receive_bytes = 0 self.receive_bytes_this_circle: int = 0 def keep_receive(self): self.__pause = False if self.__is_running is not True: self.__start_server() self.__is_running = True return self.__return_info() def pause(self): self.__pause = True def __start_server(self): threading.Thread(target=self.__receive_thread_main).start() threading.Thread(target=self.__receiver_server_threading_main, args=(self.server_port,)).start() print(self.server_url) return self.__return_info() def __return_info(self): return self.server_port, self.server_url def __receiver_server_threading_main(self, port): self.__r_server.run("0.0.0.0", port) def pause_and_reset_frame_std_receiver(self, frame_std_new): if self.__is_resetting is False: self.__is_resetting = True self.frame_std = frame_std_new self.pause() self.__update_frame_std = True def __receive_thread_main(self): frame_std: bytes = self.frame_std frame_std_len: int = len(frame_std) while True: if self.com is not None: if self.__update_frame_std is True: frame_std: bytes = self.frame_std frame_std_len: int = len(frame_std) self.__update_frame_std = False self.__is_resetting = False if self.__pause is False: self.receive_bytes_this_circle = 0 line = self.com.connection.readline() if line: self.receive_pkg += 1 self.receive_bytes = self.receive_bytes + len(line) self.receive_bytes_this_circle = len(line) self.pull_up_signal_when_receiving = True get_error_bytes = self.calculate_ber_optimized(frame_std, line) if get_error_bytes > frame_std_len: self.error_count += frame_std_len else: self.error_count += get_error_bytes self.__r_server.push(self.error_count) if self.auto_pause_when_receiving is True: self.pause() else: time.sleep(0.1) else: time.sleep(0.1) @staticmethod def calculate_ber_optimized(pkg_send, pkg_receive): assert isinstance(pkg_send, bytes) and isinstance(pkg_receive, bytes), "Input should be bytes" assert len(pkg_send) > 0 and len(pkg_receive) > 0, "Input bytes should not be empty" min_length = min(len(pkg_send), len(pkg_receive)) bit_errors = 0 for send_byte, receive_byte in zip(pkg_send[:min_length], pkg_receive[:min_length]): if send_byte != receive_byte: send_bits = np.unpackbits(np.array([send_byte], dtype=np.uint8)) receive_bits = np.unpackbits(np.array([receive_byte], dtype=np.uint8)) bit_errors += np.sum(send_bits != receive_bits) return np.int_(bit_errors / 8) class CNCAttenuator(object): def __init__(self, pubtools: ProgramPublicTools): self.__pubtools: ProgramPublicTools = pubtools self.com: Optional[SerialComService] = None self.name: str = "CNC-Attenuator" def match_connection(self, com: SerialComService) -> bool: for _ in range(0, 2): com.connection.write(b'att-000.00\r\n') com.connection.timeout = 1 data = com.connection.readline().decode('utf-8') if data.lower() == "attOK".lower(): self.com = com self.__pubtools.debug_output("Successfully connect to CNC-Attenuator.") return True return False def set(self, num: float) -> bool: if self.com is None: return False else: try: formatted_num_str = "{:06.2f}".format(num) formatted_byte_str = b'att-' + formatted_num_str.encode('utf-8') + b'\r\n' self.com.connection.write(formatted_byte_str) except Exception as e: return False else: return True class FloatServer: def __init__(self, name: Optional[str] = None): self.name: Optional[str] = name self.__app = Flask(__name__) CORS(self.__app) self.__value = 0.0 self.__instrument_on = False # 新增的仪器状态标志 self.__setup_routes() def __setup_routes(self): if self.name is None: self.name = f"/float_default_{time.time()}" else: self.name = f"/{self.name}" @self.__app.route(self.name, methods=['GET']) def get_value(): return jsonify({'value': self.__value}) # 新增的端点来获取仪器的状态 @self.__app.route(f"{self.name}/instrument_status", methods=['GET']) def get_instrument_status(): return jsonify({'status': self.__instrument_on}) # 新增的端点来更改仪器的状态 @self.__app.route(f"{self.name}/set_instrument_status", methods=['POST']) def set_instrument_status(): status = request.json.get('status', None) if status is not None: self.__instrument_on = status return Response("Instrument status updated", status=200) else: return Response("Invalid request format", status=400) def run(self, host, port): self.__app.run(host=host, port=port) def push(self, new_value): self.__value = new_value class TestService(object): def __init__(self): self.config_sleep_sec_every_step: float = 0 self.config_sleep_sec_every_ten_step: float = 0 self.error_rate_percent: float = 0 self.error_rate_percent2: float = 0 self.__is_tester_running: bool = False self.__threading = None self.__sender = None self.__receiver = None self.__receiver2 = None self.__task_reset_error_rate_percent: bool = False self.__speed_timer: float = 0 self.speed_kbps: float = 0 self.speed_kbps2: float = 0 self.__rev_timer_sec: float = time.time() self.__rev_timeout_sec: float = 2 self.__is_resetting_frame_std_and_restarting: bool = False self.send_package_1kbit: bytes = 25 * b'12345' + b'\n' self.send_package_16kbit: bytes = 400 * b'12345' + b'\n' self.test_service_init_complete_flag: bool = False self.have_test_service_task_in_circle: bool = False self.have_task_reset_frame_std: bool = False self.task_lock_reset_frame_std_and_restart: bool = False self.__frame_std_new: bytes = self.send_package_16kbit self.__nullable_circle_completed_hook = None self.motor_msg_list: list[bytes] = [bytes.fromhex('FF FF FE 07 03 2A 00 00 03 E8 E2'), bytes.fromhex('FF FF FE 07 03 2A 02 00 03 E8 E0'), bytes.fromhex('FF FF FE 07 03 2A 04 00 03 E8 DE'), bytes.fromhex('FF FF FE 07 03 2A 08 00 03 E8 DA')] self.relay_msg_list: list[bytes] = [bytes.fromhex('68 09 00 FF 12 00 01 12 16'), bytes.fromhex('68 09 00 FF 12 00 00 11 16')] self.__motor_and_relay_msg_list: list[bytes] = self.motor_msg_list + self.relay_msg_list self.__motor_and_relay_msg_list_length: int = len(self.__motor_and_relay_msg_list) self.__default_msg_queue: list[bytes] = self.motor_msg_list self.__using_msg_queue: list[bytes] = self.__default_msg_queue self.__using_msg_queue_counter_index: int = 0 self.__using_msg_queue_counter_index_max: int = 0 self.__msg_convert_interval_time_sec: float = 0.8 self.__circle_hook_init: bool = False self.__pause_receive_under_hook_mode: bool = False self.__msg_once_send_buffer: Optional[bytes] = None self.__msg_once_send_completed: bool = False self.__buffer_index_set_circle_hook_to_msg_once_mode = 0 def set_circle_hook_to_msg_once_mode(self, msg: Optional[bytes] = None): if msg is None: range_num_min: int = 1 range_num_max: int = self.__motor_and_relay_msg_list_length while True: get_number: int = random.randint(range_num_min, range_num_max) get_index: int = get_number - 1 if get_index != self.__buffer_index_set_circle_hook_to_msg_once_mode: break self.__buffer_index_set_circle_hook_to_msg_once_mode = get_index msg = self.__motor_and_relay_msg_list[get_index] print(f"[Hook] MSG is None: Using random index [{get_index}]: {msg}") self.__msg_once_send_buffer = msg self.__msg_once_send_completed = False self.__set_nullable_circle_completed_hook(self.__circle_hook_method_msg_once_mode) self.__pause_receive_under_hook_mode = True def __circle_hook_method_msg_once_mode(self): # time.sleep(self.__msg_convert_interval_time_sec) if self.__msg_once_send_completed is False: if self.__msg_once_send_buffer is not None: self.reset_frame_std_and_restart(self.__msg_once_send_buffer) else: print("[Hook] Warning: Flag completed is false but buffer is None.") self.__msg_once_send_completed = True else: self.__msg_once_send_buffer = None self.reset_circle_hook_to_none() def set_circle_hook_to_using_msg_queue(self, msg_queue: Optional[list[bytes]] = None): if msg_queue is not None: self.__using_msg_queue = msg_queue else: self.__using_msg_queue = self.__default_msg_queue self.__set_nullable_circle_completed_hook(self.__circle_hook_method_using_msg_queue) self.__pause_receive_under_hook_mode = True def __circle_hook_method_using_msg_queue(self): time.sleep(self.__msg_convert_interval_time_sec) if self.__circle_hook_init is False: self.__using_msg_queue_counter_index_current = 0 self.__using_msg_queue_counter_index_max = len(self.__using_msg_queue) - 1 self.__circle_hook_init = True self.reset_frame_std_and_restart(self.__using_msg_queue[self.__using_msg_queue_counter_index_current]) if self.__using_msg_queue_counter_index_current == self.__using_msg_queue_counter_index_max: self.__using_msg_queue_counter_index_current = 0 else: self.__using_msg_queue_counter_index_current += 1 def reset_circle_hook_to_none(self): self.__set_nullable_circle_completed_hook(None) self.__pause_receive_under_hook_mode = False def __set_nullable_circle_completed_hook(self, func_nullable_circle_completed_hook): self.__circle_hook_init = False self.__nullable_circle_completed_hook = func_nullable_circle_completed_hook def run_as_main(self): run_mode = "" while True: print("Run Mode: A: PLC-Tester B: CNC-Controller") input_mode = input("Choose a Mode: ").lower() if input_mode == "a": run_mode = input_mode break elif input_mode == "b": run_mode = input_mode break else: print("Invalid mode input! Try input again.") if run_mode == "a": self.run_as_plc_tester() elif run_mode == "b": self.run_as_cnc_controller() else: raise Exception("Error: Unknown type of run mode.") def run_as_cnc_controller(self): program_pubtools: ProgramPublicTools = ProgramPublicTools() serial_device_controller: SerialDeviceController = SerialDeviceController(program_pubtools) port_list = serial_device_controller.get_port_list() port_info: str = '' index_num: int = 0 for port_name in port_list: port_info = port_info + f"Index[{index_num}]={port_name} " index_num += 1 print(port_info) index = input("[CNC Controller] index: ") cnc_controller: SerialComService = SerialComService(program_pubtools) cnc_controller.port_connect(port_list[int(index)]) result: bool = serial_device_controller.services.cnc_attenuator.match_connection(cnc_controller) if result is False: raise Exception("[CNC Controller] Error: Cannot connect to cnc device.") else: while True: get_value_input = input("Input to set cnc value: (dB) ") try: set_value = float(get_value_input) except ValueError: print("Invalid input! CNC value is under float format. Please enter a float number.") else: serial_device_controller.services.cnc_attenuator.set(set_value) print("Set success!") input("Input any content to continue...") def run_as_plc_tester(self): program_pubtools: ProgramPublicTools = ProgramPublicTools() serial_device_controller: SerialDeviceController = SerialDeviceController(program_pubtools) # serial_device_controller.auto_connect() port_list = serial_device_controller.get_port_list() port_info: str = '' index_num: int = 0 for port_name in port_list: port_info = port_info + f"Index[{index_num}]={port_name} " index_num += 1 print(port_info) index_1 = input("[Sender] index: ") index_2 = input("[Receiver] index: ") index_3 = input("[Receiver2] index: (Optional) ") com_sender: SerialComService = SerialComService(program_pubtools) com_sender.port_connect(port_list[int(index_1)]) com_receiver: SerialComService = SerialComService(program_pubtools) com_receiver.port_connect(port_list[int(index_2)]) com_receiver2: Optional[SerialComService] = None if index_3: print(f"[Serial Controller] Multi Receiver Mode is enabled. Receiver2: {port_list[int(index_3)]}.") com_receiver2 = SerialComService(program_pubtools) com_receiver2.port_connect(port_list[int(index_3)]) serial_device_controller.services.plc_sender.match_connection(com_sender) serial_device_controller.services.plc_receiver.match_connection(com_receiver) serial_device_controller.services.plc_receiver2.match_connection(com_receiver2) while True: print("Please config interval seconds for every message.") get_sec = input("[One Message] Seconds: float = ") try: float_sec = float(get_sec) break except ValueError: print("Invalid input! Seconds config is under float format. Please enter a float number.") while True: print("Please config interval seconds for every ten messages.") get_sec_ten = input("[Ten Messages] Seconds: float = ") try: float_sec_ten = float(get_sec_ten) break except ValueError: print("Invalid input! Seconds config is under float format. Please enter a float number.") serial_device_controller.services.plc_sender.sleep_sec_every_step = float_sec serial_device_controller.services.plc_sender.sleep_sec_every_ten_step = float_sec_ten serial_device_controller.services.plc_receiver.keep_receive() if com_receiver2: serial_device_controller.services.plc_receiver2.keep_receive() self.start_test_service(serial_device_controller.services.plc_sender, serial_device_controller.services.plc_receiver, serial_device_controller.services.plc_receiver2) else: self.start_test_service(serial_device_controller.services.plc_sender, serial_device_controller.services.plc_receiver, None) while True: get_input: str = input() if get_input: if get_input == "b": self.set_circle_hook_to_msg_once_mode() if get_input == "a": if self.__nullable_circle_completed_hook is None: self.set_circle_hook_to_using_msg_queue() else: self.reset_circle_hook_to_none() def task_add_reset_error_rate(self): self.__task_reset_error_rate_percent = True def __do_reset_error_rate(self): if self.__sender and self.__receiver: self.__sender.clear() self.__receiver.clear() if self.__receiver2: self.__receiver2.clear() self.__speed_timer = time.time() def error_rate_push(self, error_rate_percent: float, receiver_id: int = 1): if receiver_id == 2: self.error_rate_percent2 = error_rate_percent else: self.error_rate_percent = error_rate_percent def start_test_service(self, sender: PLCDeviceSender, receiver: PLCDeviceReceiver, receiver2: Optional[PLCDeviceReceiver] = None): self.__pause_tester_signal = False if self.__is_tester_running is False: self.__start_tester_threading(sender, receiver, receiver2) self.__is_tester_running = True else: self.__pause_tester_signal = True if receiver2 is None: print("[ ---------- Test Service Start: Receiver2 is None ---------- ]") else: print("[ ---------- Test Service Start: Receiver2 is not None ---------- ]") self.__receiver2 = receiver2 self.__pause_tester_signal = False def pause_test_service(self): self.__pause_tester_signal = True def reset_frame_std_and_restart(self, frame_std_new: bytes): if self.task_lock_reset_frame_std_and_restart is False: self.task_lock_reset_frame_std_and_restart = True self.__frame_std_new = frame_std_new self.have_test_service_task_in_circle = True self.have_task_reset_frame_std = True def __do_reset_frame_std_and_restart(self): if self.__is_resetting_frame_std_and_restarting is False: self.__is_resetting_frame_std_and_restarting = True self.pause_test_service() self.__sender.pause_and_reset_frame_std_sender(self.__frame_std_new) self.__receiver.pause_and_reset_frame_std_receiver(self.__frame_std_new) if self.__receiver2 is not None: self.__receiver2.pause_and_reset_frame_std_receiver(self.__frame_std_new) self.task_add_reset_error_rate() self.start_test_service(self.__sender, self.__receiver, self.__receiver2) self.__is_resetting_frame_std_and_restarting = False def __test_service_task_in_circle(self): if self.have_task_reset_frame_std is True: self.__do_reset_frame_std_and_restart() self.have_task_reset_frame_std = False self.task_lock_reset_frame_std_and_restart = False def __start_tester_threading(self, sender: PLCDeviceSender, receiver: PLCDeviceReceiver, receiver2: Optional[PLCDeviceReceiver] = None): self.__threading = threading.Thread(target=self.test_service, args=(sender, receiver, receiver2)).start() def __check_task_reset_error_rate(self): if self.__task_reset_error_rate_percent is True: self.__do_reset_error_rate() self.__task_reset_error_rate_percent = False def update_error_rate(self, receiver_number: int = None): if receiver_number == 2: receiver = self.__receiver2 else: receiver = self.__receiver self.__check_task_reset_error_rate() if (self.__sender is not None) and (receiver is not None): count_all = self.__sender.send_pkg_count get_error = receiver.error_count pkg_lose_bytes = (self.__sender.send_pkg_number - receiver.receive_pkg) * self.__sender.send_pkg_length count_error_all = pkg_lose_bytes + get_error if count_all != 0: error_rate_percent: float = 100 * count_error_all / count_all else: error_rate_percent: float = 0.0 self.error_rate_push(error_rate_percent, receiver_number) self.__update_speed(count_all, pkg_lose_bytes, receiver_number) return count_all, pkg_lose_bytes, count_error_all else: return 0, 0, 0 def __update_speed(self, count_all_bytes, pkg_lose_bytes, receiver_number: int = None): get_time = time.time() - self.__speed_timer if get_time != 0: speed_kbps = (count_all_bytes - pkg_lose_bytes) * 8 / (1000 * get_time) else: speed_kbps = 0 if speed_kbps >= 60: speed_kbps = speed_kbps / 2 if receiver_number == 2: self.speed_kbps2 = speed_kbps else: self.speed_kbps = speed_kbps def test_service(self, sender: PLCDeviceSender, receiver: PLCDeviceReceiver, receiver2: Optional[PLCDeviceReceiver] = None): self.__sender = sender self.__receiver = receiver self.__receiver2 = receiver2 self.__sender.sleep_sec_every_step = self.config_sleep_sec_every_step self.__sender.sleep_sec_every_ten_step = self.config_sleep_sec_every_ten_step self.__receiver.auto_pause_when_receiving = True if self.__receiver2: self.__receiver2.auto_pause_when_receiving = True self.__receiver.keep_receive() if self.__receiver2: self.__receiver2.keep_receive() self.__sender.send_once() self.__rev_timer_sec = time.time() self.__rev_timeout_sec = 2 self.__speed_timer = time.time() for _ in range(0, 3): self.__test_circle(if_print=False) self.task_add_reset_error_rate() self.test_service_init_complete_flag = True while True: if self.have_test_service_task_in_circle is True: self.__test_service_task_in_circle() self.have_test_service_task_in_circle = False if self.__pause_tester_signal is False: if self.__pause_receive_under_hook_mode is False: self.__test_circle(if_print=True) else: print("[Hook] Program is running under hook mode.") self.__sender.send_once() if self.__nullable_circle_completed_hook is not None: print("[Hook] Do hook func...") self.__nullable_circle_completed_hook() else: print("[Hook] Warning: Hook func is empty.") time.sleep(0.1) else: time.sleep(0.1) @staticmethod def __tester_output(message: str, if_print: bool): if if_print is True: print(message) def __test_circle(self, if_print: bool = True): rev_flag_1 = self.__receiver.pull_up_signal_when_receiving if self.__receiver2 is None: rev_flag_2 = False else: rev_flag_2 = self.__receiver2.pull_up_signal_when_receiving if (self.__receiver2 is None) and (rev_flag_1 is True): count_all, pkg_lose_bytes, count_error = self.update_error_rate(1) self.__tester_output("====================================================", if_print) self.__tester_output( f"all: {count_all * 8} bit, lose: {pkg_lose_bytes * 8} bit, error: {count_error * 8} bit", if_print) self.__tester_output(f"error_rate: {self.error_rate_percent} %", if_print) self.__tester_output(f"DBG: 发包={self.__sender.send_pkg_number} 收包={self.__receiver.receive_pkg}", if_print) self.__tester_output(f"speed: {self.speed_kbps} kbps", if_print) if self.__sender.send_pkg_number % 5 == 0: self.__tester_output(f"[Sender] Package Length: {len(self.__sender.frame_std)}", if_print) self.__tester_output(f"[Receiver] Package Length: {len(self.__receiver.frame_std)}", if_print) self.__receiver.pull_up_signal_when_receiving = False time.sleep(self.__sender.sleep_sec_every_step) if self.__sender.send_pkg_number % 10 == 0: time.sleep(self.__sender.sleep_sec_every_ten_step) self.__receiver.keep_receive() self.__sender.send_once() self.__rev_timer_sec = time.time() elif (self.__receiver2 is not None) and (rev_flag_1 is True) and (rev_flag_2 is True): count_all, pkg_lose_bytes, count_error = self.update_error_rate(1) count_all2, pkg_lose_bytes2, count_error2 = self.update_error_rate(2) self.__tester_output("====================================================", if_print) self.__tester_output(f"Receiver [1]:", if_print) self.__tester_output( f"all: {count_all * 8} bit, lose: {pkg_lose_bytes * 8} bit, error: {count_error * 8} bit", if_print) self.__tester_output(f"error_rate: {self.error_rate_percent} %", if_print) self.__tester_output(f"DBG: 发包={self.__sender.send_pkg_number} 收包={self.__receiver.receive_pkg}", if_print) self.__tester_output(f"speed: {self.speed_kbps} kbps", if_print) self.__tester_output(f"Receiver [2]:", if_print) self.__tester_output( f"all: {count_all2 * 8} bit, lose: {pkg_lose_bytes2 * 8} bit, error: {count_error2 * 8} bit", if_print) self.__tester_output(f"error_rate: {self.error_rate_percent2} %", if_print) self.__tester_output(f"DBG: 发包={self.__sender.send_pkg_number} 收包={self.__receiver2.receive_pkg}", if_print) self.__tester_output(f"speed: {self.speed_kbps2} kbps", if_print) self.__receiver.pull_up_signal_when_receiving = False time.sleep(self.__sender.sleep_sec_every_step) if self.__sender.send_pkg_number % 10 == 0: time.sleep(self.__sender.sleep_sec_every_ten_step) self.__receiver.keep_receive() self.__receiver2.keep_receive() self.__sender.send_once() self.__rev_timer_sec = time.time() elif time.time() - self.__rev_timer_sec >= self.__rev_timeout_sec: self.__tester_output("====================================================", if_print) self.__tester_output("[Serial Controller] Warning: Receiver timeout. A package lost.", if_print) self.update_error_rate(1) self.__tester_output(f"[Serial Controller] Warning: error_rate: {self.error_rate_percent} %", if_print) if self.__receiver2: self.update_error_rate(2) self.__tester_output( f"[Serial Controller] Warning: error_rate: {self.error_rate_percent2} % (Receiver2)", if_print) self.__receiver.pull_up_signal_when_receiving = False self.__receiver.keep_receive() if self.__receiver2: self.__receiver2.keep_receive() self.__sender.send_once() self.__rev_timer_sec = time.time() else: time.sleep(0.1) if __name__ == "__main__": main_tester: TestService = TestService() main_tester.run_as_main()