# serial_device_controller.py (31 KB)


  1. import threading
  2. import time
  3. from typing import Optional
  4. import Levenshtein
  5. import numpy as np
  6. from flask import jsonify, Flask, request, Response
  7. from flask_cors import CORS
  8. from program_public_tools import ProgramPublicTools
  9. from serial_com_service import SerialComService
  10. class SerialDevice(object):
  11. def __init__(self, pubtools: ProgramPublicTools):
  12. self.__pubtools: ProgramPublicTools = pubtools
  13. self.cnc_attenuator: CNCAttenuator = CNCAttenuator(self.__pubtools)
  14. self.plc_sender: PLCDeviceSender = PLCDeviceSender(self.__pubtools, "PLC-Sender")
  15. self.plc_receiver: PLCDeviceReceiver = PLCDeviceReceiver(self.__pubtools, "PLC-Receiver")
  16. self.plc_receiver2: PLCDeviceReceiver = PLCDeviceReceiver(self.__pubtools, "PLC-Receiver2")
  17. self.enable_multi_receiver: bool = False
  18. self.__enable_match_cnc: bool = False
  19. self.com_usr_config_success = False
  20. def match_connection(self, com_service: SerialComService, enable_auto_match_plc_device: bool = False) \
  21. -> tuple[bool, bool]:
  22. matched: bool = False
  23. cnc_matched: bool = False
  24. if (self.cnc_attenuator.com is None) and (self.__enable_match_cnc is True):
  25. matched = self.cnc_attenuator.match_connection(com_service)
  26. if matched:
  27. cnc_matched = True
  28. if enable_auto_match_plc_device is True:
  29. if matched is False:
  30. if self.plc_sender.com is None:
  31. matched = self.plc_sender.match_connection(com_service)
  32. elif self.plc_receiver.com is None:
  33. matched = self.plc_receiver.match_connection(com_service)
  34. else:
  35. matched = False
  36. return matched, cnc_matched
  37. def check(self):
  38. if self.cnc_attenuator.com is not None:
  39. serial_device_list = ["CNC-Attenuator"]
  40. not_connect_list = []
  41. else:
  42. serial_device_list = []
  43. not_connect_list = ["CNC-Attenuator"]
  44. return serial_device_list, not_connect_list
  45. class SerialDeviceController(object):
  46. def __init__(self, pubtools: ProgramPublicTools):
  47. self.__pubtools: ProgramPublicTools = pubtools
  48. self.__port_list = SerialComService(self.__pubtools).port_scan()
  49. self.services: SerialDevice = SerialDevice(self.__pubtools)
  50. self.__baud_rate: int = 115200
  51. self.__connection_lib: dict = {}
  52. self.__tester: Optional[TestService] = None
  53. def get_tester(self):
  54. if self.__tester is None:
  55. self.__tester = TestService()
  56. return self.__tester
  57. def auto_connect(self) -> tuple[list[str], list[str], bool]:
  58. return self.auto_match(self.get_port_list())
  59. def get_port_list(self) -> list:
  60. return self.__port_list
  61. def auto_match(self, device_port_list: list[str]) -> tuple[list[str], list[str], bool]:
  62. not_matched_port_list: list[str] = []
  63. matched_port_list: list[str] = []
  64. cnc_match_result: bool = False
  65. for port_name in device_port_list:
  66. com_service = self.try_build_connection(port_name)
  67. match_flag: bool = False
  68. if com_service is not None:
  69. if com_service.connection is not None:
  70. match_result, cnc_match_result = self.services.match_connection(com_service)
  71. if match_result is True:
  72. match_flag = True
  73. else:
  74. com_service.port_disconnect()
  75. if match_flag is True:
  76. matched_port_list.append(port_name)
  77. else:
  78. not_matched_port_list.append(port_name)
  79. return matched_port_list, not_matched_port_list, cnc_match_result
  80. def try_build_connection(self, port_name: str, if_add_source_to_lib: bool = False) -> Optional[SerialComService]:
  81. if port_name in self.__connection_lib:
  82. return self.__connection_lib[port_name]
  83. else:
  84. com_service: SerialComService = SerialComService(self.__pubtools)
  85. connection_result = False
  86. try:
  87. connection_result: bool = com_service.port_connect(port_name, self.__baud_rate)
  88. except Exception as e:
  89. self.__pubtools.debug_output(f"Error when attempting to connect port [{port_name}]: {e}")
  90. if connection_result is True:
  91. if if_add_source_to_lib is True:
  92. self.__connection_lib[port_name] = com_service
  93. return com_service
  94. else:
  95. return None
  96. class PLCDevice(object):
  97. def __init__(self, pubtools: ProgramPublicTools, name: str):
  98. self.pubtools: ProgramPublicTools = pubtools
  99. self.com: Optional[SerialComService] = None
  100. self.name: str = name
  101. self.frame_std: bytes = 200 * b'#123456789#X' + b'123456789#\n'
  102. def match_connection(self, com: SerialComService) -> bool:
  103. self.com = com
  104. return True
  105. class PLCDeviceSender(PLCDevice):
  106. def __init__(self, pubtools: ProgramPublicTools, name: str):
  107. super().__init__(pubtools, name)
  108. self.send_pkg_number: int = 0
  109. self.send_pkg_count: int = 0
  110. self.send_pkg_length: int = len(self.frame_std)
  111. self.__s_server: FloatServer = FloatServer("Sender")
  112. self.__is_running: bool = False
  113. self.server_port = self.pubtools.find_available_server_port(number=1, address="0.0.0.0", start_port=8393)[0]
  114. self.server_url = f"sender: http://localhost:{self.server_port}{self.__s_server.name}"
  115. self.__pause: bool = False
  116. self.sleep_sec_every_step: float = 0
  117. self.sleep_sec_every_ten_step: float = 0.01
  118. self.__ten_step_counter: int = 0
  119. self.pull_up_signal_when_sending: bool = False
  120. self.__is_resetting: bool = False
  121. def clear(self):
  122. self.send_pkg_number = 0
  123. self.send_pkg_count = 0
  124. self.__ten_step_counter = 0
  125. def keep_send(self):
  126. self.__pause = False
  127. if self.__is_running is not True:
  128. self.__start_server()
  129. self.__is_running = True
  130. return self.__return_server_info()
  131. def pause(self):
  132. self.__pause = True
  133. def pause_and_reset_frame_std_sender(self, frame_std_new):
  134. if self.__is_resetting is False:
  135. self.__is_resetting = True
  136. self.pause()
  137. self.frame_std = frame_std_new
  138. self.send_pkg_length = len(self.frame_std)
  139. self.__is_resetting = False
  140. @staticmethod
  141. def __start_server():
  142. return None
  143. def __start_server_origin(self):
  144. threading.Thread(target=self.__send_thread_main).start()
  145. threading.Thread(target=self.__sender_server_threading_main, args=(self.server_port,)).start()
  146. print(self.server_url)
  147. return self.__return_server_info()
  148. @staticmethod
  149. def __return_server_info():
  150. return None, None
  151. def __return_server_info_origin(self):
  152. return self.server_port, self.server_url
  153. def __sender_server_threading_main(self, port):
  154. self.__s_server.run("0.0.0.0", port)
  155. def __send_thread_main(self):
  156. if self.sleep_sec_every_step != 0:
  157. print(f"[Tester] Using config: Waiting [{self.sleep_sec_every_step}] seconds every message.")
  158. if self.sleep_sec_every_step != 0:
  159. print(f"[Tester] Using config: Waiting [{self.sleep_sec_every_step}] seconds every ten message.")
  160. while True:
  161. if self.__pause is False:
  162. time.sleep(self.sleep_sec_every_step)
  163. self.__ten_step_counter += 1
  164. if self.__ten_step_counter == 10:
  165. self.__ten_step_counter = 0
  166. time.sleep(self.sleep_sec_every_ten_step)
  167. self.__s_server.push(self.send_pkg_count)
  168. else:
  169. time.sleep(0.1)
  170. def send_once(self):
  171. self.com.connection.write(self.frame_std)
  172. self.pull_up_signal_when_sending = True
  173. self.send_pkg_number += 1
  174. self.send_pkg_count = self.send_pkg_number * self.send_pkg_length
  175. class PLCDeviceReceiver(PLCDevice):
  176. def __init__(self, pubtools: ProgramPublicTools, name: str):
  177. super().__init__(pubtools, name)
  178. self.error_count: int = 0
  179. self.receive_pkg: int = 0
  180. self.receive_bytes: int = 0
  181. self.receive_bytes_this_circle: int = 0
  182. self.__r_server: FloatServer = FloatServer("Receiver")
  183. self.server_port = self.pubtools.find_available_server_port(number=1, address="0.0.0.0", start_port=8353)[0]
  184. self.server_url = f"receive: http://localhost:{self.server_port}{self.__r_server.name}"
  185. self.__is_running: bool = False
  186. self.__pause: bool = False
  187. self.pull_up_signal_when_receiving: bool = False
  188. self.auto_pause_when_receiving = False
  189. self.__update_frame_std: bool = False
  190. self.__is_resetting: bool = False
  191. def clear(self):
  192. self.error_count = 0
  193. self.receive_pkg = 0
  194. self.receive_bytes = 0
  195. self.receive_bytes_this_circle: int = 0
  196. def keep_receive(self):
  197. self.__pause = False
  198. if self.__is_running is not True:
  199. self.__start_server()
  200. self.__is_running = True
  201. return self.__return_info()
  202. def pause(self):
  203. self.__pause = True
  204. def __start_server(self):
  205. threading.Thread(target=self.__receive_thread_main).start()
  206. threading.Thread(target=self.__receiver_server_threading_main, args=(self.server_port,)).start()
  207. print(self.server_url)
  208. return self.__return_info()
  209. def __return_info(self):
  210. return self.server_port, self.server_url
  211. def __receiver_server_threading_main(self, port):
  212. self.__r_server.run("0.0.0.0", port)
  213. def pause_and_reset_frame_std_receiver(self, frame_std_new):
  214. if self.__is_resetting is False:
  215. self.__is_resetting = True
  216. self.frame_std = frame_std_new
  217. self.pause()
  218. self.__update_frame_std = True
  219. def __receive_thread_main(self):
  220. frame_std: bytes = self.frame_std
  221. frame_std_len: int = len(frame_std)
  222. while True:
  223. if self.__update_frame_std is True:
  224. frame_std: bytes = self.frame_std
  225. frame_std_len: int = len(frame_std)
  226. self.__update_frame_std = False
  227. self.__is_resetting = False
  228. if self.__pause is False:
  229. self.receive_bytes_this_circle = 0
  230. line = self.com.connection.readline()
  231. if line:
  232. self.receive_pkg += 1
  233. self.receive_bytes = self.receive_bytes + len(line)
  234. self.receive_bytes_this_circle = len(line)
  235. self.pull_up_signal_when_receiving = True
  236. get_error_bytes = self.calculate_ber_optimized(frame_std, line)
  237. if get_error_bytes > frame_std_len:
  238. self.error_count += frame_std_len
  239. else:
  240. self.error_count += get_error_bytes
  241. self.__r_server.push(self.error_count)
  242. if self.auto_pause_when_receiving is True:
  243. self.pause()
  244. else:
  245. time.sleep(0.1)
  246. @staticmethod
  247. def calculate_ber_optimized(pkg_send, pkg_receive):
  248. assert isinstance(pkg_send, bytes) and isinstance(pkg_receive, bytes), "Input should be bytes"
  249. assert len(pkg_send) > 0 and len(pkg_receive) > 0, "Input bytes should not be empty"
  250. min_length = min(len(pkg_send), len(pkg_receive))
  251. bit_errors = 0
  252. for send_byte, receive_byte in zip(pkg_send[:min_length], pkg_receive[:min_length]):
  253. if send_byte != receive_byte:
  254. send_bits = np.unpackbits(np.array([send_byte], dtype=np.uint8))
  255. receive_bits = np.unpackbits(np.array([receive_byte], dtype=np.uint8))
  256. bit_errors += np.sum(send_bits != receive_bits)
  257. return np.int_(bit_errors / 8)
  258. class CNCAttenuator(object):
  259. def __init__(self, pubtools: ProgramPublicTools):
  260. self.__pubtools: ProgramPublicTools = pubtools
  261. self.com: Optional[SerialComService] = None
  262. self.name: str = "CNC-Attenuator"
  263. def match_connection(self, com: SerialComService) -> bool:
  264. for _ in range(0, 2):
  265. com.connection.write(b'att-000.00\r\n')
  266. com.connection.timeout = 1
  267. data = com.connection.readline().decode('utf-8')
  268. if data.lower() == "attOK".lower():
  269. self.com = com
  270. self.__pubtools.debug_output("Successfully connect to CNC-Attenuator.")
  271. return True
  272. return False
  273. def set(self, num: float) -> bool:
  274. if self.com is None:
  275. return False
  276. else:
  277. try:
  278. formatted_num_str = "{:06.2f}".format(num)
  279. formatted_byte_str = b'att-' + formatted_num_str.encode('utf-8') + b'\r\n'
  280. self.com.connection.write(formatted_byte_str)
  281. except Exception as e:
  282. return False
  283. else:
  284. return True
  285. class FloatServer:
  286. def __init__(self, name: Optional[str] = None):
  287. self.name: Optional[str] = name
  288. self.__app = Flask(__name__)
  289. CORS(self.__app)
  290. self.__value = 0.0
  291. self.__instrument_on = False # 新增的仪器状态标志
  292. self.__setup_routes()
  293. def __setup_routes(self):
  294. if self.name is None:
  295. self.name = f"/float_default_{time.time()}"
  296. else:
  297. self.name = f"/{self.name}"
  298. @self.__app.route(self.name, methods=['GET'])
  299. def get_value():
  300. return jsonify({'value': self.__value})
  301. # 新增的端点来获取仪器的状态
  302. @self.__app.route(f"{self.name}/instrument_status", methods=['GET'])
  303. def get_instrument_status():
  304. return jsonify({'status': self.__instrument_on})
  305. # 新增的端点来更改仪器的状态
  306. @self.__app.route(f"{self.name}/set_instrument_status", methods=['POST'])
  307. def set_instrument_status():
  308. status = request.json.get('status', None)
  309. if status is not None:
  310. self.__instrument_on = status
  311. return Response("Instrument status updated", status=200)
  312. else:
  313. return Response("Invalid request format", status=400)
  314. def run(self, host, port):
  315. self.__app.run(host=host, port=port)
  316. def push(self, new_value):
  317. self.__value = new_value
  318. class TestService(object):
  319. def __init__(self):
  320. self.config_sleep_sec_every_step: float = 0
  321. self.config_sleep_sec_every_ten_step: float = 0
  322. self.error_rate_percent: float = 0
  323. self.error_rate_percent2: float = 0
  324. self.__is_tester_running: bool = False
  325. self.__threading = None
  326. self.__sender = None
  327. self.__receiver = None
  328. self.__receiver2 = None
  329. self.__task_reset_error_rate_percent: bool = False
  330. self.__speed_timer: float = 0
  331. self.speed_kbps: float = 0
  332. self.speed_kbps2: float = 0
  333. self.__rev_timer_sec: float = time.time()
  334. self.__rev_timeout_sec: float = 2
  335. self.__is_resetting_frame_std_and_restarting: bool = False
  336. self.send_package_1kbit: bytes = 25 * b'12345' + b'\n'
  337. self.send_package_16kbit: bytes = 400 * b'12345' + b'\n'
  338. self.test_service_init_complete_flag: bool = False
  339. self.have_test_service_task_in_circle: bool = False
  340. self.have_task_reset_frame_std: bool = False
  341. self.task_lock_reset_frame_std_and_restart: bool = False
  342. self.__frame_std_new: bytes = self.send_package_16kbit
  343. def run_as_main(self):
  344. run_mode = ""
  345. while True:
  346. print("Run Mode: A: PLC-Tester B: CNC-Controller")
  347. input_mode = input("Choose a Mode: ").lower()
  348. if input_mode == "a":
  349. run_mode = input_mode
  350. break
  351. elif input_mode == "b":
  352. run_mode = input_mode
  353. break
  354. else:
  355. print("Invalid mode input! Try input again.")
  356. if run_mode == "a":
  357. self.run_as_plc_tester()
  358. elif run_mode == "b":
  359. self.run_as_cnc_controller()
  360. else:
  361. raise Exception("Error: Unknown type of run mode.")
  362. def run_as_cnc_controller(self):
  363. program_pubtools: ProgramPublicTools = ProgramPublicTools()
  364. serial_device_controller: SerialDeviceController = SerialDeviceController(program_pubtools)
  365. port_list = serial_device_controller.get_port_list()
  366. port_info: str = ''
  367. index_num: int = 0
  368. for port_name in port_list:
  369. port_info = port_info + f"Index[{index_num}]={port_name} "
  370. index_num += 1
  371. print(port_info)
  372. index = input("[CNC Controller] index: ")
  373. cnc_controller: SerialComService = SerialComService(program_pubtools)
  374. cnc_controller.port_connect(port_list[int(index)])
  375. result: bool = serial_device_controller.services.cnc_attenuator.match_connection(cnc_controller)
  376. if result is False:
  377. raise Exception("[CNC Controller] Error: Cannot connect to cnc device.")
  378. else:
  379. while True:
  380. get_value_input = input("Input to set cnc value: (dB) ")
  381. try:
  382. set_value = float(get_value_input)
  383. except ValueError:
  384. print("Invalid input! CNC value is under float format. Please enter a float number.")
  385. else:
  386. serial_device_controller.services.cnc_attenuator.set(set_value)
  387. print("Set success!")
  388. input("Input any content to continue...")
  389. def run_as_plc_tester(self):
  390. program_pubtools: ProgramPublicTools = ProgramPublicTools()
  391. serial_device_controller: SerialDeviceController = SerialDeviceController(program_pubtools)
  392. # serial_device_controller.auto_connect()
  393. port_list = serial_device_controller.get_port_list()
  394. port_info: str = ''
  395. index_num: int = 0
  396. for port_name in port_list:
  397. port_info = port_info + f"Index[{index_num}]={port_name} "
  398. index_num += 1
  399. print(port_info)
  400. index_1 = input("[Sender] index: ")
  401. index_2 = input("[Receiver] index: ")
  402. index_3 = input("[Receiver2] index: (Optional) ")
  403. com_sender: SerialComService = SerialComService(program_pubtools)
  404. com_sender.port_connect(port_list[int(index_1)])
  405. com_receiver: SerialComService = SerialComService(program_pubtools)
  406. com_receiver.port_connect(port_list[int(index_2)])
  407. com_receiver2: Optional[SerialComService] = None
  408. if index_3:
  409. print(f"[Serial Controller] Multi Receiver Mode is enabled. Receiver2: {port_list[int(index_3)]}.")
  410. com_receiver2 = SerialComService(program_pubtools)
  411. com_receiver2.port_connect(port_list[int(index_3)])
  412. serial_device_controller.services.plc_sender.match_connection(com_sender)
  413. serial_device_controller.services.plc_receiver.match_connection(com_receiver)
  414. serial_device_controller.services.plc_receiver2.match_connection(com_receiver2)
  415. while True:
  416. print("Please config interval seconds for every message.")
  417. get_sec = input("[One Message] Seconds: float = ")
  418. try:
  419. float_sec = float(get_sec)
  420. break
  421. except ValueError:
  422. print("Invalid input! Seconds config is under float format. Please enter a float number.")
  423. while True:
  424. print("Please config interval seconds for every ten messages.")
  425. get_sec_ten = input("[Ten Messages] Seconds: float = ")
  426. try:
  427. float_sec_ten = float(get_sec_ten)
  428. break
  429. except ValueError:
  430. print("Invalid input! Seconds config is under float format. Please enter a float number.")
  431. serial_device_controller.services.plc_sender.sleep_sec_every_step = float_sec
  432. serial_device_controller.services.plc_sender.sleep_sec_every_ten_step = float_sec_ten
  433. serial_device_controller.services.plc_receiver.keep_receive()
  434. if com_receiver2:
  435. serial_device_controller.services.plc_receiver2.keep_receive()
  436. self.start_test_service(serial_device_controller.services.plc_sender,
  437. serial_device_controller.services.plc_receiver,
  438. serial_device_controller.services.plc_receiver2)
  439. else:
  440. self.start_test_service(serial_device_controller.services.plc_sender,
  441. serial_device_controller.services.plc_receiver, None)
  442. while True:
  443. if input():
  444. self.task_add_reset_error_rate()
  445. def task_add_reset_error_rate(self):
  446. self.__task_reset_error_rate_percent = True
  447. def __do_reset_error_rate(self):
  448. if self.__sender and self.__receiver:
  449. self.__sender.clear()
  450. self.__receiver.clear()
  451. if self.__receiver2:
  452. self.__receiver2.clear()
  453. self.__speed_timer = time.time()
  454. def error_rate_push(self, error_rate_percent: float, receiver_id: int = 1):
  455. if receiver_id == 2:
  456. self.error_rate_percent2 = error_rate_percent
  457. else:
  458. self.error_rate_percent = error_rate_percent
  459. def start_test_service(self, sender: PLCDeviceSender, receiver: PLCDeviceReceiver,
  460. receiver2: Optional[PLCDeviceReceiver] = None):
  461. self.__pause_tester_signal = False
  462. if self.__is_tester_running is False:
  463. self.__start_tester_threading(sender, receiver, receiver2)
  464. self.__is_tester_running = True
  465. def pause_test_service(self):
  466. self.__pause_tester_signal = True
  467. def reset_frame_std_and_restart(self, frame_std_new: bytes):
  468. if self.task_lock_reset_frame_std_and_restart is False:
  469. self.task_lock_reset_frame_std_and_restart = True
  470. self.__frame_std_new = frame_std_new
  471. self.have_test_service_task_in_circle = True
  472. self.have_task_reset_frame_std = True
  473. def __do_reset_frame_std_and_restart(self):
  474. if self.__is_resetting_frame_std_and_restarting is False:
  475. self.__is_resetting_frame_std_and_restarting = True
  476. self.pause_test_service()
  477. self.__sender.pause_and_reset_frame_std_sender(self.__frame_std_new)
  478. self.__receiver.pause_and_reset_frame_std_receiver(self.__frame_std_new)
  479. if self.__receiver2 is not None:
  480. self.__receiver2.pause_and_reset_frame_std_receiver(self.__frame_std_new)
  481. self.task_add_reset_error_rate()
  482. self.start_test_service(self.__sender, self.__receiver, self.__receiver2)
  483. self.__is_resetting_frame_std_and_restarting = False
  484. def __test_service_task_in_circle(self):
  485. if self.have_task_reset_frame_std is True:
  486. self.__do_reset_frame_std_and_restart()
  487. self.have_task_reset_frame_std = False
  488. self.task_lock_reset_frame_std_and_restart = False
  489. def __start_tester_threading(self, sender: PLCDeviceSender, receiver: PLCDeviceReceiver,
  490. receiver2: Optional[PLCDeviceReceiver] = None):
  491. self.__threading = threading.Thread(target=self.test_service, args=(sender, receiver, receiver2)).start()
  492. def __check_task_reset_error_rate(self):
  493. if self.__task_reset_error_rate_percent is True:
  494. self.__do_reset_error_rate()
  495. self.__task_reset_error_rate_percent = False
  496. def update_error_rate(self, receiver_number: int = None):
  497. if receiver_number == 2:
  498. receiver = self.__receiver2
  499. else:
  500. receiver = self.__receiver
  501. self.__check_task_reset_error_rate()
  502. if (self.__sender is not None) and (receiver is not None):
  503. count_all = self.__sender.send_pkg_count
  504. get_error = receiver.error_count
  505. pkg_lose_bytes = (self.__sender.send_pkg_number - receiver.receive_pkg) * self.__sender.send_pkg_length
  506. count_error_all = pkg_lose_bytes + get_error
  507. if count_all != 0:
  508. error_rate_percent: float = 100 * count_error_all / count_all
  509. else:
  510. error_rate_percent: float = 0.0
  511. self.error_rate_push(error_rate_percent, receiver_number)
  512. self.__update_speed(count_all, pkg_lose_bytes, receiver_number)
  513. return count_all, pkg_lose_bytes, count_error_all
  514. else:
  515. return 0, 0, 0
  516. def __update_speed(self, count_all_bytes, pkg_lose_bytes, receiver_number: int = None):
  517. get_time = time.time() - self.__speed_timer
  518. if get_time != 0:
  519. speed_kbps = (count_all_bytes - pkg_lose_bytes) * 8 / (1000 * get_time)
  520. else:
  521. speed_kbps = 0
  522. if receiver_number == 2:
  523. self.speed_kbps2 = speed_kbps
  524. else:
  525. self.speed_kbps = speed_kbps
  526. def test_service(self, sender: PLCDeviceSender, receiver: PLCDeviceReceiver,
  527. receiver2: Optional[PLCDeviceReceiver] = None):
  528. self.__sender = sender
  529. self.__receiver = receiver
  530. self.__receiver2 = receiver2
  531. self.__sender.sleep_sec_every_step = self.config_sleep_sec_every_step
  532. self.__sender.sleep_sec_every_ten_step = self.config_sleep_sec_every_ten_step
  533. self.__receiver.auto_pause_when_receiving = True
  534. if self.__receiver2:
  535. self.__receiver2.auto_pause_when_receiving = True
  536. self.__receiver.keep_receive()
  537. if self.__receiver2:
  538. self.__receiver2.keep_receive()
  539. self.__sender.send_once()
  540. self.__rev_timer_sec = time.time()
  541. self.__rev_timeout_sec = 2
  542. self.__speed_timer = time.time()
  543. for _ in range(0, 3):
  544. self.__test_circle(if_print=False)
  545. self.task_add_reset_error_rate()
  546. self.test_service_init_complete_flag = True
  547. while True:
  548. if self.have_test_service_task_in_circle is True:
  549. self.__test_service_task_in_circle()
  550. self.have_test_service_task_in_circle = False
  551. if self.__pause_tester_signal is False:
  552. self.__test_circle(if_print=True)
  553. else:
  554. time.sleep(0.1)
  555. @staticmethod
  556. def __tester_output(message: str, if_print: bool):
  557. if if_print is True:
  558. print(message)
  559. def __test_circle(self, if_print: bool = True):
  560. rev_flag_1 = self.__receiver.pull_up_signal_when_receiving
  561. if self.__receiver2 is None:
  562. rev_flag_2 = False
  563. else:
  564. rev_flag_2 = self.__receiver2.pull_up_signal_when_receiving
  565. if (self.__receiver2 is None) and (rev_flag_1 is True):
  566. count_all, pkg_lose_bytes, count_error = self.update_error_rate(1)
  567. self.__tester_output("====================================================", if_print)
  568. self.__tester_output(
  569. f"all: {count_all * 8} bit, lose: {pkg_lose_bytes * 8} bit, error: {count_error * 8} bit", if_print)
  570. self.__tester_output(f"error_rate: {self.error_rate_percent} %", if_print)
  571. self.__tester_output(f"DBG: 发包={self.__sender.send_pkg_number} 收包={self.__receiver.receive_pkg}",
  572. if_print)
  573. self.__tester_output(f"speed: {self.speed_kbps} kbps", if_print)
  574. if self.__sender.send_pkg_number % 5 == 0:
  575. self.__tester_output(f"[Sender] Package Length: {len(self.__sender.frame_std)}", if_print)
  576. self.__tester_output(f"[Receiver] Package Length: {len(self.__receiver.frame_std)}", if_print)
  577. self.__receiver.pull_up_signal_when_receiving = False
  578. time.sleep(self.__sender.sleep_sec_every_step)
  579. if self.__sender.send_pkg_number % 10 == 0:
  580. time.sleep(self.__sender.sleep_sec_every_ten_step)
  581. self.__receiver.keep_receive()
  582. self.__sender.send_once()
  583. self.__rev_timer_sec = time.time()
  584. elif (self.__receiver2 is not None) and (rev_flag_1 is True) and (rev_flag_2 is True):
  585. count_all, pkg_lose_bytes, count_error = self.update_error_rate(1)
  586. count_all2, pkg_lose_bytes2, count_error2 = self.update_error_rate(2)
  587. self.__tester_output("====================================================", if_print)
  588. self.__tester_output(f"Receiver [1]:", if_print)
  589. self.__tester_output(
  590. f"all: {count_all * 8} bit, lose: {pkg_lose_bytes * 8} bit, error: {count_error * 8} bit", if_print)
  591. self.__tester_output(f"error_rate: {self.error_rate_percent} %", if_print)
  592. self.__tester_output(f"DBG: 发包={self.__sender.send_pkg_number} 收包={self.__receiver.receive_pkg}",
  593. if_print)
  594. self.__tester_output(f"speed: {self.speed_kbps} kbps", if_print)
  595. self.__tester_output(f"Receiver [2]:", if_print)
  596. self.__tester_output(
  597. f"all: {count_all2 * 8} bit, lose: {pkg_lose_bytes2 * 8} bit, error: {count_error2 * 8} bit", if_print)
  598. self.__tester_output(f"error_rate: {self.error_rate_percent2} %", if_print)
  599. self.__tester_output(f"DBG: 发包={self.__sender.send_pkg_number} 收包={self.__receiver2.receive_pkg}",
  600. if_print)
  601. self.__tester_output(f"speed: {self.speed_kbps2} kbps", if_print)
  602. self.__receiver.pull_up_signal_when_receiving = False
  603. time.sleep(self.__sender.sleep_sec_every_step)
  604. if self.__sender.send_pkg_number % 10 == 0:
  605. time.sleep(self.__sender.sleep_sec_every_ten_step)
  606. self.__receiver.keep_receive()
  607. self.__receiver2.keep_receive()
  608. self.__sender.send_once()
  609. self.__rev_timer_sec = time.time()
  610. elif time.time() - self.__rev_timer_sec >= self.__rev_timeout_sec:
  611. self.__tester_output("====================================================", if_print)
  612. self.__tester_output("[Serial Controller] Warning: Receiver timeout. A package lost.", if_print)
  613. self.update_error_rate(1)
  614. self.__tester_output(f"[Serial Controller] Warning: error_rate: {self.error_rate_percent} %", if_print)
  615. if self.__receiver2:
  616. self.update_error_rate(2)
  617. self.__tester_output(
  618. f"[Serial Controller] Warning: error_rate: {self.error_rate_percent2} % (Receiver2)", if_print)
  619. self.__receiver.pull_up_signal_when_receiving = False
  620. self.__receiver.keep_receive()
  621. if self.__receiver2:
  622. self.__receiver2.keep_receive()
  623. self.__sender.send_once()
  624. self.__rev_timer_sec = time.time()
  625. else:
  626. time.sleep(0.1)
  627. if __name__ == "__main__":
  628. main_tester: TestService = TestService()
  629. main_tester.run_as_main()