|
@@ -1,3 +1,7 @@
|
|
|
+import queue
|
|
|
+import re
|
|
|
+import threading
|
|
|
+
|
|
|
from winpty import PtyProcess
|
|
|
|
|
|
|
|
@@ -11,6 +15,7 @@ class BuildAndRelease:
|
|
|
"python clean_build_all.py"
|
|
|
]
|
|
|
self.user_input = input("请输入conda python环境的名称(例如:base): ")
|
|
|
+ self.__readline_in_treading: bool = False
|
|
|
|
|
|
def __run_command(self, command):
|
|
|
try:
|
|
@@ -19,17 +24,67 @@ class BuildAndRelease:
|
|
|
command += " "
|
|
|
command += f"-python_env_name={self.user_input}"
|
|
|
# 使用PtyProcess来运行命令
|
|
|
- proc = PtyProcess.spawn(["powershell", "-Command", command])
|
|
|
- proc.setwinsize(20, 9999)
|
|
|
- # 读取输出并打印到当前终端
|
|
|
- while True:
|
|
|
- data = proc.readline()
|
|
|
- if not data:
|
|
|
- break
|
|
|
- print(data, end='')
|
|
|
+ process = PtyProcess.spawn(["powershell", "-Command", command])
|
|
|
+ process.setwinsize(20, 9999)
|
|
|
+ counter_output_is_all_none: int = 0
|
|
|
+ while process.isalive() or not process.eof():
|
|
|
+ try:
|
|
|
+ if self.__readline_in_treading:
|
|
|
+ output_queue = queue.Queue()
|
|
|
+ read_thread = threading.Thread(target=self.__readline_threading, args=(process, output_queue))
|
|
|
+ read_thread.start()
|
|
|
+ read_thread.join(timeout=3)
|
|
|
+ if not output_queue.empty():
|
|
|
+ output = output_queue.get()
|
|
|
+ else:
|
|
|
+ continue
|
|
|
+ else:
|
|
|
+ output = process.readline()
|
|
|
+ output = self.__remove_ansi_escape_codes(output)
|
|
|
+ output = self.__remove_leading_newlines(output)
|
|
|
+ if output in ["", " ", "\n", "\r\n", "\n\r"]:
|
|
|
+ if counter_output_is_all_none != 0:
|
|
|
+ continue
|
|
|
+ else:
|
|
|
+ counter_output_is_all_none += 1
|
|
|
+ print("\n", end='')
|
|
|
+ continue
|
|
|
+ else:
|
|
|
+ counter_output_is_all_none = 0
|
|
|
+ output = self.__remove_ending_newlines(output)
|
|
|
+ print(output, end='\n')
|
|
|
+ except EOFError:
|
|
|
+ break # 当伪终端关闭时跳出循环
|
|
|
except Exception as e:
|
|
|
print(f"Error executing command: {command}. Error: {e}")
|
|
|
|
|
|
+ @staticmethod
|
|
|
+ def __readline_threading(process, output_queue):
|
|
|
+ line = process.readline()
|
|
|
+ output_queue.put(line)
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def __remove_ansi_escape_codes(s):
|
|
|
+ ansi_escape = re.compile(r'\x1B\[[0-?]*[ -/]*[@-~]')
|
|
|
+ return ansi_escape.sub('', s)
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def __remove_leading_newlines(s):
|
|
|
+ while s.startswith(('\n\r', '\r\n', '\n', '\r')):
|
|
|
+ for seq in ['\n\r', '\r\n', '\n', '\r']:
|
|
|
+ if s.startswith(seq):
|
|
|
+ s = s[len(seq):]
|
|
|
+ return s
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def __remove_ending_newlines(s):
|
|
|
+ sequences = ['\n\r', '\r\n', '\n', '\r']
|
|
|
+ while s.endswith(tuple(sequences)):
|
|
|
+ for seq in sequences:
|
|
|
+ if s.endswith(seq):
|
|
|
+ s = s[:-len(seq)]
|
|
|
+ return s
|
|
|
+
|
|
|
def build_and_release(self):
|
|
|
for command in self.commands:
|
|
|
self.__run_command(command)
|