1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- import queue
- import re
- import threading
- from winpty import PtyProcess
- class BuildAndRelease:
- def __init__(self):
- self.commands = [
- "python clean_build_all.py",
- "python build_all.py",
- "python make_release.py",
- # "python clean_build_all.py"
- ]
- self.user_input = input("请输入conda python环境的名称(例如:base): ")
- self.__readline_in_treading: bool = False
- def __run_command(self, command):
- try:
- # 如果是"python build_all.py"命令,先获取用户输入
- if "python build_all.py" == command:
- command += " "
- command += f"-python_env_name={self.user_input}"
- # 使用PtyProcess来运行命令
- process = PtyProcess.spawn(["powershell", "-Command", command])
- process.setwinsize(20, 9999)
- counter_output_is_all_none: int = 0
- while process.isalive() or not process.eof():
- try:
- if self.__readline_in_treading:
- output_queue = queue.Queue()
- read_thread = threading.Thread(target=self.__readline_threading, args=(process, output_queue))
- read_thread.start()
- read_thread.join(timeout=3)
- if not output_queue.empty():
- output = output_queue.get()
- else:
- continue
- else:
- output = process.readline()
- output = self.__remove_ansi_escape_codes(output)
- output = self.__remove_leading_newlines(output)
- if output in ["", " ", "\n", "\r\n", "\n\r"]:
- if counter_output_is_all_none != 0:
- continue
- else:
- counter_output_is_all_none += 1
- print("\n", end='')
- continue
- else:
- counter_output_is_all_none = 0
- output = self.__remove_ending_newlines(output)
- print(output, end='\n')
- except EOFError:
- break # 当伪终端关闭时跳出循环
- except Exception as e:
- print(f"Error executing command: {command}. Error: {e}")
- @staticmethod
- def __readline_threading(process, output_queue):
- line = process.readline()
- output_queue.put(line)
- @staticmethod
- def __remove_ansi_escape_codes(s):
- ansi_escape = re.compile(r'\x1B\[[0-?]*[ -/]*[@-~]')
- return ansi_escape.sub('', s)
- @staticmethod
- def __remove_leading_newlines(s):
- while s.startswith(('\n\r', '\r\n', '\n', '\r')):
- for seq in ['\n\r', '\r\n', '\n', '\r']:
- if s.startswith(seq):
- s = s[len(seq):]
- return s
- @staticmethod
- def __remove_ending_newlines(s):
- sequences = ['\n\r', '\r\n', '\n', '\r']
- while s.endswith(tuple(sequences)):
- for seq in sequences:
- if s.endswith(seq):
- s = s[:-len(seq)]
- return s
- def build_and_release(self):
- for command in self.commands:
- self.__run_command(command)
- if __name__ == "__main__":
- bar = BuildAndRelease()
- bar.build_and_release()
|