powerline_adapter.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. import os
  2. import queue
  3. import subprocess
  4. import threading
  5. import time
  6. from typing import Union
  7. from flask import Flask, request, jsonify
  8. from flask_cors import CORS
  9. from flask_socketio import SocketIO
  10. from program_public_tools import ProgramPublicTools
  11. class EthernetIPv4Searcher:
  12. @staticmethod
  13. def search_ipv4(ethernet_title_mark: str = "以太网"):
  14. try:
  15. ipconfig_output = os.popen('ipconfig').read()
  16. adapters = []
  17. section_list = ipconfig_output.split('\n\n')
  18. index = 0
  19. max_index = len(section_list) - 1
  20. while True:
  21. section = section_list[index]
  22. if ethernet_title_mark in section:
  23. for line in section_list[index + 1].split('\n'):
  24. if 'IPv4 Address' in line or 'IP Address' in line:
  25. ip_address = line.split(':')[-1].strip()
  26. adapters.append(ip_address)
  27. index += 1
  28. if index > max_index:
  29. break
  30. return adapters
  31. except Exception as e:
  32. return {"Error": str(e)}
  33. class IperfClientConfig(object):
  34. ip: str = ""
  35. port: int = 50500
  36. ports: list[int] = [50500, 50555]
  37. class IperfServerConfig(object):
  38. ip: str = ""
  39. port: int = 50500
  40. ports: list[int] = [50500, 50555]
  41. class ProgramConnectMethod(object):
  42. general = 0
  43. win_terminal = 1
  44. lin_terminal = 2
  45. class PowerLineAdapterConfig(object):
  46. client: IperfClientConfig = IperfClientConfig()
  47. server: IperfServerConfig = IperfServerConfig()
  48. connect_method: ProgramConnectMethod = ProgramConnectMethod.win_terminal
  49. report_interval_sec: int = 1
  50. run_time_sec: int = 60
  51. enable_auto_get_ip: bool = True
  52. enable_auto_find_available_server_ip: bool = True
  53. enable_realtime_output_iperf_analyze: bool = True
  54. class PowerlineAdapterPublicTools(object):
  55. def __init__(self, pubtools: ProgramPublicTools):
  56. self.pubtools: ProgramPublicTools = pubtools
  57. def push_result(self, hook, *args):
  58. self.pubtools.debug_output(*args)
  59. if hook is not None:
  60. hook(''.join([str(arg) for arg in args]))
  61. class PowerlineAdapter(object):
  62. def __init__(self, pubtools: ProgramPublicTools):
  63. self.__pubtools: ProgramPublicTools = pubtools
  64. self.config: PowerLineAdapterConfig = PowerLineAdapterConfig()
  65. self.__iperf_runner: Union[IperfRunnerGeneral, IperfRunnerWinTerminal, IperfRunnerLinTerminal, None] = None
  66. self.__tools: PowerlineAdapterPublicTools = PowerlineAdapterPublicTools(self.__pubtools)
  67. self.__analyze_result: str = ""
  68. self.__hook = None
  69. def init(self):
  70. if self.config.connect_method == ProgramConnectMethod.general:
  71. self.__iperf_runner: IperfRunnerGeneral = IperfRunnerGeneral(self.__tools, self.config)
  72. elif self.config.connect_method == ProgramConnectMethod.win_terminal:
  73. self.__iperf_runner: IperfRunnerWinTerminal = IperfRunnerWinTerminal(self.__tools, self.config)
  74. elif self.config.connect_method == ProgramConnectMethod.lin_terminal:
  75. self.__iperf_runner: IperfRunnerLinTerminal = IperfRunnerLinTerminal(self.__tools, self.config)
  76. else:
  77. raise Exception("Error in PowerlineAdapter Config: Unknown connection type in config.connect_method.")
  78. def run(self, terminal_hook=None):
  79. threading.Thread(target=self.__run_main, args=(terminal_hook,)).start()
  80. def __run_main(self, terminal_hook=None):
  81. self.__hook = terminal_hook
  82. self.__iperf_runner.add_hook(self.__hook)
  83. if self.config.enable_auto_get_ip is True:
  84. get_ip_list = EthernetIPv4Searcher().search_ipv4()
  85. self.config.server.ip = get_ip_list[0]
  86. self.config.client.ip = get_ip_list[1]
  87. if self.config.enable_auto_find_available_server_ip is True:
  88. server_port = self.__tools.pubtools.find_available_server_port(number=1, address=self.config.server.ip)[0]
  89. client_port = self.__tools.pubtools.find_available_server_port(number=1, address=self.config.client.ip)[0]
  90. self.config.server.port = server_port
  91. self.config.client.port = client_port
  92. self.__analyze_result = self.__iperf_runner.run()
  93. self.__tools.push_result(self.__hook, self.__analyze_result)
  94. class IperfRunnerWinTerminal(object):
  95. def __init__(self, tools: PowerlineAdapterPublicTools, config: PowerLineAdapterConfig):
  96. self.__config: PowerLineAdapterConfig = config
  97. self.__tools: PowerlineAdapterPublicTools = tools
  98. self.__server_port_error_retry_history: list[int] = []
  99. self.__client_port_error_retry_history: list[int] = []
  100. self.__client_thread_permitted_to_run: bool = False
  101. self.analyze_result: str = ""
  102. self.__hook = None
  103. def add_hook(self, hook):
  104. self.__hook = hook
  105. def run(self):
  106. server_thread = threading.Thread(target=self.__server_thread)
  107. client_thread = threading.Thread(target=self.__client_thread)
  108. server_thread.start()
  109. client_thread.start()
  110. server_thread.join()
  111. client_thread.join()
  112. return self.analyze_result
  113. def __server_thread(self):
  114. self.__run_server()
  115. def __client_thread(self):
  116. while True:
  117. if self.__client_thread_permitted_to_run is True:
  118. time.sleep(5)
  119. break
  120. else:
  121. continue
  122. self.__run_client()
  123. def __run_server(self):
  124. config_interval = self.__config.report_interval_sec
  125. config_port = self.__config.server.port
  126. config_ip = self.__config.server.ip
  127. cmd = f"iperf3 -s -B {config_ip} -i {config_interval} -p {config_port}"
  128. self.__win_cmd_run(cmd, self.__server_terminal_output_process)
  129. def __server_port_error_retry(self):
  130. if self.__config.enable_auto_find_available_server_ip is True:
  131. self.__server_port_error_retry_history.append(self.__config.server.port)
  132. self.__client_port_error_retry_history.append(self.__config.client.port)
  133. server_port = self.__tools.pubtools.find_available_server_port(
  134. number=1, address=self.__config.server.ip, not_list=self.__server_port_error_retry_history)[0]
  135. client_port = self.__tools.pubtools.find_available_server_port(
  136. number=1, address=self.__config.client.ip, not_list=self.__client_port_error_retry_history)[0]
  137. self.__config.server.port = server_port
  138. self.__config.client.port = client_port
  139. self.__run_server()
  140. else:
  141. raise Exception("Error: Auto-find-available-server-ip is disabled, and the specified ip is not available.")
  142. def __server_terminal_output_process(self, message: str) -> None:
  143. if message is not None:
  144. if "Address already in use" in message:
  145. self.__server_port_error_retry()
  146. else:
  147. self.__client_thread_permitted_to_run = True
  148. else:
  149. self.__client_thread_permitted_to_run = True
  150. def __analyze_result_clear(self):
  151. self.analyze_result: str = ""
  152. def __run_client(self):
  153. self.__analyze_result_clear()
  154. config_server_ip = self.__config.server.ip
  155. config_interval = self.__config.report_interval_sec
  156. config_time = self.__config.run_time_sec
  157. config_port = self.__config.server.port
  158. config_ip = self.__config.client.ip
  159. cmd = f"iperf3 -c {config_server_ip} -B {config_ip} -i {config_interval} -t {config_time} -p {config_port}"
  160. if self.__config.enable_realtime_output_iperf_analyze is True:
  161. self.__pty_cmd_run(cmd, self.__client_terminal_output_process)
  162. else:
  163. self.__win_cmd_run(cmd, self.__client_terminal_output_process)
  164. def __client_terminal_output_process(self, message: str) -> None:
  165. if message is not None:
  166. if self.__hook is not None:
  167. self.__hook(message + "\n")
  168. self.analyze_result = self.analyze_result + message + "\n"
  169. def __pty_cmd_run(self, cmd, terminal_output_process_method):
  170. from winpty import PtyProcess
  171. proc = PtyProcess.spawn(cmd)
  172. while proc.isalive():
  173. line = proc.readline()
  174. if line == "":
  175. time.sleep(0.1)
  176. continue
  177. output = self.__pty_output_filter(f"{line.strip()}")
  178. self.__tools.push_result(self.__hook, output)
  179. terminal_output_process_method(output)
  180. @staticmethod
  181. def __pty_output_filter(message: str) -> str:
  182. message = message.replace("\\", "\\\\")
  183. if message.count("\\\\") >= 2:
  184. position = message.find("] ") - 4
  185. message = message[position:]
  186. return message
  187. @staticmethod
  188. def __win_cmd_run(cmd, terminal_output_process_method):
  189. process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
  190. output_queue = queue.Queue()
  191. def read_output(get_process, get_output_queue):
  192. for line in iter(get_process.stdout.readline, b''):
  193. get_output_queue.put(line.decode())
  194. output_thread = threading.Thread(target=read_output, args=(process, output_queue))
  195. output_thread.start()
  196. while output_thread.is_alive():
  197. try:
  198. while True:
  199. terminal_output_process_method(output_queue.get_nowait())
  200. except queue.Empty:
  201. terminal_output_process_method(None)
  202. time.sleep(0.1)
  203. while not output_queue.empty():
  204. terminal_output_process_method(output_queue.get())
  205. process.wait()
  206. output_thread.join()
  207. class IperfRunnerLinTerminal(object):
  208. def __init__(self, tools: PowerlineAdapterPublicTools, config: PowerLineAdapterConfig):
  209. self.__config: PowerLineAdapterConfig = config
  210. self.__tools: PowerlineAdapterPublicTools = tools
  211. def run(self):
  212. pass
  213. class IperfRunnerGeneral(object):
  214. def __init__(self, tools: PowerlineAdapterPublicTools, config: PowerLineAdapterConfig):
  215. self.__config: PowerLineAdapterConfig = config
  216. self.__tools: PowerlineAdapterPublicTools = tools
  217. def run(self):
  218. pass
  219. class StringServer:
  220. def __init__(self, host: str, port: int, start_method):
  221. self.__host = host
  222. self.__port = port
  223. self.app = Flask(__name__)
  224. CORS(self.app)
  225. self.socketio = SocketIO(self.app, cors_allowed_origins="*")
  226. self.__start_method = start_method
  227. self.__is_running: bool = False
  228. self.__configure_routes()
  229. self.__configure_socket_events()
  230. def __configure_routes(self):
  231. @self.app.route('/initialize', methods=['POST'])
  232. def initialize():
  233. message = request.json.get('message')
  234. if message == 'start':
  235. if self.__is_running is False:
  236. self.__start_method(self.push_string)
  237. self.__is_running = True
  238. return jsonify({"status": "success"})
  239. def push_string(self, data_string):
  240. """Push a string to all connected clients."""
  241. self.socketio.emit('new_string', {'data': data_string}, namespace='/test', broadcast=True)
  242. def __configure_socket_events(self):
  243. @self.socketio.on('connect', namespace='/test')
  244. def test_connect():
  245. print('Client connected')
  246. @self.socketio.on('disconnect', namespace='/test')
  247. def test_disconnect():
  248. print('Client disconnected')
  249. def run(self):
  250. self.socketio.run(self.app, host=self.__host, port=self.__port, debug=True)
  251. def run_as_main(enable_web_server: bool):
  252. if enable_web_server is False:
  253. program_pubtools: ProgramPublicTools = ProgramPublicTools()
  254. use_port = program_pubtools.find_available_server_port(number=1, start_port=8900)[0]
  255. powerline_adapter: PowerlineAdapter = PowerlineAdapter(program_pubtools)
  256. powerline_adapter.init()
  257. server: StringServer = StringServer("0.0.0.0", use_port, powerline_adapter.run)
  258. print(f"http://localhost:{use_port}/initialize")
  259. server.run()
  260. else:
  261. program_pubtools: ProgramPublicTools = ProgramPublicTools()
  262. powerline_adapter: PowerlineAdapter = PowerlineAdapter(program_pubtools)
  263. powerline_adapter.init()
  264. powerline_adapter.run()
  265. if __name__ == "__main__":
  266. using_web_server = False
  267. run_as_main(using_web_server)