# build_and_release.py
  1. import queue
  2. import re
  3. import threading
  4. from winpty import PtyProcess
  5. class BuildAndRelease:
  6. def __init__(self):
  7. self.commands = [
  8. "python clean_build_all.py",
  9. "python build_all.py",
  10. "python make_release.py",
  11. # "python clean_build_all.py"
  12. ]
  13. self.user_input = input("请输入conda python环境的名称(例如:base): ")
  14. self.__readline_in_treading: bool = False
  15. def __run_command(self, command):
  16. try:
  17. # 如果是"python build_all.py"命令,先获取用户输入
  18. if "python build_all.py" == command:
  19. command += " "
  20. command += f"-python_env_name={self.user_input}"
  21. # 使用PtyProcess来运行命令
  22. process = PtyProcess.spawn(["powershell", "-Command", command])
  23. process.setwinsize(20, 9999)
  24. counter_output_is_all_none: int = 0
  25. while process.isalive() or not process.eof():
  26. try:
  27. if self.__readline_in_treading:
  28. output_queue = queue.Queue()
  29. read_thread = threading.Thread(target=self.__readline_threading, args=(process, output_queue))
  30. read_thread.start()
  31. read_thread.join(timeout=3)
  32. if not output_queue.empty():
  33. output = output_queue.get()
  34. else:
  35. continue
  36. else:
  37. output = process.readline()
  38. output = self.__remove_ansi_escape_codes(output)
  39. output = self.__remove_leading_newlines(output)
  40. if output in ["", " ", "\n", "\r\n", "\n\r"]:
  41. if counter_output_is_all_none != 0:
  42. continue
  43. else:
  44. counter_output_is_all_none += 1
  45. print("\n", end='')
  46. continue
  47. else:
  48. counter_output_is_all_none = 0
  49. output = self.__remove_ending_newlines(output)
  50. print(output, end='\n')
  51. except EOFError:
  52. break # 当伪终端关闭时跳出循环
  53. except Exception as e:
  54. print(f"Error executing command: {command}. Error: {e}")
  55. @staticmethod
  56. def __readline_threading(process, output_queue):
  57. line = process.readline()
  58. output_queue.put(line)
  59. @staticmethod
  60. def __remove_ansi_escape_codes(s):
  61. ansi_escape = re.compile(r'\x1B\[[0-?]*[ -/]*[@-~]')
  62. return ansi_escape.sub('', s)
  63. @staticmethod
  64. def __remove_leading_newlines(s):
  65. while s.startswith(('\n\r', '\r\n', '\n', '\r')):
  66. for seq in ['\n\r', '\r\n', '\n', '\r']:
  67. if s.startswith(seq):
  68. s = s[len(seq):]
  69. return s
  70. @staticmethod
  71. def __remove_ending_newlines(s):
  72. sequences = ['\n\r', '\r\n', '\n', '\r']
  73. while s.endswith(tuple(sequences)):
  74. for seq in sequences:
  75. if s.endswith(seq):
  76. s = s[:-len(seq)]
  77. return s
  78. def build_and_release(self):
  79. for command in self.commands:
  80. self.__run_command(command)
  81. if __name__ == "__main__":
  82. bar = BuildAndRelease()
  83. bar.build_and_release()