|
- import random
- import threading
- import time
- from typing import Optional
- import Levenshtein
- import numpy as np
- from flask import jsonify, Flask, request, Response
- from flask_cors import CORS
- from program_public_tools import ProgramPublicTools
- from serial_com_service import SerialComService
- class SerialDevice(object):
- def __init__(self, pubtools: ProgramPublicTools):
- self.__pubtools: ProgramPublicTools = pubtools
- self.cnc_attenuator: CNCAttenuator = CNCAttenuator(self.__pubtools)
- self.plc_sender: PLCDeviceSender = PLCDeviceSender(self.__pubtools, "PLC-Sender")
- self.plc_receiver: PLCDeviceReceiver = PLCDeviceReceiver(self.__pubtools, "PLC-Receiver")
- self.plc_receiver2: PLCDeviceReceiver = PLCDeviceReceiver(self.__pubtools, "PLC-Receiver2")
- self.enable_multi_receiver: bool = False
- self.__enable_match_cnc: bool = False
- self.com_usr_config_success = False
- def match_connection(self, com_service: SerialComService, enable_auto_match_plc_device: bool = False) \
- -> tuple[bool, bool]:
- matched: bool = False
- cnc_matched: bool = False
- if (self.cnc_attenuator.com is None) and (self.__enable_match_cnc is True):
- matched = self.cnc_attenuator.match_connection(com_service)
- if matched:
- cnc_matched = True
- if enable_auto_match_plc_device is True:
- if matched is False:
- if self.plc_sender.com is None:
- matched = self.plc_sender.match_connection(com_service)
- elif self.plc_receiver.com is None:
- matched = self.plc_receiver.match_connection(com_service)
- else:
- matched = False
- return matched, cnc_matched
- def check(self):
- if self.cnc_attenuator.com is not None:
- serial_device_list = ["CNC-Attenuator"]
- not_connect_list = []
- else:
- serial_device_list = []
- not_connect_list = ["CNC-Attenuator"]
- return serial_device_list, not_connect_list
- class SerialDeviceController(object):
- def __init__(self, pubtools: ProgramPublicTools):
- self.__pubtools: ProgramPublicTools = pubtools
- self.__port_list = SerialComService(self.__pubtools).port_scan()
- self.services: SerialDevice = SerialDevice(self.__pubtools)
- self.__baud_rate: int = 115200
- self.__connection_lib: dict = {}
- self.__tester: Optional[TestService] = None
- def get_tester(self):
- if self.__tester is None:
- self.__tester = TestService()
- return self.__tester
- def auto_connect(self) -> tuple[list[str], list[str], bool]:
- return self.auto_match(self.get_port_list())
- def get_port_list(self) -> list:
- return self.__port_list
- def auto_match(self, device_port_list: list[str]) -> tuple[list[str], list[str], bool]:
- not_matched_port_list: list[str] = []
- matched_port_list: list[str] = []
- cnc_match_result: bool = False
- for port_name in device_port_list:
- com_service = self.try_build_connection(port_name)
- match_flag: bool = False
- if com_service is not None:
- if com_service.connection is not None:
- match_result, cnc_match_result = self.services.match_connection(com_service)
- if match_result is True:
- match_flag = True
- else:
- com_service.port_disconnect()
- if match_flag is True:
- matched_port_list.append(port_name)
- else:
- not_matched_port_list.append(port_name)
- return matched_port_list, not_matched_port_list, cnc_match_result
- def try_build_connection(self, port_name: str, if_add_source_to_lib: bool = False) -> Optional[SerialComService]:
- if port_name in self.__connection_lib:
- return self.__connection_lib[port_name]
- else:
- com_service: SerialComService = SerialComService(self.__pubtools)
- connection_result = False
- try:
- connection_result: bool = com_service.port_connect(port_name, self.__baud_rate)
- except Exception as e:
- self.__pubtools.debug_output(f"Error when attempting to connect port [{port_name}]: {e}")
- if connection_result is True:
- if if_add_source_to_lib is True:
- self.__connection_lib[port_name] = com_service
- return com_service
- else:
- return None
- class PLCDevice(object):
- def __init__(self, pubtools: ProgramPublicTools, name: str):
- self.pubtools: ProgramPublicTools = pubtools
- self.com: Optional[SerialComService] = None
- self.name: str = name
- self.frame_std: bytes = 200 * b'#123456789#X' + b'123456789#\n'
- def match_connection(self, com: SerialComService) -> bool:
- self.com = com
- return True
- class PLCDeviceSender(PLCDevice):
- def __init__(self, pubtools: ProgramPublicTools, name: str):
- super().__init__(pubtools, name)
- self.send_pkg_number: int = 0
- self.send_pkg_count: int = 0
- self.send_pkg_length: int = len(self.frame_std)
- self.__s_server: FloatServer = FloatServer("Sender")
- self.__is_running: bool = False
- self.server_port = self.pubtools.find_available_server_port(number=1, address="0.0.0.0", start_port=8393)[0]
- self.server_url = f"sender: http://localhost:{self.server_port}{self.__s_server.name}"
- self.__pause: bool = False
- self.sleep_sec_every_step: float = 0
- self.sleep_sec_every_ten_step: float = 0.01
- self.__ten_step_counter: int = 0
- self.pull_up_signal_when_sending: bool = False
- self.__is_resetting: bool = False
- def clear(self):
- self.send_pkg_number = 0
- self.send_pkg_count = 0
- self.__ten_step_counter = 0
- def keep_send(self):
- self.__pause = False
- if self.__is_running is not True:
- self.__start_server()
- self.__is_running = True
- return self.__return_server_info()
- def pause(self):
- self.__pause = True
- def pause_and_reset_frame_std_sender(self, frame_std_new):
- if self.__is_resetting is False:
- self.__is_resetting = True
- self.pause()
- self.frame_std = frame_std_new
- self.send_pkg_length = len(self.frame_std)
- self.__is_resetting = False
- @staticmethod
- def __start_server():
- return None
- def __start_server_origin(self):
- threading.Thread(target=self.__send_thread_main).start()
- threading.Thread(target=self.__sender_server_threading_main, args=(self.server_port,)).start()
- print(self.server_url)
- return self.__return_server_info()
- @staticmethod
- def __return_server_info():
- return None, None
- def __return_server_info_origin(self):
- return self.server_port, self.server_url
- def __sender_server_threading_main(self, port):
- self.__s_server.run("0.0.0.0", port)
- def __send_thread_main(self):
- if self.sleep_sec_every_step != 0:
- print(f"[Tester] Using config: Waiting [{self.sleep_sec_every_step}] seconds every message.")
- if self.sleep_sec_every_step != 0:
- print(f"[Tester] Using config: Waiting [{self.sleep_sec_every_step}] seconds every ten message.")
- while True:
- if self.__pause is False:
- time.sleep(self.sleep_sec_every_step)
- self.__ten_step_counter += 1
- if self.__ten_step_counter == 10:
- self.__ten_step_counter = 0
- time.sleep(self.sleep_sec_every_ten_step)
- self.__s_server.push(self.send_pkg_count)
- else:
- time.sleep(0.1)
- def send_once(self):
- print(f"WRITE {self.frame_std}")
- self.com.connection.write(self.frame_std)
- self.pull_up_signal_when_sending = True
- self.send_pkg_number += 1
- self.send_pkg_count = self.send_pkg_number * self.send_pkg_length
- class PLCDeviceReceiver(PLCDevice):
- def __init__(self, pubtools: ProgramPublicTools, name: str):
- super().__init__(pubtools, name)
- self.error_count: int = 0
- self.receive_pkg: int = 0
- self.receive_bytes: int = 0
- self.receive_bytes_this_circle: int = 0
- self.__r_server: FloatServer = FloatServer("Receiver")
- self.server_port = self.pubtools.find_available_server_port(number=1, address="0.0.0.0", start_port=8353)[0]
- self.server_url = f"receive: http://localhost:{self.server_port}{self.__r_server.name}"
- self.__is_running: bool = False
- self.__pause: bool = False
- self.pull_up_signal_when_receiving: bool = False
- self.auto_pause_when_receiving = False
- self.__update_frame_std: bool = False
- self.__is_resetting: bool = False
- def clear(self):
- self.error_count = 0
- self.receive_pkg = 0
- self.receive_bytes = 0
- self.receive_bytes_this_circle: int = 0
- def keep_receive(self):
- self.__pause = False
- if self.__is_running is not True:
- self.__start_server()
- self.__is_running = True
- return self.__return_info()
- def pause(self):
- self.__pause = True
- def __start_server(self):
- threading.Thread(target=self.__receive_thread_main).start()
- threading.Thread(target=self.__receiver_server_threading_main, args=(self.server_port,)).start()
- print(self.server_url)
- return self.__return_info()
- def __return_info(self):
- return self.server_port, self.server_url
- def __receiver_server_threading_main(self, port):
- self.__r_server.run("0.0.0.0", port)
- def pause_and_reset_frame_std_receiver(self, frame_std_new):
- if self.__is_resetting is False:
- self.__is_resetting = True
- self.frame_std = frame_std_new
- self.pause()
- self.__update_frame_std = True
- def __receive_thread_main(self):
- frame_std: bytes = self.frame_std
- frame_std_len: int = len(frame_std)
- while True:
- if self.__update_frame_std is True:
- frame_std: bytes = self.frame_std
- frame_std_len: int = len(frame_std)
- self.__update_frame_std = False
- self.__is_resetting = False
- if self.__pause is False:
- self.receive_bytes_this_circle = 0
- line = self.com.connection.readline()
- if line:
- self.receive_pkg += 1
- self.receive_bytes = self.receive_bytes + len(line)
- self.receive_bytes_this_circle = len(line)
- self.pull_up_signal_when_receiving = True
- get_error_bytes = self.calculate_ber_optimized(frame_std, line)
- if get_error_bytes > frame_std_len:
- self.error_count += frame_std_len
- else:
- self.error_count += get_error_bytes
- self.__r_server.push(self.error_count)
- if self.auto_pause_when_receiving is True:
- self.pause()
- else:
- time.sleep(0.1)
- @staticmethod
- def calculate_ber_optimized(pkg_send, pkg_receive):
- assert isinstance(pkg_send, bytes) and isinstance(pkg_receive, bytes), "Input should be bytes"
- assert len(pkg_send) > 0 and len(pkg_receive) > 0, "Input bytes should not be empty"
- min_length = min(len(pkg_send), len(pkg_receive))
- bit_errors = 0
- for send_byte, receive_byte in zip(pkg_send[:min_length], pkg_receive[:min_length]):
- if send_byte != receive_byte:
- send_bits = np.unpackbits(np.array([send_byte], dtype=np.uint8))
- receive_bits = np.unpackbits(np.array([receive_byte], dtype=np.uint8))
- bit_errors += np.sum(send_bits != receive_bits)
- return np.int_(bit_errors / 8)
- class CNCAttenuator(object):
- def __init__(self, pubtools: ProgramPublicTools):
- self.__pubtools: ProgramPublicTools = pubtools
- self.com: Optional[SerialComService] = None
- self.name: str = "CNC-Attenuator"
- def match_connection(self, com: SerialComService) -> bool:
- for _ in range(0, 2):
- com.connection.write(b'att-000.00\r\n')
- com.connection.timeout = 1
- data = com.connection.readline().decode('utf-8')
- if data.lower() == "attOK".lower():
- self.com = com
- self.__pubtools.debug_output("Successfully connect to CNC-Attenuator.")
- return True
- return False
- def set(self, num: float) -> bool:
- if self.com is None:
- return False
- else:
- try:
- formatted_num_str = "{:06.2f}".format(num)
- formatted_byte_str = b'att-' + formatted_num_str.encode('utf-8') + b'\r\n'
- self.com.connection.write(formatted_byte_str)
- except Exception as e:
- return False
- else:
- return True
- class FloatServer:
- def __init__(self, name: Optional[str] = None):
- self.name: Optional[str] = name
- self.__app = Flask(__name__)
- CORS(self.__app)
- self.__value = 0.0
- self.__instrument_on = False # 新增的仪器状态标志
- self.__setup_routes()
- def __setup_routes(self):
- if self.name is None:
- self.name = f"/float_default_{time.time()}"
- else:
- self.name = f"/{self.name}"
- @self.__app.route(self.name, methods=['GET'])
- def get_value():
- return jsonify({'value': self.__value})
- # 新增的端点来获取仪器的状态
- @self.__app.route(f"{self.name}/instrument_status", methods=['GET'])
- def get_instrument_status():
- return jsonify({'status': self.__instrument_on})
- # 新增的端点来更改仪器的状态
- @self.__app.route(f"{self.name}/set_instrument_status", methods=['POST'])
- def set_instrument_status():
- status = request.json.get('status', None)
- if status is not None:
- self.__instrument_on = status
- return Response("Instrument status updated", status=200)
- else:
- return Response("Invalid request format", status=400)
- def run(self, host, port):
- self.__app.run(host=host, port=port)
- def push(self, new_value):
- self.__value = new_value
- class TestService(object):
- def __init__(self):
- self.config_sleep_sec_every_step: float = 0
- self.config_sleep_sec_every_ten_step: float = 0
- self.error_rate_percent: float = 0
- self.error_rate_percent2: float = 0
- self.__is_tester_running: bool = False
- self.__threading = None
- self.__sender = None
- self.__receiver = None
- self.__receiver2 = None
- self.__task_reset_error_rate_percent: bool = False
- self.__speed_timer: float = 0
- self.speed_kbps: float = 0
- self.speed_kbps2: float = 0
- self.__rev_timer_sec: float = time.time()
- self.__rev_timeout_sec: float = 2
- self.__is_resetting_frame_std_and_restarting: bool = False
- self.send_package_1kbit: bytes = 25 * b'12345' + b'\n'
- self.send_package_16kbit: bytes = 400 * b'12345' + b'\n'
- self.test_service_init_complete_flag: bool = False
- self.have_test_service_task_in_circle: bool = False
- self.have_task_reset_frame_std: bool = False
- self.task_lock_reset_frame_std_and_restart: bool = False
- self.__frame_std_new: bytes = self.send_package_16kbit
- self.__nullable_circle_completed_hook = None
- self.motor_msg_list: list[bytes] = [bytes.fromhex('FF FF FE 07 03 2A 00 00 03 E8 E2'),
- bytes.fromhex('FF FF FE 07 03 2A 02 00 03 E8 E0'),
- bytes.fromhex('FF FF FE 07 03 2A 04 00 03 E8 DE'),
- bytes.fromhex('FF FF FE 07 03 2A 08 00 03 E8 DA')]
- self.relay_msg_list: list[bytes] = [bytes.fromhex('68 09 00 FF 12 00 01 12 16'),
- bytes.fromhex('68 09 00 FF 12 00 00 11 16')]
- self.__motor_and_relay_msg_list: list[bytes] = self.motor_msg_list + self.relay_msg_list
- self.__motor_and_relay_msg_list_length: int = len(self.__motor_and_relay_msg_list)
- self.__default_msg_queue: list[bytes] = self.motor_msg_list
- self.__using_msg_queue: list[bytes] = self.__default_msg_queue
- self.__using_msg_queue_counter_index: int = 0
- self.__using_msg_queue_counter_index_max: int = 0
- self.__msg_convert_interval_time_sec: float = 0.8
- self.__circle_hook_init: bool = False
- self.__pause_receive_under_hook_mode: bool = False
- self.__msg_once_send_buffer: Optional[bytes] = None
- self.__msg_once_send_completed: bool = False
- self.__buffer_index_set_circle_hook_to_msg_once_mode = 0
- def set_circle_hook_to_msg_once_mode(self, msg: Optional[bytes] = None):
- if msg is None:
- range_num_min: int = 1
- range_num_max: int = self.__motor_and_relay_msg_list_length
- while True:
- get_number: int = random.randint(range_num_min, range_num_max)
- get_index: int = get_number - 1
- if get_index != self.__buffer_index_set_circle_hook_to_msg_once_mode:
- break
- self.__buffer_index_set_circle_hook_to_msg_once_mode = get_index
- msg = self.__motor_and_relay_msg_list[get_index]
- print(f"[Hook] MSG is None: Using random index [{get_index}]: {msg}")
- self.__msg_once_send_buffer = msg
- self.__msg_once_send_completed = False
- self.__set_nullable_circle_completed_hook(self.__circle_hook_method_msg_once_mode)
- self.__pause_receive_under_hook_mode = True
- def __circle_hook_method_msg_once_mode(self):
- # time.sleep(self.__msg_convert_interval_time_sec)
- if self.__msg_once_send_completed is False:
- if self.__msg_once_send_buffer is not None:
- self.reset_frame_std_and_restart(self.__msg_once_send_buffer)
- else:
- print("[Hook] Warning: Flag completed is false but buffer is None.")
- self.__msg_once_send_completed = True
- else:
- self.__msg_once_send_buffer = None
- self.reset_circle_hook_to_none()
- def set_circle_hook_to_using_msg_queue(self, msg_queue: Optional[list[bytes]] = None):
- if msg_queue is not None:
- self.__using_msg_queue = msg_queue
- else:
- self.__using_msg_queue = self.__default_msg_queue
- self.__set_nullable_circle_completed_hook(self.__circle_hook_method_using_msg_queue)
- self.__pause_receive_under_hook_mode = True
- def __circle_hook_method_using_msg_queue(self):
- time.sleep(self.__msg_convert_interval_time_sec)
- if self.__circle_hook_init is False:
- self.__using_msg_queue_counter_index_current = 0
- self.__using_msg_queue_counter_index_max = len(self.__using_msg_queue) - 1
- self.__circle_hook_init = True
- self.reset_frame_std_and_restart(self.__using_msg_queue[self.__using_msg_queue_counter_index_current])
- if self.__using_msg_queue_counter_index_current == self.__using_msg_queue_counter_index_max:
- self.__using_msg_queue_counter_index_current = 0
- else:
- self.__using_msg_queue_counter_index_current += 1
- def reset_circle_hook_to_none(self):
- self.__set_nullable_circle_completed_hook(None)
- self.__pause_receive_under_hook_mode = False
- def __set_nullable_circle_completed_hook(self, func_nullable_circle_completed_hook):
- self.__circle_hook_init = False
- self.__nullable_circle_completed_hook = func_nullable_circle_completed_hook
- def run_as_main(self):
- run_mode = ""
- while True:
- print("Run Mode: A: PLC-Tester B: CNC-Controller")
- input_mode = input("Choose a Mode: ").lower()
- if input_mode == "a":
- run_mode = input_mode
- break
- elif input_mode == "b":
- run_mode = input_mode
- break
- else:
- print("Invalid mode input! Try input again.")
- if run_mode == "a":
- self.run_as_plc_tester()
- elif run_mode == "b":
- self.run_as_cnc_controller()
- else:
- raise Exception("Error: Unknown type of run mode.")
- def run_as_cnc_controller(self):
- program_pubtools: ProgramPublicTools = ProgramPublicTools()
- serial_device_controller: SerialDeviceController = SerialDeviceController(program_pubtools)
- port_list = serial_device_controller.get_port_list()
- port_info: str = ''
- index_num: int = 0
- for port_name in port_list:
- port_info = port_info + f"Index[{index_num}]={port_name} "
- index_num += 1
- print(port_info)
- index = input("[CNC Controller] index: ")
- cnc_controller: SerialComService = SerialComService(program_pubtools)
- cnc_controller.port_connect(port_list[int(index)])
- result: bool = serial_device_controller.services.cnc_attenuator.match_connection(cnc_controller)
- if result is False:
- raise Exception("[CNC Controller] Error: Cannot connect to cnc device.")
- else:
- while True:
- get_value_input = input("Input to set cnc value: (dB) ")
- try:
- set_value = float(get_value_input)
- except ValueError:
- print("Invalid input! CNC value is under float format. Please enter a float number.")
- else:
- serial_device_controller.services.cnc_attenuator.set(set_value)
- print("Set success!")
- input("Input any content to continue...")
- def run_as_plc_tester(self):
- program_pubtools: ProgramPublicTools = ProgramPublicTools()
- serial_device_controller: SerialDeviceController = SerialDeviceController(program_pubtools)
- # serial_device_controller.auto_connect()
- port_list = serial_device_controller.get_port_list()
- port_info: str = ''
- index_num: int = 0
- for port_name in port_list:
- port_info = port_info + f"Index[{index_num}]={port_name} "
- index_num += 1
- print(port_info)
- index_1 = input("[Sender] index: ")
- index_2 = input("[Receiver] index: ")
- index_3 = input("[Receiver2] index: (Optional) ")
- com_sender: SerialComService = SerialComService(program_pubtools)
- com_sender.port_connect(port_list[int(index_1)])
- com_receiver: SerialComService = SerialComService(program_pubtools)
- com_receiver.port_connect(port_list[int(index_2)])
- com_receiver2: Optional[SerialComService] = None
- if index_3:
- print(f"[Serial Controller] Multi Receiver Mode is enabled. Receiver2: {port_list[int(index_3)]}.")
- com_receiver2 = SerialComService(program_pubtools)
- com_receiver2.port_connect(port_list[int(index_3)])
- serial_device_controller.services.plc_sender.match_connection(com_sender)
- serial_device_controller.services.plc_receiver.match_connection(com_receiver)
- serial_device_controller.services.plc_receiver2.match_connection(com_receiver2)
- while True:
- print("Please config interval seconds for every message.")
- get_sec = input("[One Message] Seconds: float = ")
- try:
- float_sec = float(get_sec)
- break
- except ValueError:
- print("Invalid input! Seconds config is under float format. Please enter a float number.")
- while True:
- print("Please config interval seconds for every ten messages.")
- get_sec_ten = input("[Ten Messages] Seconds: float = ")
- try:
- float_sec_ten = float(get_sec_ten)
- break
- except ValueError:
- print("Invalid input! Seconds config is under float format. Please enter a float number.")
- serial_device_controller.services.plc_sender.sleep_sec_every_step = float_sec
- serial_device_controller.services.plc_sender.sleep_sec_every_ten_step = float_sec_ten
- serial_device_controller.services.plc_receiver.keep_receive()
- if com_receiver2:
- serial_device_controller.services.plc_receiver2.keep_receive()
- self.start_test_service(serial_device_controller.services.plc_sender,
- serial_device_controller.services.plc_receiver,
- serial_device_controller.services.plc_receiver2)
- else:
- self.start_test_service(serial_device_controller.services.plc_sender,
- serial_device_controller.services.plc_receiver, None)
- while True:
- get_input: str = input()
- if get_input:
- if get_input == "b":
- self.set_circle_hook_to_msg_once_mode()
- if get_input == "a":
- if self.__nullable_circle_completed_hook is None:
- self.set_circle_hook_to_using_msg_queue()
- else:
- self.reset_circle_hook_to_none()
- def task_add_reset_error_rate(self):
- self.__task_reset_error_rate_percent = True
- def __do_reset_error_rate(self):
- if self.__sender and self.__receiver:
- self.__sender.clear()
- self.__receiver.clear()
- if self.__receiver2:
- self.__receiver2.clear()
- self.__speed_timer = time.time()
- def error_rate_push(self, error_rate_percent: float, receiver_id: int = 1):
- if receiver_id == 2:
- self.error_rate_percent2 = error_rate_percent
- else:
- self.error_rate_percent = error_rate_percent
- def start_test_service(self, sender: PLCDeviceSender, receiver: PLCDeviceReceiver,
- receiver2: Optional[PLCDeviceReceiver] = None):
- self.__pause_tester_signal = False
- if self.__is_tester_running is False:
- self.__start_tester_threading(sender, receiver, receiver2)
- self.__is_tester_running = True
- def pause_test_service(self):
- self.__pause_tester_signal = True
- def reset_frame_std_and_restart(self, frame_std_new: bytes):
- if self.task_lock_reset_frame_std_and_restart is False:
- self.task_lock_reset_frame_std_and_restart = True
- self.__frame_std_new = frame_std_new
- self.have_test_service_task_in_circle = True
- self.have_task_reset_frame_std = True
- def __do_reset_frame_std_and_restart(self):
- if self.__is_resetting_frame_std_and_restarting is False:
- self.__is_resetting_frame_std_and_restarting = True
- self.pause_test_service()
- self.__sender.pause_and_reset_frame_std_sender(self.__frame_std_new)
- self.__receiver.pause_and_reset_frame_std_receiver(self.__frame_std_new)
- if self.__receiver2 is not None:
- self.__receiver2.pause_and_reset_frame_std_receiver(self.__frame_std_new)
- self.task_add_reset_error_rate()
- self.start_test_service(self.__sender, self.__receiver, self.__receiver2)
- self.__is_resetting_frame_std_and_restarting = False
- def __test_service_task_in_circle(self):
- if self.have_task_reset_frame_std is True:
- self.__do_reset_frame_std_and_restart()
- self.have_task_reset_frame_std = False
- self.task_lock_reset_frame_std_and_restart = False
- def __start_tester_threading(self, sender: PLCDeviceSender, receiver: PLCDeviceReceiver,
- receiver2: Optional[PLCDeviceReceiver] = None):
- self.__threading = threading.Thread(target=self.test_service, args=(sender, receiver, receiver2)).start()
- def __check_task_reset_error_rate(self):
- if self.__task_reset_error_rate_percent is True:
- self.__do_reset_error_rate()
- self.__task_reset_error_rate_percent = False
- def update_error_rate(self, receiver_number: int = None):
- if receiver_number == 2:
- receiver = self.__receiver2
- else:
- receiver = self.__receiver
- self.__check_task_reset_error_rate()
- if (self.__sender is not None) and (receiver is not None):
- count_all = self.__sender.send_pkg_count
- get_error = receiver.error_count
- pkg_lose_bytes = (self.__sender.send_pkg_number - receiver.receive_pkg) * self.__sender.send_pkg_length
- count_error_all = pkg_lose_bytes + get_error
- if count_all != 0:
- error_rate_percent: float = 100 * count_error_all / count_all
- else:
- error_rate_percent: float = 0.0
- self.error_rate_push(error_rate_percent, receiver_number)
- self.__update_speed(count_all, pkg_lose_bytes, receiver_number)
- return count_all, pkg_lose_bytes, count_error_all
- else:
- return 0, 0, 0
- def __update_speed(self, count_all_bytes, pkg_lose_bytes, receiver_number: int = None):
- get_time = time.time() - self.__speed_timer
- if get_time != 0:
- speed_kbps = (count_all_bytes - pkg_lose_bytes) * 8 / (1000 * get_time)
- else:
- speed_kbps = 0
- if receiver_number == 2:
- self.speed_kbps2 = speed_kbps
- else:
- self.speed_kbps = speed_kbps
- def test_service(self, sender: PLCDeviceSender, receiver: PLCDeviceReceiver,
- receiver2: Optional[PLCDeviceReceiver] = None):
- self.__sender = sender
- self.__receiver = receiver
- self.__receiver2 = receiver2
- self.__sender.sleep_sec_every_step = self.config_sleep_sec_every_step
- self.__sender.sleep_sec_every_ten_step = self.config_sleep_sec_every_ten_step
- self.__receiver.auto_pause_when_receiving = True
- if self.__receiver2:
- self.__receiver2.auto_pause_when_receiving = True
- self.__receiver.keep_receive()
- if self.__receiver2:
- self.__receiver2.keep_receive()
- self.__sender.send_once()
- self.__rev_timer_sec = time.time()
- self.__rev_timeout_sec = 2
- self.__speed_timer = time.time()
- for _ in range(0, 3):
- self.__test_circle(if_print=False)
- self.task_add_reset_error_rate()
- self.test_service_init_complete_flag = True
- while True:
- if self.have_test_service_task_in_circle is True:
- self.__test_service_task_in_circle()
- self.have_test_service_task_in_circle = False
- if self.__pause_tester_signal is False:
- if self.__pause_receive_under_hook_mode is False:
- self.__test_circle(if_print=True)
- else:
- print("[Hook] Program is running under hook mode.")
- self.__sender.send_once()
- if self.__nullable_circle_completed_hook is not None:
- print("[Hook] Do hook func...")
- self.__nullable_circle_completed_hook()
- else:
- print("[Hook] Warning: Hook func is empty.")
- time.sleep(0.1)
- else:
- time.sleep(0.1)
- @staticmethod
- def __tester_output(message: str, if_print: bool):
- if if_print is True:
- print(message)
- def __test_circle(self, if_print: bool = True):
- rev_flag_1 = self.__receiver.pull_up_signal_when_receiving
- if self.__receiver2 is None:
- rev_flag_2 = False
- else:
- rev_flag_2 = self.__receiver2.pull_up_signal_when_receiving
- if (self.__receiver2 is None) and (rev_flag_1 is True):
- count_all, pkg_lose_bytes, count_error = self.update_error_rate(1)
- self.__tester_output("====================================================", if_print)
- self.__tester_output(
- f"all: {count_all * 8} bit, lose: {pkg_lose_bytes * 8} bit, error: {count_error * 8} bit", if_print)
- self.__tester_output(f"error_rate: {self.error_rate_percent} %", if_print)
- self.__tester_output(f"DBG: 发包={self.__sender.send_pkg_number} 收包={self.__receiver.receive_pkg}",
- if_print)
- self.__tester_output(f"speed: {self.speed_kbps} kbps", if_print)
- if self.__sender.send_pkg_number % 5 == 0:
- self.__tester_output(f"[Sender] Package Length: {len(self.__sender.frame_std)}", if_print)
- self.__tester_output(f"[Receiver] Package Length: {len(self.__receiver.frame_std)}", if_print)
- self.__receiver.pull_up_signal_when_receiving = False
- time.sleep(self.__sender.sleep_sec_every_step)
- if self.__sender.send_pkg_number % 10 == 0:
- time.sleep(self.__sender.sleep_sec_every_ten_step)
- self.__receiver.keep_receive()
- self.__sender.send_once()
- self.__rev_timer_sec = time.time()
- elif (self.__receiver2 is not None) and (rev_flag_1 is True) and (rev_flag_2 is True):
- count_all, pkg_lose_bytes, count_error = self.update_error_rate(1)
- count_all2, pkg_lose_bytes2, count_error2 = self.update_error_rate(2)
- self.__tester_output("====================================================", if_print)
- self.__tester_output(f"Receiver [1]:", if_print)
- self.__tester_output(
- f"all: {count_all * 8} bit, lose: {pkg_lose_bytes * 8} bit, error: {count_error * 8} bit", if_print)
- self.__tester_output(f"error_rate: {self.error_rate_percent} %", if_print)
- self.__tester_output(f"DBG: 发包={self.__sender.send_pkg_number} 收包={self.__receiver.receive_pkg}",
- if_print)
- self.__tester_output(f"speed: {self.speed_kbps} kbps", if_print)
- self.__tester_output(f"Receiver [2]:", if_print)
- self.__tester_output(
- f"all: {count_all2 * 8} bit, lose: {pkg_lose_bytes2 * 8} bit, error: {count_error2 * 8} bit", if_print)
- self.__tester_output(f"error_rate: {self.error_rate_percent2} %", if_print)
- self.__tester_output(f"DBG: 发包={self.__sender.send_pkg_number} 收包={self.__receiver2.receive_pkg}",
- if_print)
- self.__tester_output(f"speed: {self.speed_kbps2} kbps", if_print)
- self.__receiver.pull_up_signal_when_receiving = False
- time.sleep(self.__sender.sleep_sec_every_step)
- if self.__sender.send_pkg_number % 10 == 0:
- time.sleep(self.__sender.sleep_sec_every_ten_step)
- self.__receiver.keep_receive()
- self.__receiver2.keep_receive()
- self.__sender.send_once()
- self.__rev_timer_sec = time.time()
- elif time.time() - self.__rev_timer_sec >= self.__rev_timeout_sec:
- self.__tester_output("====================================================", if_print)
- self.__tester_output("[Serial Controller] Warning: Receiver timeout. A package lost.", if_print)
- self.update_error_rate(1)
- self.__tester_output(f"[Serial Controller] Warning: error_rate: {self.error_rate_percent} %", if_print)
- if self.__receiver2:
- self.update_error_rate(2)
- self.__tester_output(
- f"[Serial Controller] Warning: error_rate: {self.error_rate_percent2} % (Receiver2)", if_print)
- self.__receiver.pull_up_signal_when_receiving = False
- self.__receiver.keep_receive()
- if self.__receiver2:
- self.__receiver2.keep_receive()
- self.__sender.send_once()
- self.__rev_timer_sec = time.time()
- else:
- time.sleep(0.1)
- if __name__ == "__main__":
- main_tester: TestService = TestService()
- main_tester.run_as_main()
|