build_and_release.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. import queue
  2. import re
  3. import threading
  4. from winpty import PtyProcess
  5. class BuildAndRelease:
  6. def __init__(self):
  7. self.commands = [
  8. "python clean_build_all.py",
  9. "python build_all.py",
  10. "python make_release.py",
  11. "python clean_build_all.py",
  12. "python clean_build_all.py"
  13. ]
  14. self.user_input = input("请输入conda python环境的名称(例如:base): ")
  15. self.__readline_in_treading: bool = False
  16. def __run_command(self, command):
  17. try:
  18. # 如果是"python build_all.py"命令,先获取用户输入
  19. if "python build_all.py" == command:
  20. command += " "
  21. command += f"-python_env_name={self.user_input}"
  22. # 使用PtyProcess来运行命令
  23. process = PtyProcess.spawn(["powershell", "-Command", command])
  24. process.setwinsize(20, 9999)
  25. counter_output_is_all_none: int = 0
  26. while process.isalive() or not process.eof():
  27. try:
  28. if self.__readline_in_treading:
  29. output_queue = queue.Queue()
  30. read_thread = threading.Thread(target=self.__readline_threading, args=(process, output_queue))
  31. read_thread.start()
  32. read_thread.join(timeout=3)
  33. if not output_queue.empty():
  34. output = output_queue.get()
  35. else:
  36. continue
  37. else:
  38. output = process.readline()
  39. output = self.__remove_ansi_escape_codes(output)
  40. output = self.__remove_leading_newlines(output)
  41. if output in ["", " ", "\n", "\r\n", "\n\r"]:
  42. if counter_output_is_all_none != 0:
  43. continue
  44. else:
  45. counter_output_is_all_none += 1
  46. print("\n", end='')
  47. continue
  48. else:
  49. counter_output_is_all_none = 0
  50. output = self.__remove_ending_newlines(output)
  51. print(output, end='\n')
  52. except EOFError:
  53. break # 当伪终端关闭时跳出循环
  54. except Exception as e:
  55. print(f"Error executing command: {command}. Error: {e}")
  56. @staticmethod
  57. def __readline_threading(process, output_queue):
  58. line = process.readline()
  59. output_queue.put(line)
  60. @staticmethod
  61. def __remove_ansi_escape_codes(s):
  62. ansi_escape = re.compile(r'\x1B\[[0-?]*[ -/]*[@-~]')
  63. return ansi_escape.sub('', s)
  64. @staticmethod
  65. def __remove_leading_newlines(s):
  66. while s.startswith(('\n\r', '\r\n', '\n', '\r')):
  67. for seq in ['\n\r', '\r\n', '\n', '\r']:
  68. if s.startswith(seq):
  69. s = s[len(seq):]
  70. return s
  71. @staticmethod
  72. def __remove_ending_newlines(s):
  73. sequences = ['\n\r', '\r\n', '\n', '\r']
  74. while s.endswith(tuple(sequences)):
  75. for seq in sequences:
  76. if s.endswith(seq):
  77. s = s[:-len(seq)]
  78. return s
  79. def build_and_release(self):
  80. for command in self.commands:
  81. self.__run_command(command)
  82. if __name__ == "__main__":
  83. bar = BuildAndRelease()
  84. bar.build_and_release()