from typing import Optional, List, Union

import serial
import serial.tools.list_ports

from program_public_tools import ProgramPublicTools


class SerialComService(object):
    """Thin wrapper around a single pySerial connection.

    The service can enumerate the serial ports on the host, open one of
    them (by name, or the first one found), write raw data to it and close
    it again. Progress and failure messages are reported through
    ``pubtools.debug_output``.
    """

    def __init__(self, pubtools: ProgramPublicTools):
        """Create a disconnected service.

        Args:
            pubtools: Project helper object; only its ``debug_output``
                method is used by this class.
        """
        # Single-byte markers (values 111 and 222). They are only stored
        # here; nothing in this class reads them.
        self.package_head: bytes = bytes([111])
        self.package_end: bytes = bytes([222])
        self.pubtools: ProgramPublicTools = pubtools
        # Result of the most recent scan: port names and the raw entries
        # returned by ``serial.tools.list_ports.comports()``.
        self.port_list: Optional[list] = None
        self.port_raw_data: Optional[list] = None
        # True when the most recent scan found at least one port.
        self.port_available_exist: bool = False
        # True while ``connection`` refers to a successfully opened port.
        self.port_connect_flag: bool = False
        self.port_baud_rate: int = 115200
        self.connection: Optional[serial.Serial] = None
        # Set to True by the first successful connect; never cleared by
        # this class.
        self.service_port_valid_mark: bool = False
        self.connection_port_name: Optional[str] = None

    def send(self, send_package: Union[List[bytes], bytes]):
        """Write ``send_package`` to the current serial connection.

        Args:
            send_package: Either a bytes-like object, or a list/tuple of
                ``bytes`` chunks which are concatenated before writing.
                Any other value is handed to ``Serial.write`` unchanged.

        Raises:
            serial.SerialException: If no connection has been created, or
                (raised by pySerial) if the port is not open.
        """
        if self.connection is None:
            raise serial.SerialException(
                "Cannot send: no serial connection has been opened."
            )
        # ``Serial.write`` cannot convert a list of ``bytes`` objects, so
        # flatten it first. Lists of ints are left alone because pySerial
        # already accepts those.
        if isinstance(send_package, (list, tuple)) and all(
            isinstance(chunk, (bytes, bytearray)) for chunk in send_package
        ):
            send_package = b"".join(send_package)
        self.connection.write(send_package)

    def port_scan(self) -> list:
        """Enumerate the serial ports currently present on the host.

        Updates ``port_raw_data``, ``port_list`` and
        ``port_available_exist`` from the fresh scan and reports the
        outcome through ``pubtools.debug_output``.

        Returns:
            The list of port names (element 0 of every entry returned by
            ``comports()``); empty when no port was found.
        """
        self.port_raw_data = list(serial.tools.list_ports.comports())
        self.port_list = [port_info[0] for port_info in self.port_raw_data]
        # Recompute the flag on every scan so that a port which has been
        # unplugged since the last scan is not still reported as available.
        self.port_available_exist = bool(self.port_list)

        self.pubtools.debug_output("Connecting to the platform.")
        if self.port_available_exist:
            self.pubtools.debug_output("Available ports: ", self.port_list)
        else:
            self.pubtools.debug_output("Unable to find available ports.")
        return self.port_list

    def port_connect(self, port_name: str = "Default", port_baud_rate: int = 115200) -> bool:
        """Open a serial connection.

        Any connection left over from an earlier call is closed first, so
        the state after this call reflects this attempt only.

        Args:
            port_name: Name of the port to open. The special value
                ``"Default"`` selects the first entry of ``port_list``,
                scanning for ports first if no scan has been done yet.
            port_baud_rate: Baud rate for the connection.

        Returns:
            True if the port was opened, False if no port was available
            or the port did not report itself as open.

        Raises:
            serial.SerialException: Propagated from ``Serial.open`` when
                the port cannot be opened; the service is left in the
                disconnected state before the exception is re-raised.
        """
        self.port_baud_rate = port_baud_rate

        # Release the previous handle instead of silently overwriting it,
        # and make sure a stale "connected" flag cannot leak into the
        # result of this attempt.
        if self.connection is not None:
            self.port_disconnect()
        self.port_connect_flag = False

        if port_name == "Default":
            if self.port_list is None:
                self.port_list = self.port_scan()
            if not self.port_available_exist or not self.port_list:
                self.pubtools.debug_output("No port available. \nConnect failed.")
                return self.port_connect_callback(False, port_name)
            port_name = self.port_list[0]

        # Create the object without a port so that RTS/DTR can be
        # de-asserted before the port is actually opened.
        # NOTE(review): presumably this avoids resetting the attached
        # device on connect - confirm against the hardware.
        self.connection = serial.Serial(None, self.port_baud_rate)
        self.connection.rts = False
        self.connection.dtr = False
        self.connection.port = port_name
        try:
            self.connection.open()
        except serial.SerialException:
            # Do not keep a half-configured, unopened handle around.
            self.connection = None
            self.port_connect_callback(False, port_name)
            raise

        if not self.connection.isOpen():
            self.pubtools.debug_output("Connect failed: Port is not open.")
            self.connection = None
            return self.port_connect_callback(False, port_name)

        self.port_connect_flag = True
        return self.port_connect_callback(True, port_name)

    def port_connect_callback(self, if_succeed: bool, port_name: str) -> bool:
        """Record the outcome of a connection attempt.

        Args:
            if_succeed: Whether the attempt succeeded.
            port_name: Name of the port the attempt was made on.

        Returns:
            True on success, False on failure.
        """
        if not if_succeed:
            self.connection_port_name = None
            return False
        self.service_port_valid_mark = True
        self.connection_port_name = port_name
        return True

    def port_disconnect(self) -> None:
        """Close the current connection, if any, and mark the service as
        disconnected. Calling this without an active connection is a no-op.
        """
        # Reset the bookkeeping first so the service is consistently
        # "disconnected" even if closing the port raises.
        connection, self.connection = self.connection, None
        self.port_connect_flag = False
        if connection is not None:
            connection.close()