1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- import os
- import subprocess
- import re
- import time
- import sys
- import configparser
class Launcher(object):
    """Start the simulator server, wait until it is ready, then run the client.

    The server is started with the instrument settings from the
    ``[PLC_SIM_SERVER]`` section of the config file exported as
    ``PLC_SIM_SERVER_<KEY>`` environment variables. Its output is scanned for
    the announced port and for the ready marker. The client is then started
    with ``PLC_SIM_PORT`` set to that port, and the server is stopped once the
    client exits.
    """

    # NOTE: the final "." is unescaped and therefore matches any character.
    # The pattern is kept byte-identical so existing server output still matches.
    _PORT_PATTERN = re.compile(r"Server is running on port \[(\d+)\].")
    _READY_MARKER = "[Server] Server program is ready..."
    _DEFAULT_PORT = 1024
    _CONFIG_SECTION = "PLC_SIM_SERVER"
    _INSTRUMENT_KEYS = (
        "DIGITAL_MULTIMETER",
        "DIGITAL_OSCILLOSCOPE",
        "WAVEFORM_GENERATOR",
        "ANALOG_ELECTRONIC_LOAD",
    )
    # How long a child gets to exit after terminate() before it is killed.
    _STOP_TIMEOUT_SECONDS = 5.0
    # Upper bound on one blocking wait for the client, so the interpreter
    # regains control regularly and Ctrl+C is handled promptly.
    _CLIENT_POLL_SECONDS = 0.5

    def __init__(self, config_path: str = "instrument_setting.ini",
                 server_command=("./server/server.exe",),
                 client_command=("./PLC-SIM.exe",)):
        """Store the paths used by :meth:`run`.

        Args:
            config_path: INI file that holds the ``[PLC_SIM_SERVER]`` section.
            server_command: Argument sequence used to start the server.
            client_command: Argument sequence used to start the client.

        The defaults are the values that used to be hard-coded, so
        ``Launcher()`` behaves as before.
        """
        self.config_path = config_path
        self.server_command = list(server_command)
        self.client_command = list(client_command)

    @staticmethod
    def __terminal_output(message: str):
        """Print *message* prefixed with ``[Launcher]``."""
        print(f"[Launcher] {message}")

    def run(self) -> None:
        """Run one server/client session and always stop the server afterwards.

        Raises:
            KeyError: If the config section or one of its instrument keys is
                missing (a missing file yields an empty configuration).
            OSError: If the server or client executable cannot be started.
            SystemExit: With code 1 if waiting for the client fails
                unexpectedly.
        """
        server_env = self._build_server_env()
        server_process = subprocess.Popen(
            self.server_command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,
            env=server_env,
        )
        try:
            self._run_session(server_process)
        finally:
            # Runs on every exit path (normal, Ctrl+C, failure to start the
            # client, sys.exit) so the server is never left running and is
            # always reaped.
            self._stop_process(server_process)

    def _build_server_env(self):
        """Return a copy of the environment extended with the instrument settings."""
        config = configparser.ConfigParser()
        config.read(self.config_path)
        section = config[self._CONFIG_SECTION]
        # Resolve every setting first so a missing key fails before anything
        # is printed.
        variables_to_set = {
            f"{self._CONFIG_SECTION}_{key}": section[key]
            for key in self._INSTRUMENT_KEYS
        }
        server_env = os.environ.copy()
        for var, value in variables_to_set.items():
            server_env[var] = value
            print(f"Using config: {var} = {value}")
        return server_env

    def _run_session(self, server_process) -> None:
        """Wait for the server, then run the client until it exits."""
        port = self._await_server_ready(server_process.stdout)
        if port is None:
            print("Server failed to start. Exiting.")
            server_process.kill()
            return

        # Keep consuming the server's output; otherwise the server blocks as
        # soon as the pipe buffer fills up.
        self._drain_in_background(server_process.stdout)

        print("Server is ready, starting client...")
        client_env = os.environ.copy()
        client_env["PLC_SIM_PORT"] = f"{port}"
        client_process = subprocess.Popen(self.client_command, env=client_env)
        try:
            while True:
                try:
                    return_code = client_process.wait(
                        timeout=self._CLIENT_POLL_SECONDS)
                except subprocess.TimeoutExpired:
                    continue
                print("Client process terminated with return code", return_code)
                break
        except KeyboardInterrupt:
            # Ctrl+C ends the session quietly; the caller stops the server.
            self._stop_process(client_process)
        except Exception as e:
            print(f"An error occurred: {str(e)}")
            self._stop_process(client_process)
            sys.exit(1)

    def _await_server_ready(self, stream):
        """Read server output until the ready marker is seen.

        Returns the announced port, or ``None`` if the output ended before the
        ready marker appeared.
        """
        port: int = self._DEFAULT_PORT
        # Phase 1: look for the line announcing the port.
        for line in iter(stream.readline, ""):
            print(f"line: {line}")
            match = self._PORT_PATTERN.search(line.strip())
            if match is not None:
                port = int(match.group(1))
                break
        # Phase 2: look for the ready marker.
        for line in iter(stream.readline, ""):
            print(f"{line}")
            if self._READY_MARKER in line:
                return port
        return None

    @staticmethod
    def _drain_in_background(stream):
        """Discard everything still written to *stream* on a daemon thread."""
        import threading

        def _drain():
            try:
                for _ in stream:
                    pass
            except (OSError, ValueError):
                # The pipe was closed while shutting down; nothing left to read.
                pass

        thread = threading.Thread(
            target=_drain, name="server-output-drain", daemon=True)
        thread.start()
        return thread

    def _stop_process(self, process) -> None:
        """Terminate *process*, kill it if it does not exit in time, and reap it."""
        if process.poll() is None:
            process.terminate()
        try:
            process.wait(timeout=self._STOP_TIMEOUT_SECONDS)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()
# Usage: run this file as a script to start the server and the client.
# The guard keeps a plain import of this module from launching any processes.
if __name__ == "__main__":
    launcher = Launcher()
    launcher.run()
|