import os import queue import subprocess import threading import time from typing import Union from flask import Flask, request, jsonify from flask_cors import CORS from flask_socketio import SocketIO from program_public_tools import ProgramPublicTools class EthernetIPv4Searcher: @staticmethod def search_ipv4(ethernet_title_mark: str = "以太网"): try: ipconfig_output = os.popen('ipconfig').read() adapters = [] section_list = ipconfig_output.split('\n\n') index = 0 max_index = len(section_list) - 1 while True: section = section_list[index] if ethernet_title_mark in section: for line in section_list[index + 1].split('\n'): if 'IPv4 Address' in line or 'IP Address' in line: ip_address = line.split(':')[-1].strip() adapters.append(ip_address) index += 1 if index > max_index: break return adapters except Exception as e: return {"Error": str(e)} class IperfClientConfig(object): ip: str = "" port: int = 50500 ports: list[int] = [50500, 50555] class IperfServerConfig(object): ip: str = "" port: int = 50500 ports: list[int] = [50500, 50555] class ProgramConnectMethod(object): general = 0 win_terminal = 1 lin_terminal = 2 class PowerLineAdapterConfig(object): client: IperfClientConfig = IperfClientConfig() server: IperfServerConfig = IperfServerConfig() connect_method: ProgramConnectMethod = ProgramConnectMethod.win_terminal report_interval_sec: int = 1 run_time_sec: int = 60 enable_auto_get_ip: bool = True enable_auto_find_available_server_ip: bool = True enable_realtime_output_iperf_analyze: bool = True class PowerlineAdapterPublicTools(object): def __init__(self, pubtools: ProgramPublicTools): self.pubtools: ProgramPublicTools = pubtools def push_result(self, hook, *args): self.pubtools.debug_output(*args) if hook is not None: hook(''.join([str(arg) for arg in args])) class PowerlineAdapter(object): def __init__(self, pubtools: ProgramPublicTools): self.__pubtools: ProgramPublicTools = pubtools self.config: PowerLineAdapterConfig = PowerLineAdapterConfig() self.__iperf_runner: Union[IperfRunnerGeneral, IperfRunnerWinTerminal, IperfRunnerLinTerminal, None] = None self.__tools: PowerlineAdapterPublicTools = PowerlineAdapterPublicTools(self.__pubtools) self.__analyze_result: str = "" self.__hook = None def init(self): if self.config.connect_method == ProgramConnectMethod.general: self.__iperf_runner: IperfRunnerGeneral = IperfRunnerGeneral(self.__tools, self.config) elif self.config.connect_method == ProgramConnectMethod.win_terminal: self.__iperf_runner: IperfRunnerWinTerminal = IperfRunnerWinTerminal(self.__tools, self.config) elif self.config.connect_method == ProgramConnectMethod.lin_terminal: self.__iperf_runner: IperfRunnerLinTerminal = IperfRunnerLinTerminal(self.__tools, self.config) else: raise Exception("Error in PowerlineAdapter Config: Unknown connection type in config.connect_method.") def run(self, terminal_hook=None): threading.Thread(target=self.__run_main, args=(terminal_hook,)).start() def __run_main(self, terminal_hook=None): self.__hook = terminal_hook self.__iperf_runner.add_hook(self.__hook) if self.config.enable_auto_get_ip is True: get_ip_list = EthernetIPv4Searcher().search_ipv4() self.config.server.ip = get_ip_list[0] self.config.client.ip = get_ip_list[1] if self.config.enable_auto_find_available_server_ip is True: server_port = self.__tools.pubtools.find_available_server_port(number=1, address=self.config.server.ip)[0] client_port = self.__tools.pubtools.find_available_server_port(number=1, address=self.config.client.ip)[0] self.config.server.port = server_port self.config.client.port = client_port self.__analyze_result = self.__iperf_runner.run() self.__tools.push_result(self.__hook, self.__analyze_result) class IperfRunnerWinTerminal(object): def __init__(self, tools: PowerlineAdapterPublicTools, config: PowerLineAdapterConfig): self.__config: PowerLineAdapterConfig = config self.__tools: PowerlineAdapterPublicTools = tools self.__server_port_error_retry_history: list[int] = [] self.__client_port_error_retry_history: list[int] = [] self.__client_thread_permitted_to_run: bool = False self.analyze_result: str = "" self.__hook = None def add_hook(self, hook): self.__hook = hook def run(self): server_thread = threading.Thread(target=self.__server_thread) client_thread = threading.Thread(target=self.__client_thread) server_thread.start() client_thread.start() server_thread.join() client_thread.join() return self.analyze_result def __server_thread(self): self.__run_server() def __client_thread(self): while True: if self.__client_thread_permitted_to_run is True: time.sleep(5) break else: continue self.__run_client() def __run_server(self): config_interval = self.__config.report_interval_sec config_port = self.__config.server.port config_ip = self.__config.server.ip cmd = f"iperf3 -s -B {config_ip} -i {config_interval} -p {config_port}" self.__win_cmd_run(cmd, self.__server_terminal_output_process) def __server_port_error_retry(self): if self.__config.enable_auto_find_available_server_ip is True: self.__server_port_error_retry_history.append(self.__config.server.port) self.__client_port_error_retry_history.append(self.__config.client.port) server_port = self.__tools.pubtools.find_available_server_port( number=1, address=self.__config.server.ip, not_list=self.__server_port_error_retry_history)[0] client_port = self.__tools.pubtools.find_available_server_port( number=1, address=self.__config.client.ip, not_list=self.__client_port_error_retry_history)[0] self.__config.server.port = server_port self.__config.client.port = client_port self.__run_server() else: raise Exception("Error: Auto-find-available-server-ip is disabled, and the specified ip is not available.") def __server_terminal_output_process(self, message: str) -> None: if message is not None: if "Address already in use" in message: self.__server_port_error_retry() else: self.__client_thread_permitted_to_run = True else: self.__client_thread_permitted_to_run = True def __analyze_result_clear(self): self.analyze_result: str = "" def __run_client(self): self.__analyze_result_clear() config_server_ip = self.__config.server.ip config_interval = self.__config.report_interval_sec config_time = self.__config.run_time_sec config_port = self.__config.server.port config_ip = self.__config.client.ip cmd = f"iperf3 -c {config_server_ip} -B {config_ip} -i {config_interval} -t {config_time} -p {config_port}" if self.__config.enable_realtime_output_iperf_analyze is True: self.__pty_cmd_run(cmd, self.__client_terminal_output_process) else: self.__win_cmd_run(cmd, self.__client_terminal_output_process) def __client_terminal_output_process(self, message: str) -> None: if message is not None: if self.__hook is not None: self.__hook(message + "\n") self.analyze_result = self.analyze_result + message + "\n" def __pty_cmd_run(self, cmd, terminal_output_process_method): from winpty import PtyProcess proc = PtyProcess.spawn(cmd) while proc.isalive(): line = proc.readline() if line == "": time.sleep(0.1) continue output = self.__pty_output_filter(f"{line.strip()}") self.__tools.push_result(self.__hook, output) terminal_output_process_method(output) @staticmethod def __pty_output_filter(message: str) -> str: message = message.replace("\\", "\\\\") if message.count("\\\\") >= 2: position = message.find("] ") - 4 message = message[position:] return message @staticmethod def __win_cmd_run(cmd, terminal_output_process_method): process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) output_queue = queue.Queue() def read_output(get_process, get_output_queue): for line in iter(get_process.stdout.readline, b''): get_output_queue.put(line.decode()) output_thread = threading.Thread(target=read_output, args=(process, output_queue)) output_thread.start() while output_thread.is_alive(): try: while True: terminal_output_process_method(output_queue.get_nowait()) except queue.Empty: terminal_output_process_method(None) time.sleep(0.1) while not output_queue.empty(): terminal_output_process_method(output_queue.get()) process.wait() output_thread.join() class IperfRunnerLinTerminal(object): def __init__(self, tools: PowerlineAdapterPublicTools, config: PowerLineAdapterConfig): self.__config: PowerLineAdapterConfig = config self.__tools: PowerlineAdapterPublicTools = tools def run(self): pass class IperfRunnerGeneral(object): def __init__(self, tools: PowerlineAdapterPublicTools, config: PowerLineAdapterConfig): self.__config: PowerLineAdapterConfig = config self.__tools: PowerlineAdapterPublicTools = tools def run(self): pass class StringServer: def __init__(self, host: str, port: int, start_method): self.__host = host self.__port = port self.app = Flask(__name__) CORS(self.app) self.socketio = SocketIO(self.app, cors_allowed_origins="*") self.__start_method = start_method self.__is_running: bool = False self.__configure_routes() self.__configure_socket_events() def __configure_routes(self): @self.app.route('/initialize', methods=['POST']) def initialize(): message = request.json.get('message') if message == 'start': if self.__is_running is False: self.__start_method(self.push_string) self.__is_running = True return jsonify({"status": "success"}) def push_string(self, data_string): """Push a string to all connected clients.""" self.socketio.emit('new_string', {'data': data_string}, namespace='/test', broadcast=True) def __configure_socket_events(self): @self.socketio.on('connect', namespace='/test') def test_connect(): print('Client connected') @self.socketio.on('disconnect', namespace='/test') def test_disconnect(): print('Client disconnected') def run(self): self.socketio.run(self.app, host=self.__host, port=self.__port, debug=True) def run_as_main(enable_web_server: bool): if enable_web_server is False: program_pubtools: ProgramPublicTools = ProgramPublicTools() use_port = program_pubtools.find_available_server_port(number=1, start_port=8900)[0] powerline_adapter: PowerlineAdapter = PowerlineAdapter(program_pubtools) powerline_adapter.init() server: StringServer = StringServer("0.0.0.0", use_port, powerline_adapter.run) print(f"http://localhost:{use_port}/initialize") server.run() else: program_pubtools: ProgramPublicTools = ProgramPublicTools() powerline_adapter: PowerlineAdapter = PowerlineAdapter(program_pubtools) powerline_adapter.init() powerline_adapter.run() if __name__ == "__main__": using_web_server = False run_as_main(using_web_server)