powerline_adapter.py 12 KB


  1. import os
  2. import queue
  3. import subprocess
  4. import threading
  5. import time
  6. from typing import Union
  7. from flask import Flask, request, jsonify
  8. from flask_cors import CORS
  9. from flask_socketio import SocketIO
  10. from program_public_tools import ProgramPublicTools
  11. class EthernetIPv4Searcher:
  12. @staticmethod
  13. def search_ipv4(ethernet_title_mark: str = "以太网"):
  14. try:
  15. ipconfig_output = os.popen('ipconfig').read()
  16. adapters = []
  17. section_list = ipconfig_output.split('\n\n')
  18. index = 0
  19. max_index = len(section_list) - 1
  20. while True:
  21. section = section_list[index]
  22. if ethernet_title_mark in section:
  23. for line in section_list[index + 1].split('\n'):
  24. if 'IPv4 Address' in line or 'IP Address' in line:
  25. ip_address = line.split(':')[-1].strip()
  26. adapters.append(ip_address)
  27. index += 1
  28. if index > max_index:
  29. break
  30. return adapters
  31. except Exception as e:
  32. return {"Error": str(e)}
  33. class IperfClientConfig(object):
  34. ip: str = ""
  35. port: int = 50500
  36. ports: list[int] = [50500, 50555]
  37. class IperfServerConfig(object):
  38. ip: str = ""
  39. port: int = 50500
  40. ports: list[int] = [50500, 50555]
  41. class ProgramConnectMethod(object):
  42. general = 0
  43. win_terminal = 1
  44. lin_terminal = 2
  45. class PowerLineAdapterConfig(object):
  46. client: IperfClientConfig = IperfClientConfig()
  47. server: IperfServerConfig = IperfServerConfig()
  48. connect_method: ProgramConnectMethod = ProgramConnectMethod.win_terminal
  49. report_interval_sec: int = 1
  50. run_time_sec: int = 60
  51. enable_auto_get_ip: bool = True
  52. enable_auto_find_available_server_ip: bool = True
  53. enable_realtime_output_iperf_analyze: bool = True
  54. class PowerlineAdapterPublicTools(object):
  55. def __init__(self, pubtools: ProgramPublicTools):
  56. self.pubtools: ProgramPublicTools = pubtools
  57. def push_result(self, hook, *args):
  58. self.pubtools.debug_output(*args)
  59. if hook is not None:
  60. hook(''.join([str(arg) for arg in args]))
  61. class PowerlineAdapter(object):
  62. def __init__(self, pubtools: ProgramPublicTools):
  63. self.__pubtools: ProgramPublicTools = pubtools
  64. self.config: PowerLineAdapterConfig = PowerLineAdapterConfig()
  65. self.__iperf_runner: Union[IperfRunnerGeneral, IperfRunnerWinTerminal, IperfRunnerLinTerminal, None] = None
  66. self.__tools: PowerlineAdapterPublicTools = PowerlineAdapterPublicTools(self.__pubtools)
  67. self.__analyze_result: str = ""
  68. self.__hook = None
  69. def init(self):
  70. if self.config.connect_method == ProgramConnectMethod.general:
  71. self.__iperf_runner: IperfRunnerGeneral = IperfRunnerGeneral(self.__tools, self.config)
  72. elif self.config.connect_method == ProgramConnectMethod.win_terminal:
  73. self.__iperf_runner: IperfRunnerWinTerminal = IperfRunnerWinTerminal(self.__tools, self.config)
  74. elif self.config.connect_method == ProgramConnectMethod.lin_terminal:
  75. self.__iperf_runner: IperfRunnerLinTerminal = IperfRunnerLinTerminal(self.__tools, self.config)
  76. else:
  77. raise Exception("Error in PowerlineAdapter Config: Unknown connection type in config.connect_method.")
  78. def run(self, terminal_hook=None):
  79. threading.Thread(target=self.__run_main, args=(terminal_hook,)).start()
  80. def __run_main(self, terminal_hook=None):
  81. self.__hook = terminal_hook
  82. self.__iperf_runner.add_hook(self.__hook)
  83. if self.config.enable_auto_get_ip is True:
  84. get_ip_list = EthernetIPv4Searcher().search_ipv4()
  85. self.config.server.ip = get_ip_list[0]
  86. self.config.client.ip = get_ip_list[1]
  87. if self.config.enable_auto_find_available_server_ip is True:
  88. server_port = self.__tools.pubtools.find_available_server_port(number=1, address=self.config.server.ip)[0]
  89. client_port = self.__tools.pubtools.find_available_server_port(number=1, address=self.config.client.ip)[0]
  90. self.config.server.port = server_port
  91. self.config.client.port = client_port
  92. self.__analyze_result = self.__iperf_runner.run()
  93. self.__tools.push_result(self.__hook, self.__analyze_result)
  94. class IperfRunnerWinTerminal(object):
  95. def __init__(self, tools: PowerlineAdapterPublicTools, config: PowerLineAdapterConfig):
  96. self.__config: PowerLineAdapterConfig = config
  97. self.__tools: PowerlineAdapterPublicTools = tools
  98. self.__server_port_error_retry_history: list[int] = []
  99. self.__client_port_error_retry_history: list[int] = []
  100. self.__client_thread_permitted_to_run: bool = False
  101. self.analyze_result: str = ""
  102. self.__hook = None
  103. def add_hook(self, hook):
  104. self.__hook = hook
  105. def run(self):
  106. server_thread = threading.Thread(target=self.__server_thread)
  107. client_thread = threading.Thread(target=self.__client_thread)
  108. server_thread.start()
  109. client_thread.start()
  110. server_thread.join()
  111. client_thread.join()
  112. return self.analyze_result
  113. def __server_thread(self):
  114. self.__run_server()
  115. def __client_thread(self):
  116. while True:
  117. if self.__client_thread_permitted_to_run is True:
  118. time.sleep(5)
  119. break
  120. else:
  121. continue
  122. self.__run_client()
  123. def __run_server(self):
  124. config_interval = self.__config.report_interval_sec
  125. config_port = self.__config.server.port
  126. config_ip = self.__config.server.ip
  127. cmd = f"iperf3 -s -B {config_ip} -i {config_interval} -p {config_port}"
  128. self.__win_cmd_run(cmd, self.__server_terminal_output_process)
  129. def __server_port_error_retry(self):
  130. if self.__config.enable_auto_find_available_server_ip is True:
  131. self.__server_port_error_retry_history.append(self.__config.server.port)
  132. self.__client_port_error_retry_history.append(self.__config.client.port)
  133. server_port = self.__tools.pubtools.find_available_server_port(
  134. number=1, address=self.__config.server.ip, not_list=self.__server_port_error_retry_history)[0]
  135. client_port = self.__tools.pubtools.find_available_server_port(
  136. number=1, address=self.__config.client.ip, not_list=self.__client_port_error_retry_history)[0]
  137. self.__config.server.port = server_port
  138. self.__config.client.port = client_port
  139. self.__run_server()
  140. else:
  141. raise Exception("Error: Auto-find-available-server-ip is disabled, and the specified ip is not available.")
  142. def __server_terminal_output_process(self, message: str) -> None:
  143. if message is not None:
  144. if "Address already in use" in message:
  145. self.__server_port_error_retry()
  146. else:
  147. self.__client_thread_permitted_to_run = True
  148. else:
  149. self.__client_thread_permitted_to_run = True
  150. def __analyze_result_clear(self):
  151. self.analyze_result: str = ""
  152. def __run_client(self):
  153. self.__analyze_result_clear()
  154. config_server_ip = self.__config.server.ip
  155. config_interval = self.__config.report_interval_sec
  156. config_time = self.__config.run_time_sec
  157. config_port = self.__config.server.port
  158. config_ip = self.__config.client.ip
  159. cmd = f"iperf3 -c {config_server_ip} -B {config_ip} -i {config_interval} -t {config_time} -p {config_port}"
  160. if self.__config.enable_realtime_output_iperf_analyze is True:
  161. self.__pty_cmd_run(cmd, self.__client_terminal_output_process)
  162. else:
  163. self.__win_cmd_run(cmd, self.__client_terminal_output_process)
  164. def __client_terminal_output_process(self, message: str) -> None:
  165. if message is not None:
  166. if self.__hook is not None:
  167. self.__hook(message + "\n")
  168. self.analyze_result = self.analyze_result + message + "\n"
  169. def __pty_cmd_run(self, cmd, terminal_output_process_method):
  170. from winpty import PtyProcess
  171. proc = PtyProcess.spawn(cmd)
  172. while proc.isalive():
  173. line = proc.readline()
  174. if line == "":
  175. time.sleep(0.1)
  176. continue
  177. output = self.__pty_output_filter(f"{line.strip()}")
  178. self.__tools.push_result(self.__hook, output)
  179. terminal_output_process_method(output)
  180. @staticmethod
  181. def __pty_output_filter(message: str) -> str:
  182. message = message.replace("\\", "\\\\")
  183. if message.count("\\\\") >= 2:
  184. position = message.find("] ") - 4
  185. message = message[position:]
  186. return message
  187. @staticmethod
  188. def __win_cmd_run(cmd, terminal_output_process_method):
  189. process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
  190. output_queue = queue.Queue()
  191. def read_output(get_process, get_output_queue):
  192. for line in iter(get_process.stdout.readline, b''):
  193. get_output_queue.put(line.decode())
  194. output_thread = threading.Thread(target=read_output, args=(process, output_queue))
  195. output_thread.start()
  196. while output_thread.is_alive():
  197. try:
  198. while True:
  199. terminal_output_process_method(output_queue.get_nowait())
  200. except queue.Empty:
  201. terminal_output_process_method(None)
  202. time.sleep(0.1)
  203. while not output_queue.empty():
  204. terminal_output_process_method(output_queue.get())
  205. process.wait()
  206. output_thread.join()
  207. class IperfRunnerLinTerminal(object):
  208. def __init__(self, tools: PowerlineAdapterPublicTools, config: PowerLineAdapterConfig):
  209. self.__config: PowerLineAdapterConfig = config
  210. self.__tools: PowerlineAdapterPublicTools = tools
  211. def run(self):
  212. pass
  213. class IperfRunnerGeneral(object):
  214. def __init__(self, tools: PowerlineAdapterPublicTools, config: PowerLineAdapterConfig):
  215. self.__config: PowerLineAdapterConfig = config
  216. self.__tools: PowerlineAdapterPublicTools = tools
  217. def run(self):
  218. pass
  219. class StringServer:
  220. def __init__(self, host: str, port: int, start_method):
  221. self.__host = host
  222. self.__port = port
  223. self.app = Flask(__name__)
  224. CORS(self.app)
  225. self.socketio = SocketIO(self.app, cors_allowed_origins="*")
  226. self.__start_method = start_method
  227. self.__is_running: bool = False
  228. self.__configure_routes()
  229. self.__configure_socket_events()
  230. def __configure_routes(self):
  231. @self.app.route('/initialize', methods=['POST'])
  232. def initialize():
  233. message = request.json.get('message')
  234. if message == 'start':
  235. if self.__is_running is False:
  236. self.__start_method(self.push_string)
  237. self.__is_running = True
  238. return jsonify({"status": "success"})
  239. def push_string(self, data_string):
  240. """Push a string to all connected clients."""
  241. self.socketio.emit('new_string', {'data': data_string}, namespace='/test', broadcast=True)
  242. def __configure_socket_events(self):
  243. @self.socketio.on('connect', namespace='/test')
  244. def test_connect():
  245. print('Client connected')
  246. @self.socketio.on('disconnect', namespace='/test')
  247. def test_disconnect():
  248. print('Client disconnected')
  249. def run(self):
  250. self.socketio.run(self.app, host=self.__host, port=self.__port, debug=True)
  251. def run_as_main(enable_web_server: bool):
  252. if enable_web_server is False:
  253. program_pubtools: ProgramPublicTools = ProgramPublicTools()
  254. use_port = program_pubtools.find_available_server_port(number=1, start_port=8900)[0]
  255. powerline_adapter: PowerlineAdapter = PowerlineAdapter(program_pubtools)
  256. powerline_adapter.init()
  257. server: StringServer = StringServer("0.0.0.0", use_port, powerline_adapter.run)
  258. print(f"http://localhost:{use_port}/initialize")
  259. server.run()
  260. else:
  261. program_pubtools: ProgramPublicTools = ProgramPublicTools()
  262. powerline_adapter: PowerlineAdapter = PowerlineAdapter(program_pubtools)
  263. powerline_adapter.init()
  264. powerline_adapter.run()
  265. if __name__ == "__main__":
  266. using_web_server = False
  267. run_as_main(using_web_server)