import queue import re import threading from winpty import PtyProcess class BuildAndRelease: def __init__(self): self.commands = [ "python clean_build_all.py", "python build_all.py", "python make_release.py", # "python clean_build_all.py" ] self.user_input = input("请输入conda python环境的名称(例如:base): ") self.__readline_in_treading: bool = False def __run_command(self, command): try: # 如果是"python build_all.py"命令,先获取用户输入 if "python build_all.py" == command: command += " " command += f"-python_env_name={self.user_input}" # 使用PtyProcess来运行命令 process = PtyProcess.spawn(["powershell", "-Command", command]) process.setwinsize(20, 9999) counter_output_is_all_none: int = 0 while process.isalive() or not process.eof(): try: if self.__readline_in_treading: output_queue = queue.Queue() read_thread = threading.Thread(target=self.__readline_threading, args=(process, output_queue)) read_thread.start() read_thread.join(timeout=3) if not output_queue.empty(): output = output_queue.get() else: continue else: output = process.readline() output = self.__remove_ansi_escape_codes(output) output = self.__remove_leading_newlines(output) if output in ["", " ", "\n", "\r\n", "\n\r"]: if counter_output_is_all_none != 0: continue else: counter_output_is_all_none += 1 print("\n", end='') continue else: counter_output_is_all_none = 0 output = self.__remove_ending_newlines(output) print(output, end='\n') except EOFError: break # 当伪终端关闭时跳出循环 except Exception as e: print(f"Error executing command: {command}. Error: {e}") @staticmethod def __readline_threading(process, output_queue): line = process.readline() output_queue.put(line) @staticmethod def __remove_ansi_escape_codes(s): ansi_escape = re.compile(r'\x1B\[[0-?]*[ -/]*[@-~]') return ansi_escape.sub('', s) @staticmethod def __remove_leading_newlines(s): while s.startswith(('\n\r', '\r\n', '\n', '\r')): for seq in ['\n\r', '\r\n', '\n', '\r']: if s.startswith(seq): s = s[len(seq):] return s @staticmethod def __remove_ending_newlines(s): sequences = ['\n\r', '\r\n', '\n', '\r'] while s.endswith(tuple(sequences)): for seq in sequences: if s.endswith(seq): s = s[:-len(seq)] return s def build_and_release(self): for command in self.commands: self.__run_command(command) if __name__ == "__main__": bar = BuildAndRelease() bar.build_and_release()