123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 |
- import os
- import subprocess
- import re
- import time
- import sys
- class Launcher(object):
- def __init__(self):
- pass
- @staticmethod
- def __terminal_output(message: str):
- print(f"[Launcher] {message}")
- def run(self):
- server_process = subprocess.Popen(["python", "./server/server.py"], stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT, text=True, bufsize=1)
- server_ready = False
- port: int = 1024
- for line in iter(server_process.stdout.readline, ""):
- print(f"line: {line}")
- match = re.search(r"Server is running on port \[(\d+)\].", line.strip())
- if match is not None:
- port = int(match.group(1))
- break
- for line in iter(server_process.stdout.readline, ""):
- if "[Server] Server program is ready..." in line:
- server_ready = True
- break
- if not server_ready:
- print("Server failed to start. Exiting.")
- server_process.kill()
- return
- print("Server is ready, starting client...")
- client_env = os.environ.copy()
- client_env["PLC_SIM_PORT"] = f"{port}"
- client_process = subprocess.Popen(["powershell", "npm run dev"], env=client_env, cwd="./client")
- try:
- while True:
- return_code = client_process.poll()
- if return_code is not None:
- print("Client process terminated with return code", return_code)
- break
- time.sleep(0.5)
- server_process.terminate()
- server_process.wait()
- except KeyboardInterrupt:
- server_process.terminate()
- client_process.terminate()
- server_process.wait()
- except Exception as e:
- print(f"An error occurred: {str(e)}")
- server_process.terminate()
- client_process.terminate()
- server_process.wait()
- client_process.wait()
- sys.exit(1)
- # Usage:
- launcher = Launcher()
- launcher.run()
|