serial_com_service.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. from typing import Optional, List, Union
  2. import serial
  3. import serial.tools.list_ports
  4. from program_public_tools import ProgramPublicTools
  5. class SerialComService(object):
  6. def __init__(self, pubtools: ProgramPublicTools):
  7. self.package_head: bytes = bytes([111])
  8. self.package_end: bytes = bytes([222])
  9. self.pubtools: ProgramPublicTools = pubtools
  10. self.port_list: Optional[list] = None
  11. self.port_raw_data: Optional[list] = None
  12. self.port_available_exist: bool = False
  13. self.port_connect_flag: bool = False
  14. self.port_baud_rate: int = 115200
  15. self.connection: Optional[serial.Serial] = None
  16. self.service_port_valid_mark: bool = False
  17. self.connection_port_name: Optional[str] = None
  18. def send(self, send_package: Union[List[bytes], bytes]):
  19. self.connection.write(send_package)
  20. def port_scan(self) -> list:
  21. get_port_list = []
  22. self.port_raw_data = serial.tools.list_ports.comports()
  23. for i in range(len(self.port_raw_data)):
  24. get_port_list.append(self.port_raw_data[i][0])
  25. self.port_list = get_port_list
  26. if len(self.port_list) != 0:
  27. self.port_available_exist = True
  28. self.pubtools.debug_output("Connecting to the platform.")
  29. if self.port_available_exist is True:
  30. self.pubtools.debug_output("Available ports: ", self.port_list)
  31. else:
  32. self.pubtools.debug_output("Unable to find available ports.")
  33. return self.port_list
  34. def port_connect(self, port_name: str = "Default", port_baud_rate: int = 115200) -> bool:
  35. self.port_baud_rate = port_baud_rate
  36. if port_name == "Default":
  37. if self.port_list is None:
  38. self.port_list = self.port_scan()
  39. if self.port_available_exist is False:
  40. self.pubtools.debug_output("No port available. Connect failed.")
  41. return self.port_connect_callback(self.port_connect_flag, port_name)
  42. else:
  43. port_name = self.port_list[0]
  44. self.connection = serial.Serial(None, self.port_baud_rate)
  45. self.connection.rts = False
  46. self.connection.dtr = False
  47. self.connection.port = port_name
  48. self.connection.open()
  49. if self.connection.isOpen() is not True:
  50. self.pubtools.debug_output("Connect failed: Port is not open.")
  51. return self.port_connect_callback(self.port_connect_flag, port_name)
  52. else:
  53. self.port_connect_flag = True
  54. return self.port_connect_callback(self.port_connect_flag, port_name)
  55. def port_connect_callback(self, if_succeed: bool, port_name: str) -> bool:
  56. if if_succeed:
  57. self.service_port_valid_mark = True
  58. self.connection_port_name = port_name
  59. return True
  60. else:
  61. self.connection_port_name = None
  62. return False
  63. def port_disconnect(self) -> None:
  64. self.connection.close()
  65. self.connection = None
  66. self.port_connect_flag = False