123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- from typing import Optional, List, Union
- import serial
- import serial.tools.list_ports
- from program_public_tools import ProgramPublicTools
- class SerialComService(object):
- def __init__(self, pubtools: ProgramPublicTools):
- self.package_head: bytes = bytes([111])
- self.package_end: bytes = bytes([222])
- self.pubtools: ProgramPublicTools = pubtools
- self.port_list: Optional[list] = None
- self.port_raw_data: Optional[list] = None
- self.port_available_exist: bool = False
- self.port_connect_flag: bool = False
- self.port_baud_rate: int = 115200
- self.connection: Optional[serial.Serial] = None
- self.service_port_valid_mark: bool = False
- self.connection_port_name: Optional[str] = None
- def send(self, send_package: Union[List[bytes], bytes]):
- self.connection.write(send_package)
- def port_scan(self) -> list:
- get_port_list = []
- self.port_raw_data = serial.tools.list_ports.comports()
- for i in range(len(self.port_raw_data)):
- get_port_list.append(self.port_raw_data[i][0])
- self.port_list = get_port_list
- if len(self.port_list) != 0:
- self.port_available_exist = True
- self.pubtools.debug_output("Connecting to the platform.")
- if self.port_available_exist is True:
- self.pubtools.debug_output("Available ports: ", self.port_list)
- else:
- self.pubtools.debug_output("Unable to find available ports.")
- return self.port_list
- def port_connect(self, port_name: str = "Default", port_baud_rate: int = 115200) -> bool:
- self.port_baud_rate = port_baud_rate
- if port_name == "Default":
- if self.port_list is None:
- self.port_list = self.port_scan()
- if self.port_available_exist is False:
- self.pubtools.debug_output("No port available. Connect failed.")
- return self.port_connect_callback(self.port_connect_flag, port_name)
- else:
- port_name = self.port_list[0]
- self.connection = serial.Serial(None, self.port_baud_rate)
- self.connection.rts = False
- self.connection.dtr = False
- self.connection.port = port_name
- self.connection.open()
- if self.connection.isOpen() is not True:
- self.pubtools.debug_output("Connect failed: Port is not open.")
- return self.port_connect_callback(self.port_connect_flag, port_name)
- else:
- self.port_connect_flag = True
- return self.port_connect_callback(self.port_connect_flag, port_name)
- def port_connect_callback(self, if_succeed: bool, port_name: str) -> bool:
- if if_succeed:
- self.service_port_valid_mark = True
- self.connection_port_name = port_name
- return True
- else:
- self.connection_port_name = None
- return False
- def port_disconnect(self) -> None:
- self.connection.close()
- self.connection = None
- self.port_connect_flag = False
|