123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321 |
- import os
- import queue
- import subprocess
- import threading
- import time
- from typing import Union
- from flask import Flask, request, jsonify
- from flask_cors import CORS
- from flask_socketio import SocketIO
- from program_public_tools import ProgramPublicTools
- class EthernetIPv4Searcher:
- @staticmethod
- def search_ipv4(ethernet_title_mark: str = "以太网"):
- try:
- ipconfig_output = os.popen('ipconfig').read()
- adapters = []
- section_list = ipconfig_output.split('\n\n')
- index = 0
- max_index = len(section_list) - 1
- while True:
- section = section_list[index]
- if ethernet_title_mark in section:
- for line in section_list[index + 1].split('\n'):
- if 'IPv4 Address' in line or 'IP Address' in line:
- ip_address = line.split(':')[-1].strip()
- adapters.append(ip_address)
- index += 1
- if index > max_index:
- break
- return adapters
- except Exception as e:
- return {"Error": str(e)}
- class IperfClientConfig(object):
- ip: str = ""
- port: int = 50500
- ports: list[int] = [50500, 50555]
- class IperfServerConfig(object):
- ip: str = ""
- port: int = 50500
- ports: list[int] = [50500, 50555]
- class ProgramConnectMethod(object):
- general = 0
- win_terminal = 1
- lin_terminal = 2
- class PowerLineAdapterConfig(object):
- client: IperfClientConfig = IperfClientConfig()
- server: IperfServerConfig = IperfServerConfig()
- connect_method: ProgramConnectMethod = ProgramConnectMethod.win_terminal
- report_interval_sec: int = 1
- run_time_sec: int = 60
- enable_auto_get_ip: bool = True
- enable_auto_find_available_server_ip: bool = True
- enable_realtime_output_iperf_analyze: bool = True
- class PowerlineAdapterPublicTools(object):
- def __init__(self, pubtools: ProgramPublicTools):
- self.pubtools: ProgramPublicTools = pubtools
- def push_result(self, hook, *args):
- self.pubtools.debug_output(*args)
- if hook is not None:
- hook(''.join([str(arg) for arg in args]))
- class PowerlineAdapter(object):
- def __init__(self, pubtools: ProgramPublicTools):
- self.__pubtools: ProgramPublicTools = pubtools
- self.config: PowerLineAdapterConfig = PowerLineAdapterConfig()
- self.__iperf_runner: Union[IperfRunnerGeneral, IperfRunnerWinTerminal, IperfRunnerLinTerminal, None] = None
- self.__tools: PowerlineAdapterPublicTools = PowerlineAdapterPublicTools(self.__pubtools)
- self.__analyze_result: str = ""
- self.__hook = None
- def init(self):
- if self.config.connect_method == ProgramConnectMethod.general:
- self.__iperf_runner: IperfRunnerGeneral = IperfRunnerGeneral(self.__tools, self.config)
- elif self.config.connect_method == ProgramConnectMethod.win_terminal:
- self.__iperf_runner: IperfRunnerWinTerminal = IperfRunnerWinTerminal(self.__tools, self.config)
- elif self.config.connect_method == ProgramConnectMethod.lin_terminal:
- self.__iperf_runner: IperfRunnerLinTerminal = IperfRunnerLinTerminal(self.__tools, self.config)
- else:
- raise Exception("Error in PowerlineAdapter Config: Unknown connection type in config.connect_method.")
- def run(self, terminal_hook=None):
- threading.Thread(target=self.__run_main, args=(terminal_hook,)).start()
- def __run_main(self, terminal_hook=None):
- self.__hook = terminal_hook
- self.__iperf_runner.add_hook(self.__hook)
- if self.config.enable_auto_get_ip is True:
- get_ip_list = EthernetIPv4Searcher().search_ipv4()
- self.config.server.ip = get_ip_list[0]
- self.config.client.ip = get_ip_list[1]
- if self.config.enable_auto_find_available_server_ip is True:
- server_port = self.__tools.pubtools.find_available_server_port(number=1, address=self.config.server.ip)[0]
- client_port = self.__tools.pubtools.find_available_server_port(number=1, address=self.config.client.ip)[0]
- self.config.server.port = server_port
- self.config.client.port = client_port
- self.__analyze_result = self.__iperf_runner.run()
- self.__tools.push_result(self.__hook, self.__analyze_result)
- class IperfRunnerWinTerminal(object):
- def __init__(self, tools: PowerlineAdapterPublicTools, config: PowerLineAdapterConfig):
- self.__config: PowerLineAdapterConfig = config
- self.__tools: PowerlineAdapterPublicTools = tools
- self.__server_port_error_retry_history: list[int] = []
- self.__client_port_error_retry_history: list[int] = []
- self.__client_thread_permitted_to_run: bool = False
- self.analyze_result: str = ""
- self.__hook = None
- def add_hook(self, hook):
- self.__hook = hook
- def run(self):
- server_thread = threading.Thread(target=self.__server_thread)
- client_thread = threading.Thread(target=self.__client_thread)
- server_thread.start()
- client_thread.start()
- server_thread.join()
- client_thread.join()
- return self.analyze_result
- def __server_thread(self):
- self.__run_server()
- def __client_thread(self):
- while True:
- if self.__client_thread_permitted_to_run is True:
- time.sleep(5)
- break
- else:
- continue
- self.__run_client()
- def __run_server(self):
- config_interval = self.__config.report_interval_sec
- config_port = self.__config.server.port
- config_ip = self.__config.server.ip
- cmd = f"iperf3 -s -B {config_ip} -i {config_interval} -p {config_port}"
- self.__win_cmd_run(cmd, self.__server_terminal_output_process)
- def __server_port_error_retry(self):
- if self.__config.enable_auto_find_available_server_ip is True:
- self.__server_port_error_retry_history.append(self.__config.server.port)
- self.__client_port_error_retry_history.append(self.__config.client.port)
- server_port = self.__tools.pubtools.find_available_server_port(
- number=1, address=self.__config.server.ip, not_list=self.__server_port_error_retry_history)[0]
- client_port = self.__tools.pubtools.find_available_server_port(
- number=1, address=self.__config.client.ip, not_list=self.__client_port_error_retry_history)[0]
- self.__config.server.port = server_port
- self.__config.client.port = client_port
- self.__run_server()
- else:
- raise Exception("Error: Auto-find-available-server-ip is disabled, and the specified ip is not available.")
- def __server_terminal_output_process(self, message: str) -> None:
- if message is not None:
- if "Address already in use" in message:
- self.__server_port_error_retry()
- else:
- self.__client_thread_permitted_to_run = True
- else:
- self.__client_thread_permitted_to_run = True
- def __analyze_result_clear(self):
- self.analyze_result: str = ""
- def __run_client(self):
- self.__analyze_result_clear()
- config_server_ip = self.__config.server.ip
- config_interval = self.__config.report_interval_sec
- config_time = self.__config.run_time_sec
- config_port = self.__config.server.port
- config_ip = self.__config.client.ip
- cmd = f"iperf3 -c {config_server_ip} -B {config_ip} -i {config_interval} -t {config_time} -p {config_port}"
- if self.__config.enable_realtime_output_iperf_analyze is True:
- self.__pty_cmd_run(cmd, self.__client_terminal_output_process)
- else:
- self.__win_cmd_run(cmd, self.__client_terminal_output_process)
- def __client_terminal_output_process(self, message: str) -> None:
- if message is not None:
- if self.__hook is not None:
- self.__hook(message + "\n")
- self.analyze_result = self.analyze_result + message + "\n"
- def __pty_cmd_run(self, cmd, terminal_output_process_method):
- from winpty import PtyProcess
- proc = PtyProcess.spawn(cmd)
- while proc.isalive():
- line = proc.readline()
- if line == "":
- time.sleep(0.1)
- continue
- output = self.__pty_output_filter(f"{line.strip()}")
- self.__tools.push_result(self.__hook, output)
- terminal_output_process_method(output)
- @staticmethod
- def __pty_output_filter(message: str) -> str:
- message = message.replace("\\", "\\\\")
- if message.count("\\\\") >= 2:
- position = message.find("] ") - 4
- message = message[position:]
- return message
- @staticmethod
- def __win_cmd_run(cmd, terminal_output_process_method):
- process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
- output_queue = queue.Queue()
- def read_output(get_process, get_output_queue):
- for line in iter(get_process.stdout.readline, b''):
- get_output_queue.put(line.decode())
- output_thread = threading.Thread(target=read_output, args=(process, output_queue))
- output_thread.start()
- while output_thread.is_alive():
- try:
- while True:
- terminal_output_process_method(output_queue.get_nowait())
- except queue.Empty:
- terminal_output_process_method(None)
- time.sleep(0.1)
- while not output_queue.empty():
- terminal_output_process_method(output_queue.get())
- process.wait()
- output_thread.join()
- class IperfRunnerLinTerminal(object):
- def __init__(self, tools: PowerlineAdapterPublicTools, config: PowerLineAdapterConfig):
- self.__config: PowerLineAdapterConfig = config
- self.__tools: PowerlineAdapterPublicTools = tools
- def run(self):
- pass
- class IperfRunnerGeneral(object):
- def __init__(self, tools: PowerlineAdapterPublicTools, config: PowerLineAdapterConfig):
- self.__config: PowerLineAdapterConfig = config
- self.__tools: PowerlineAdapterPublicTools = tools
- def run(self):
- pass
- class StringServer:
- def __init__(self, host: str, port: int, start_method):
- self.__host = host
- self.__port = port
- self.app = Flask(__name__)
- CORS(self.app)
- self.socketio = SocketIO(self.app, cors_allowed_origins="*")
- self.__start_method = start_method
- self.__is_running: bool = False
- self.__configure_routes()
- self.__configure_socket_events()
- def __configure_routes(self):
- @self.app.route('/initialize', methods=['POST'])
- def initialize():
- message = request.json.get('message')
- if message == 'start':
- if self.__is_running is False:
- self.__start_method(self.push_string)
- self.__is_running = True
- return jsonify({"status": "success"})
- def push_string(self, data_string):
- """Push a string to all connected clients."""
- self.socketio.emit('new_string', {'data': data_string}, namespace='/test', broadcast=True)
- def __configure_socket_events(self):
- @self.socketio.on('connect', namespace='/test')
- def test_connect():
- print('Client connected')
- @self.socketio.on('disconnect', namespace='/test')
- def test_disconnect():
- print('Client disconnected')
- def run(self):
- self.socketio.run(self.app, host=self.__host, port=self.__port, debug=True)
- def run_as_main(enable_web_server: bool):
- if enable_web_server is False:
- program_pubtools: ProgramPublicTools = ProgramPublicTools()
- use_port = program_pubtools.find_available_server_port(number=1, start_port=8900)[0]
- powerline_adapter: PowerlineAdapter = PowerlineAdapter(program_pubtools)
- powerline_adapter.init()
- server: StringServer = StringServer("0.0.0.0", use_port, powerline_adapter.run)
- print(f"http://localhost:{use_port}/initialize")
- server.run()
- else:
- program_pubtools: ProgramPublicTools = ProgramPublicTools()
- powerline_adapter: PowerlineAdapter = PowerlineAdapter(program_pubtools)
- powerline_adapter.init()
- powerline_adapter.run()
- if __name__ == "__main__":
- using_web_server = False
- run_as_main(using_web_server)
|