server.py 30 KB


  1. import os
  2. import time
  3. from typing import Optional
  4. import cv2
  5. import numpy as np
  6. from flask import Flask, request, jsonify
  7. from flask_cors import CORS
  8. from program_core import ProgramCore
  9. from program_public_tools import ProgramPublicTools
  10. class Server:
  11. def __init__(self, server_ip: str = '0.0.0.0', server_port: Optional[int] = None):
  12. self.__pubtools: ProgramPublicTools = ProgramPublicTools()
  13. self.__server_ip: str = server_ip
  14. self.__server_port: Optional[int] = server_port
  15. self.__server_ip_init()
  16. self.__core: ProgramCore = ProgramCore(self.__pubtools)
  17. self.__core.start_core()
  18. self.__app = Flask(__name__)
  19. CORS(self.__app)
  20. self.__setup_routes()
  21. self.__send_server_completed_flag()
  22. @staticmethod
  23. def __send_server_completed_flag():
  24. print("[Server] Server program is ready......")
  25. def __server_ip_init(self):
  26. if self.__server_port is None:
  27. self.__server_port = self.__pubtools.find_available_server_port(number=1, address=self.__server_ip)[0]
  28. print(f"Server is running on port [{self.__server_port}].")
  29. print(f"Server is running on port [{self.__server_port}].")
  30. print(f"Server is running on port [{self.__server_port}].")
  31. def start(self):
  32. self.__app.run(host=self.__server_ip, port=self.__server_port, debug=False)
  33. self.__server_debug_output()
  34. def __server_debug_output(self):
  35. _, _, url = self.__core.instrument_controller.services.digital_oscilloscope.keep_listening()
  36. print(f"[Server Debug Output]: OSC Stream Server is running on <{url}>")
  37. print(f"[Server Debug Output]: OSC Stream Server is running on <{url}>")
  38. print(f"[Server Debug Output]: OSC Stream Server is running on <{url}>")
  39. print(f"[Server Debug Output]: OSC Stream Server is running on <{url}>")
  40. print(f"[Server Debug Output]: OSC Stream Server is running on <{url}>")
  41. @staticmethod
  42. def __instrument_get_not_connect_string(not_connect_list: list[str]) -> str:
  43. response_string: str = "not_connect_list:"
  44. for not_connect_item in not_connect_list:
  45. response_string = response_string + " " + not_connect_item
  46. return response_string
  47. def __setup_routes(self):
  48. @self.__app.route("/")
  49. def home():
  50. return "Server is running."
  51. @self.__app.route("/instrument/not_connect_list", methods=['GET'])
  52. def instrument_not_connect_list():
  53. not_connect_list: list[str] = self.__core.instrument_controller.services.check()
  54. return self.__instrument_get_not_connect_string(not_connect_list)
  55. @self.__app.route("/instrument/digital_multimeter/connection_status", methods=['GET'])
  56. def instrument_digital_multimeter_connection_status():
  57. connection_status: bool = (self.__core.instrument_controller.services.digital_multimeter is not None)
  58. return f"connection_status: {connection_status}"
  59. @self.__app.route("/instrument/digital_oscilloscope/connection_status", methods=['GET'])
  60. def instrument_digital_oscilloscope_connection_status():
  61. connection_status: bool = (self.__core.instrument_controller.services.digital_oscilloscope is not None)
  62. return f"connection_status: {connection_status}"
  63. @self.__app.route("/instrument/waveform_generator/connection_status", methods=['GET'])
  64. def instrument_waveform_generator_connection_status():
  65. connection_status: bool = (self.__core.instrument_controller.services.waveform_generator is not None)
  66. return f"connection_status: {connection_status}"
  67. @self.__app.route("/instrument/analog_electronic_load/connection_status", methods=['GET'])
  68. def instrument_analog_electronic_load_connection_status():
  69. connection_status: bool = (self.__core.instrument_controller.services.analog_electronic_load is not None)
  70. return f"connection_status: {connection_status}"
  71. @self.__app.route("/instrument/digital_multimeter/keep_listening", methods=['GET'])
  72. def instrument_digital_multimeter_keep_listening():
  73. if self.__core.instrument_controller.services.digital_multimeter is not None:
  74. self.__core.instrument_controller.services.digital_multimeter.keep_listening()
  75. return f"keep_listening"
  76. else:
  77. return "None"
  78. @self.__app.route("/instrument/digital_multimeter/get_range", methods=['GET'])
  79. def instrument_digital_multimeter_get_range():
  80. if self.__core.instrument_controller.services.digital_multimeter is not None:
  81. range_list: list[str] = self.__core.instrument_controller.services.digital_multimeter.get_range()
  82. response_string: str = "get_range:"
  83. for rang_type_item in range_list:
  84. response_string = response_string + " " + rang_type_item
  85. return response_string
  86. else:
  87. return "None"
  88. @self.__app.route("/instrument/digital_multimeter/set_range", methods=['GET'])
  89. def instrument_digital_multimeter_set_range():
  90. if self.__core.instrument_controller.services.digital_multimeter is not None:
  91. range_string = request.args.get('range')
  92. range_type = self.__core.instrument_controller.services.digital_multimeter.solve_range_string(
  93. range_string)
  94. self.__core.instrument_controller.services.digital_multimeter.set_range(range_type)
  95. return f"set_range: {range_string}"
  96. else:
  97. return "None"
  98. @self.__app.route("/instrument/digital_multimeter/get_value", methods=['GET'])
  99. def instrument_digital_multimeter_get_value():
  100. if self.__core.instrument_controller.services.digital_multimeter is not None:
  101. get_value = self.__core.instrument_controller.services.digital_multimeter.get_value_latest()
  102. return f"get_value: {get_value}"
  103. else:
  104. return "None"
  105. @self.__app.route("/instrument/digital_oscilloscope/keep_listening", methods=['GET'])
  106. def instrument_digital_oscilloscope_keep_listening():
  107. if self.__core.instrument_controller.services.digital_oscilloscope is not None:
  108. _, v_port, v_url = self.__core.instrument_controller.services.digital_oscilloscope.keep_listening()
  109. return f"keep_listening: {v_port} {v_url}"
  110. else:
  111. return "None"
  112. @self.__app.route("/instrument/digital_oscilloscope/export_image", methods=['GET'])
  113. def instrument_digital_oscilloscope_export_image():
  114. if self.__core.instrument_controller.services.digital_oscilloscope is not None:
  115. get_time: str = self.__core.instrument_controller.services.digital_oscilloscope.save_img()
  116. return get_time
  117. else:
  118. return "None"
  119. @self.__app.route("/instrument/waveform_generator/output_start", methods=['GET'])
  120. def instrument_waveform_generator_output_start():
  121. if self.__core.instrument_controller.services.waveform_generator is not None:
  122. self.__core.instrument_controller.services.waveform_generator.output_start()
  123. return f"output_start"
  124. else:
  125. return "None"
  126. @self.__app.route("/instrument/waveform_generator/output_restart", methods=['GET'])
  127. def instrument_waveform_generator_output_restart():
  128. if self.__core.instrument_controller.services.waveform_generator is not None:
  129. self.__core.instrument_controller.services.waveform_generator.output_restart()
  130. return f"output_restart"
  131. else:
  132. return "None"
  133. @self.__app.route("/instrument/waveform_generator/output_pause", methods=['GET'])
  134. def instrument_waveform_generator_output_pause():
  135. if self.__core.instrument_controller.services.waveform_generator is not None:
  136. self.__core.instrument_controller.services.waveform_generator.output_pause()
  137. return f"output_pause"
  138. else:
  139. return "None"
  140. @self.__app.route("/instrument/waveform_generator/config/apply_config", methods=['GET'])
  141. def instrument_waveform_generator_config_apply_config():
  142. if self.__core.instrument_controller.services.waveform_generator is not None:
  143. self.__core.instrument_controller.services.waveform_generator.apply_config(True)
  144. return "apply_config"
  145. else:
  146. return "None"
  147. @self.__app.route("/instrument/waveform_generator/config/set_enable", methods=['GET'])
  148. def instrument_waveform_generator_config_set_enable():
  149. channel_string: str = request.args.get('channel')
  150. enable_string: str = request.args.get('enable').lower()
  151. if enable_string == "true":
  152. enable: bool = True
  153. else:
  154. enable: bool = False
  155. if self.__core.instrument_controller.services.waveform_generator is not None:
  156. channel = self.__core.instrument_controller.services.waveform_generator.set_enable(channel_string,
  157. enable)
  158. return f"set_enable: {channel} {enable}"
  159. else:
  160. return f"set_enable: None None"
  161. @self.__app.route("/instrument/waveform_generator/config/get_type", methods=['GET'])
  162. def instrument_waveform_generator_config_get_type():
  163. if self.__core.instrument_controller.services.waveform_generator is not None:
  164. res: str = self.__core.instrument_controller.services.waveform_generator.config_channel_1.get_type_string()
  165. return f"get_type: " + res
  166. else:
  167. return f"get_type: " + "None"
  168. @self.__app.route("/instrument/waveform_generator/config/set_type", methods=['GET'])
  169. def instrument_waveform_generator_config_set_type():
  170. channel_string: str = request.args.get('channel')
  171. type_string: str = request.args.get('type')
  172. if self.__core.instrument_controller.services.waveform_generator is not None:
  173. channel, config = self.__core.instrument_controller.services.waveform_generator.get_config(
  174. channel_string)
  175. config.type = config.match_type_string(type_string)
  176. return f"set_type: {channel} {type_string}"
  177. else:
  178. return f"set_type: None None"
  179. @self.__app.route("/instrument/waveform_generator/config/set_freq", methods=['GET'])
  180. def instrument_waveform_generator_config_set_freq():
  181. channel_string: str = request.args.get('channel')
  182. freq: float = float(request.args.get('freq'))
  183. if self.__core.instrument_controller.services.waveform_generator is not None:
  184. channel, config = self.__core.instrument_controller.services.waveform_generator.get_config(
  185. channel_string)
  186. config.freq = freq
  187. return f"set_freq: {channel} {freq}"
  188. else:
  189. return f"set_freq: None None"
  190. @self.__app.route("/instrument/waveform_generator/config/get_freq_unit", methods=['GET'])
  191. def instrument_waveform_generator_config_get_freq_unit():
  192. if self.__core.instrument_controller.services.waveform_generator is not None:
  193. _, config = self.__core.instrument_controller.services.waveform_generator.get_config("1")
  194. unit_type_string: str = config.get_freq_unit_type_string()
  195. return f"get_freq_unit: {unit_type_string}"
  196. else:
  197. return f"get_freq_unit: None"
  198. @self.__app.route("/instrument/waveform_generator/config/set_freq_unit", methods=['GET'])
  199. def instrument_waveform_generator_config_set_freq_unit():
  200. channel_string: str = request.args.get('channel')
  201. freq_unit_string: str = request.args.get('freq_unit')
  202. if self.__core.instrument_controller.services.waveform_generator is not None:
  203. channel, config = self.__core.instrument_controller.services.waveform_generator.get_config(
  204. channel_string)
  205. config.freq_unit = config.match_freq_unit_type_string(freq_unit_string)
  206. return f"set_freq_unit: {channel} {freq_unit_string}"
  207. else:
  208. return f"set_freq_unit: None None"
  209. @self.__app.route("/instrument/waveform_generator/config/set_high_level", methods=['GET'])
  210. def instrument_waveform_generator_config_set_high_level():
  211. channel_string: str = request.args.get('channel')
  212. high_level: float = float(request.args.get('high_level'))
  213. if self.__core.instrument_controller.services.waveform_generator is not None:
  214. channel, config = self.__core.instrument_controller.services.waveform_generator.get_config(
  215. channel_string)
  216. config.high_level = high_level
  217. return f"set_high_level: {channel} {high_level}"
  218. else:
  219. return f"set_high_level: None None"
  220. @self.__app.route("/instrument/waveform_generator/config/set_low_level", methods=['GET'])
  221. def instrument_waveform_generator_config_set_low_level():
  222. channel_string: str = request.args.get('channel')
  223. low_level: float = float(request.args.get('low_level'))
  224. if self.__core.instrument_controller.services.waveform_generator is not None:
  225. channel, config = self.__core.instrument_controller.services.waveform_generator.get_config(
  226. channel_string)
  227. config.low_level = low_level
  228. return f"set_low_level: {channel} {low_level}"
  229. else:
  230. return f"set_low_level: None None"
  231. @self.__app.route("/instrument/waveform_generator/config/get_level_unit", methods=['GET'])
  232. def instrument_waveform_generator_config_get_level_unit():
  233. if self.__core.instrument_controller.services.waveform_generator is not None:
  234. _, config = self.__core.instrument_controller.services.waveform_generator.get_config("1")
  235. unit_type_string: str = config.get_freq_unit_type_string()
  236. return f"get_freq_unit: {unit_type_string}"
  237. else:
  238. return f"get_freq_unit: None"
  239. @self.__app.route("/instrument/waveform_generator/config/set_high_level_unit", methods=['GET'])
  240. def instrument_waveform_generator_config_set_high_level_unit():
  241. channel_string: str = request.args.get('channel')
  242. high_level_unit_string: str = request.args.get('high_level_unit')
  243. if self.__core.instrument_controller.services.waveform_generator is not None:
  244. channel, config = self.__core.instrument_controller.services.waveform_generator.get_config(
  245. channel_string)
  246. config.high_level_unit = config.match_level_unit_type_string(high_level_unit_string)
  247. return f"set_high_level_unit: {channel} {high_level_unit_string}"
  248. else:
  249. return f"set_high_level_unit: None {high_level_unit_string}"
  250. @self.__app.route("/instrument/waveform_generator/config/set_low_level_unit", methods=['GET'])
  251. def instrument_waveform_generator_config_set_low_level_unit():
  252. channel_string: str = request.args.get('channel')
  253. low_level_unit_string: str = request.args.get('low_level_unit')
  254. if self.__core.instrument_controller.services.waveform_generator is not None:
  255. channel, config = self.__core.instrument_controller.services.waveform_generator.get_config(
  256. channel_string)
  257. config.low_level_unit = config.match_level_unit_type_string(low_level_unit_string)
  258. return f"set_low_level_unit: {channel} {low_level_unit_string}"
  259. else:
  260. return f"set_low_level_unit: None {low_level_unit_string}"
  261. @self.__app.route("/instrument/waveform_generator/config/apply_all_setting", methods=['GET'])
  262. def instrument_waveform_generator_config_apply_all_setting():
  263. if self.__core.instrument_controller.services.waveform_generator is not None:
  264. waveform_type: str = request.args.get('type')
  265. waveform_freq: float = float(request.args.get('freq'))
  266. waveform_freq_unit: str = request.args.get('freq_unit')
  267. waveform_voltage: float = float(request.args.get('voltage'))
  268. waveform_voltage_unit: str = request.args.get('voltage_unit')
  269. enable_ch1: bool = bool(request.args.get('ch1'))
  270. enable_ch2: bool = bool(request.args.get('ch2'))
  271. generator = self.__core.instrument_controller.services.waveform_generator
  272. _, config_ch1 = generator.get_config("1")
  273. _, config_ch2 = generator.get_config("2")
  274. for config_chx in [config_ch1, config_ch2]:
  275. config_chx.type = config_chx.match_type_string(waveform_type)
  276. config_chx.type_string = waveform_type
  277. config_chx.freq = waveform_freq
  278. config_chx.freq_unit = config_chx.match_freq_unit_type_string(waveform_freq_unit)
  279. config_chx.high_level = waveform_voltage
  280. config_chx.low_level = - waveform_voltage
  281. config_chx.high_level_unit = config_chx.match_level_unit_type_string(waveform_voltage_unit)
  282. config_chx.low_level_unit = config_chx.match_level_unit_type_string(waveform_voltage_unit)
  283. generator.set_enable("1", enable_ch1)
  284. generator.set_enable("2", enable_ch2)
  285. generator.apply_config(immediate_start=False)
  286. generator.output_start()
  287. return "True"
  288. else:
  289. return "None"
  290. @self.__app.route("/instrument/analog_electronic_load/set_resistance", methods=['GET'])
  291. def instrument_analog_electronic_load_set_resistance():
  292. if self.__core.instrument_controller.services.analog_electronic_load is not None:
  293. resistance: str = request.args.get('set_resistance')
  294. self.__core.instrument_controller.services.analog_electronic_load.set_resistance(resistance)
  295. return "True"
  296. else:
  297. return "False"
  298. @self.__app.route("/serial_device/port_list", methods=['GET'])
  299. def serial_device_port_list():
  300. port_list: list[str] = self.__core.serial_controller.get_port_list()
  301. response_string: str = "port_list:"
  302. if len(port_list) == 0:
  303. response_string = response_string + " " + "None"
  304. else:
  305. for port_name in port_list:
  306. response_string = response_string + " " + port_name
  307. return response_string
  308. @self.__app.route("/serial_device/auto_connect", methods=['GET'])
  309. def serial_device_auto_connect():
  310. matched_port_list, not_matched_port_list, cnc_matched = self.__core.serial_controller.auto_connect()
  311. response_data = {
  312. 'method': 'auto_connect',
  313. 'cnc_matched': cnc_matched,
  314. 'port_list': {
  315. 'matched': matched_port_list,
  316. 'not_matched': not_matched_port_list
  317. }
  318. }
  319. return jsonify(response_data), 200
  320. @self.__app.route("/serial_device/cnc/config_connection", methods=['GET'])
  321. def serial_device_cnc_config_connection():
  322. port_name: str = request.args.get('port_name')
  323. com_connection = self.__core.serial_controller.try_build_connection(port_name, if_add_source_to_lib=True)
  324. if_success = self.__core.serial_controller.services.cnc_attenuator.match_connection(com_connection)
  325. return f"{if_success}"
  326. @self.__app.route("/serial_device/cnc/set", methods=['GET'])
  327. def serial_device_cnc_set():
  328. set_value: str = request.args.get('value')
  329. if_success = self.__core.serial_controller.services.cnc_attenuator.set(float(set_value))
  330. return f"{if_success}"
  331. @self.__app.route("/serial_device/plc/config_connection", methods=['GET'])
  332. def serial_device_plc_connect_sender():
  333. port_name: str = request.args.get('port_name')
  334. plc_com_role: str = request.args.get('plc_com_role').lower()
  335. if_success: bool = False
  336. if (port_name is not None) and (port_name not in ["None", "", " "]):
  337. com_connection = self.__core.serial_controller.try_build_connection(port_name,
  338. if_add_source_to_lib=True)
  339. if com_connection is not None:
  340. if plc_com_role == "sender":
  341. if_success = self.__core.serial_controller.services.plc_sender.match_connection(com_connection)
  342. elif plc_com_role == "receiver":
  343. if_success = self.__core.serial_controller.services.plc_receiver.match_connection(
  344. com_connection)
  345. elif plc_com_role == "receiver2":
  346. if_success = self.__core.serial_controller.services.plc_receiver2.match_connection(
  347. com_connection)
  348. if if_success is True:
  349. self.__core.serial_controller.services.enable_multi_receiver = True
  350. else:
  351. if_success = False
  352. else:
  353. if_success: bool = True
  354. # if_success = True
  355. # ports = self.__core.serial_controller.get_port_list()
  356. # self.__core.serial_controller.services.plc_sender.match_connection()
  357. get_sender_port_name: str = "None"
  358. if self.__core.serial_controller.services.plc_sender.com is not None:
  359. get_sender_port_name = self.__core.serial_controller.services.plc_sender.com.connection_port_name
  360. get_receiver_port_name: str = "None"
  361. if self.__core.serial_controller.services.plc_receiver.com is not None:
  362. get_receiver_port_name = self.__core.serial_controller.services.plc_receiver.com.connection_port_name
  363. self.__core.serial_controller.services.com_usr_config_success = if_success
  364. response_data = {
  365. 'method': 'config_connection',
  366. 'plc_com_role': plc_com_role,
  367. 'port_name': port_name,
  368. 'if_success': if_success,
  369. 'new_config': {
  370. 'sender_port': f"{get_sender_port_name}",
  371. 'receiver_port': f"{get_receiver_port_name}",
  372. },
  373. 'received_config': {
  374. 'role': f"{plc_com_role}",
  375. 'port': f"{port_name}",
  376. }
  377. }
  378. return jsonify(response_data), 200
  379. @self.__app.route("/serial_device/plc/check_connection", methods=['GET'])
  380. def serial_device_plc_check_connection():
  381. sender_port: Optional[str] = self.__core.serial_controller.services.plc_sender.com.connection_port_name
  382. receiver_port: Optional[str] = self.__core.serial_controller.services.plc_receiver.com.connection_port_name
  383. sender_connection_flag: bool = (sender_port is not None)
  384. receiver_connection_flag: bool = (receiver_port is not None)
  385. connection_flag: bool = (sender_connection_flag is True) and (receiver_connection_flag is True)
  386. response_data = {
  387. 'method': 'check_connection',
  388. 'connection_status': connection_flag,
  389. 'config': {
  390. 'sender_port': f"{self.__core.serial_controller.services.plc_sender.com.connection_port_name}",
  391. 'receiver_port': f"{self.__core.serial_controller.services.plc_receiver.com.connection_port_name}",
  392. }
  393. }
  394. return jsonify(response_data), 200
  395. @self.__app.route("/serial_device/plc/start", methods=['GET'])
  396. def serial_device_plc_start():
  397. if self.__core.serial_controller.services.com_usr_config_success is True:
  398. package_config: Optional[str] = request.args.get('package_config')
  399. sender = self.__core.serial_controller.services.plc_sender
  400. receiver = self.__core.serial_controller.services.plc_receiver
  401. sender_port, sender_url = sender.keep_send()
  402. receiver_port, receiver_url = receiver.keep_receive()
  403. receiver2 = None
  404. if self.__core.serial_controller.services.enable_multi_receiver is True:
  405. receiver2 = self.__core.serial_controller.services.plc_receiver2
  406. receiver2.keep_receive()
  407. tester = self.__core.serial_controller.get_tester()
  408. tester.start_test_service(sender, receiver, receiver2)
  409. if package_config == "单包1kbit":
  410. tester_frame_new: bytes = tester.send_package_1kbit
  411. elif package_config == "单包16kbit":
  412. tester_frame_new: bytes = tester.send_package_16kbit
  413. else:
  414. tester_frame_new: bytes = package_config.encode()
  415. if len(tester_frame_new) % 3 == 0:
  416. tester_frame_new += b'12\n'
  417. elif len(tester_frame_new) % 3 == 1: # b'1'
  418. tester_frame_new += b'1\n'
  419. elif len(tester_frame_new) % 3 == 2: # b'12'
  420. tester_frame_new += b'\n'
  421. else:
  422. tester_frame_new = b'12\n'
  423. tester.reset_frame_std_and_restart(tester_frame_new)
  424. else:
  425. sender_port, sender_url, receiver_port, receiver_url = 0, 0, 0, 0
  426. response_data = {
  427. 'method': 'start',
  428. 'sender': {
  429. 'port': f"{sender_port}",
  430. 'url': sender_url,
  431. },
  432. 'receiver': {
  433. 'port': f"{receiver_port}",
  434. 'url': receiver_url,
  435. }
  436. }
  437. print(f"=== @self.__app.route('/serial_device/plc/start', methods=['GET']) ===")
  438. return jsonify(response_data), 200
  439. @self.__app.route("/serial_device/plc/pause", methods=['GET'])
  440. def serial_device_plc_pause():
  441. # self.__core.serial_controller.services.plc_sender.pause()
  442. # self.__core.serial_controller.services.plc_receiver.pause()
  443. tester = self.__core.serial_controller.get_tester()
  444. tester.pause_test_service()
  445. print(f"=== @self.__app.route('/serial_device/plc/pause', methods=['GET']) ===")
  446. return "pause"
  447. @self.__app.route("/serial_device/plc/clear", methods=['GET'])
  448. def serial_device_plc_clear():
  449. self.__core.serial_controller.get_tester().task_add_reset_error_rate()
  450. print(f"=== @self.__app.route('/serial_device/plc/clear', methods=['GET']) ===")
  451. return "clear"
  452. @self.__app.route("/serial_device/plc/realtime_performance", methods=['GET'])
  453. def serial_device_plc_realtime_performance():
  454. tester = self.__core.serial_controller.get_tester()
  455. error_rate_percent: float = tester.error_rate_percent
  456. error_rate_percent2: float = tester.error_rate_percent2
  457. print(f"=== @self.__app.route('/serial_device/plc/realtime_performance', methods=['GET']) ===")
  458. return f"realtime_performance: error_rate_percent={error_rate_percent} error_rate_percent2={error_rate_percent2}"
  459. @self.__app.route("/serial_device/plc/realtime_performance_speed", methods=['GET'])
  460. def serial_device_plc_realtime_performance_speed():
  461. tester = self.__core.serial_controller.get_tester()
  462. speed_kbps: float = tester.speed_kbps
  463. speed_kbps2: float = tester.speed_kbps2
  464. print(f"=== @self.__app.route('/serial_device/plc/realtime_performance_speed', methods=['GET']) ===")
  465. return f"realtime_performance_speed: speed_kbps={speed_kbps} speed_kbps2={speed_kbps2}"
  466. @self.__app.route("/serial_device/plc/motor_mode", methods=['GET'])
  467. def serial_device_plc_motor_mode():
  468. get_type: Optional[str] = request.args.get('type')
  469. mode_number: Optional[str] = request.args.get('mode')
  470. if get_type is not None and mode_number is not None:
  471. tester = self.__core.serial_controller.get_tester()
  472. use_msg = None
  473. if get_type == "motor":
  474. use_msg_list = tester.motor_msg_list
  475. if mode_number in ["1", "2", "3", "4"]:
  476. try:
  477. index = int(mode_number) - 1
  478. use_msg = use_msg_list[index]
  479. except ValueError:
  480. return "failed"
  481. except IndexError:
  482. return "failed"
  483. elif get_type == "relay":
  484. use_msg_list = tester.relay_msg_list
  485. if mode_number in ["1", "2"]:
  486. try:
  487. index = int(mode_number) - 1
  488. use_msg = use_msg_list[index]
  489. except ValueError:
  490. return "failed"
  491. except IndexError:
  492. return "failed"
  493. else:
  494. return "failed"
  495. if use_msg is not None:
  496. tester.set_circle_hook_to_msg_once_mode(use_msg)
  497. return "success"
  498. return "failed"
  499. @self.__app.route("/files/documents/start", methods=['GET'])
  500. def files_documents_start():
  501. host, port = self.__core.local_file_service.start()
  502. return f"documents: host={host} port={port}"
  503. if __name__ == "__main__":
  504. server: Server = Server()
  505. server.start()