instrument_controller.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699
  1. import copy
  2. import os
  3. import random
  4. import threading
  5. import time
  6. import typing
  7. from collections import deque
  8. from enum import Enum
  9. from typing import Optional
  10. import cv2
  11. import numpy as np
  12. import pyvisa
  13. from numpy import ndarray as mat
  14. from pyvisa import ResourceManager as scpiManager
  15. from pyvisa.resources import MessageBasedResource as scpiConnection
  16. from flask import Flask, Response
  17. from flask import Flask, jsonify
  18. from flask_cors import CORS
  19. from program_public_tools import ProgramPublicTools
  20. class ScpiInstrument(object):
  21. connection: scpiConnection
  22. name: str
  23. response: str
  24. class InstrumentResponseMark(object):
  25. def __init__(self):
  26. self.digital_multimeter: str = "DM3068"
  27. self.digital_oscilloscope: str = "DHO1204"
  28. self.waveform_generator: str = "DG4202"
  29. self.analog_electronic_load: str = "DL3021A"
  30. self.__check_env_value()
  31. def __check_env_value(self):
  32. config_digital_multimeter = self.__get_env("PLC_SIM_SERVER_DIGITAL_MULTIMETER")
  33. config_digital_oscilloscope = self.__get_env("PLC_SIM_SERVER_DIGITAL_OSCILLOSCOPE")
  34. config_waveform_generator = self.__get_env("PLC_SIM_SERVER_WAVEFORM_GENERATOR")
  35. config_analog_electronic_load = self.__get_env("PLC_SIM_SERVER_ANALOG_ELECTRONIC_LOAD")
  36. print(f"[Instrument Controller] Using config: PLC_SIM_SERVER_DIGITAL_MULTIMETER = {config_digital_multimeter}")
  37. print(f"[Instrument Controller] Using config: PLC_SIM_SERVER_DIGITAL_OSCILLOSCOPE = {config_digital_oscilloscope}")
  38. print(f"[Instrument Controller] Using config: PLC_SIM_SERVER_WAVEFORM_GENERATOR = {config_waveform_generator}")
  39. print(f"[Instrument Controller] Using config: PLC_SIM_SERVER_ANALOG_ELECTRONIC_LOAD = {config_analog_electronic_load}")
  40. if config_digital_multimeter is not None:
  41. self.digital_multimeter = config_digital_multimeter
  42. if config_digital_oscilloscope is not None:
  43. self.digital_oscilloscope = config_digital_oscilloscope
  44. if config_waveform_generator is not None:
  45. self.waveform_generator = config_waveform_generator
  46. if config_analog_electronic_load is not None:
  47. self.analog_electronic_load = config_analog_electronic_load
  48. @staticmethod
  49. def __get_env(name: str) -> Optional[str]:
  50. return os.environ.get(name, None)
  51. class InstrumentControllerConfig(object):
  52. connection_timeout_ms: int = 5000
  53. response_mark: InstrumentResponseMark = InstrumentResponseMark()
  54. buffer_path: str = "./temp/"
  55. class InstrumentServices(object):
  56. def __init__(self):
  57. self.digital_multimeter: Optional[DigitalMultimeterService] = None
  58. self.digital_oscilloscope: Optional[DigitalOscilloscopeService] = None
  59. self.waveform_generator: Optional[WaveformGeneratorService] = None
  60. self.analog_electronic_load: Optional[AnalogElectronicLoadService] = None
  61. def check(self) -> list[str]:
  62. not_connect_list: list[str] = []
  63. for attr_name, attr_value in vars(self).items():
  64. if attr_value is None:
  65. not_connect_list.append(attr_name)
  66. return not_connect_list
  67. class InstrumentController(object):
  68. def __init__(self, pubtools: ProgramPublicTools):
  69. self.__scpi_manager: scpiManager = self.__init_scpi_manager()
  70. self.config: InstrumentControllerConfig = InstrumentControllerConfig()
  71. self.__pubtools: ProgramPublicTools = pubtools
  72. self.__scpi_instrument_list: list[ScpiInstrument] = []
  73. self.services: InstrumentServices = InstrumentServices()
  74. self.__retry_times: int = 1
  75. self.__max_connection_number: int = 4
  76. self.__connection_timeout_signal: int = 0
  77. @staticmethod
  78. def __init_scpi_manager() -> scpiManager:
  79. return pyvisa.ResourceManager()
  80. def auto_connect(self) -> tuple[list[ScpiInstrument], list[str]]:
  81. self.__auto_disconnect()
  82. self.__scpi_instrument_list = self.__scan_and_connect_scpi_all()
  83. for instrument in self.__scpi_instrument_list:
  84. if self.config.response_mark.digital_multimeter in instrument.response:
  85. self.services.digital_multimeter = DigitalMultimeterService(instrument, self.__pubtools, self.config)
  86. elif self.config.response_mark.digital_oscilloscope in instrument.response:
  87. self.services.digital_oscilloscope = DigitalOscilloscopeService(instrument, self.__pubtools,
  88. self.config)
  89. elif self.config.response_mark.waveform_generator in instrument.response:
  90. self.services.waveform_generator = WaveformGeneratorService(instrument, self.__pubtools, self.config)
  91. elif self.config.response_mark.analog_electronic_load in instrument.response:
  92. self.services.analog_electronic_load = AnalogElectronicLoadService(instrument, self.__pubtools,
  93. self.config)
  94. else:
  95. pass
  96. not_connect_list = self.services.check()
  97. if len(not_connect_list) == 0:
  98. self.__pubtools.debug_output("All SCPI instruments connect successfully.")
  99. else:
  100. for instrument_name in not_connect_list:
  101. self.__pubtools.debug_output(f'Error: Cannot connect to instrument "{instrument_name}".')
  102. return self.__scpi_instrument_list, not_connect_list
  103. def __scan_and_connect_scpi_all(self) -> list[ScpiInstrument]:
  104. self.__pubtools.debug_output("Start to connect instruments...")
  105. device_name_list: tuple[str] = self.__scpi_manager.list_resources()
  106. scpi_instrument_list: list[ScpiInstrument] = []
  107. connect_number: int = 0
  108. if len(device_name_list) != 0:
  109. for device_name in device_name_list:
  110. scpi_connection, scpi_response = self.__scpi_connect(device_name)
  111. if scpi_connection is not None:
  112. scpi_instrument: ScpiInstrument = ScpiInstrument()
  113. scpi_instrument.connection = scpi_connection
  114. scpi_instrument.name = device_name
  115. scpi_instrument.response = scpi_response
  116. scpi_instrument_list.append(scpi_instrument)
  117. connect_number += 1
  118. if self.__connection_timeout_signal >= 2:
  119. self.__connection_timeout_signal = 0
  120. break
  121. if connect_number >= self.__max_connection_number:
  122. break
  123. self.__pubtools.debug_output(f"Connect completed! Connected to [{connect_number}] instruments.")
  124. return scpi_instrument_list
  125. def __auto_disconnect(self) -> list[ScpiInstrument]:
  126. new_scpi_instrument_list: list[ScpiInstrument] = []
  127. if len(self.__scpi_instrument_list) != 0:
  128. for scpi_instrument in self.__scpi_instrument_list:
  129. scpi_instrument.connection.close()
  130. self.__scpi_instrument_list = new_scpi_instrument_list
  131. return new_scpi_instrument_list
  132. def __scpi_connect(self, device_name: str) -> tuple[Optional[scpiConnection], Optional[str]]:
  133. if (device_name.count("::") < 2):
  134. self.__connection_timeout_signal += 1
  135. self.__pubtools.debug_output(f'[InstrumentController] Warning: Maybe connecting a invalid virtual SCPI object "{device_name}". Auto ignore.')
  136. else:
  137. for _ in range(0, self.__retry_times):
  138. try:
  139. scpi_connection = typing.cast(scpiConnection, self.__scpi_manager.open_resource(device_name))
  140. scpi_connection.timeout = self.config.connection_timeout_ms
  141. scpi_response: str = scpi_connection.query('*IDN?')
  142. scpi_response = scpi_response.strip()
  143. self.__pubtools.debug_output(f'Connect to SCPI device "{device_name}".')
  144. self.__pubtools.debug_output(f'Device response: "{scpi_response}"')
  145. except Exception as e:
  146. self.__pubtools.debug_output(f'Error when attempting to connect scpi device "{device_name}".')
  147. info = f'Error information: {e}'
  148. self.__pubtools.debug_output(info)
  149. if ("timeout" in info) or ("Timeout" in info):
  150. self.__connection_timeout_signal += 1
  151. self.__pubtools.debug_output("[InstrumentController] Warning: One timeout expection when connecting.")
  152. self.__pubtools.debug_output(f'Start to retry......')
  153. else:
  154. return scpi_connection, scpi_response
  155. self.__pubtools.debug_output(f'Retrying failed for {self.__retry_times} times. Give up connecting.')
  156. return None, None
  157. class FloatServer:
  158. def __init__(self, name: Optional[str] = None):
  159. self.name: Optional[str] = name
  160. self.__app = Flask(__name__)
  161. CORS(self.__app)
  162. self.__value = 0.0
  163. self.__setup_routes()
  164. def __setup_routes(self):
  165. if self.name is None:
  166. self.name = f"/float_default_{time.time()}"
  167. else:
  168. self.name = f"/{self.name}"
  169. @self.__app.route(self.name, methods=['GET'])
  170. def get_value():
  171. return jsonify({'value': self.__value})
  172. def run(self, host, port):
  173. self.__app.run(host=host, port=port)
  174. def push(self, new_value):
  175. self.__value = new_value
  176. class MultimeterValueRange(Enum):
  177. range_200mv = "0"
  178. range_2v = "1"
  179. range_20v = "2"
  180. range_200v = "3"
  181. range_1000v = "4"
  182. default = "3"
  183. class MultimeterValueRangeTools(object):
  184. @staticmethod
  185. def get_range_type() -> list[str]:
  186. type_list: list[str] = ["200mv", "2v", "20v", "200v", "1000v", "default"]
  187. return type_list
  188. @staticmethod
  189. def solve_range(rang_string) -> MultimeterValueRange:
  190. match rang_string:
  191. case "200mv":
  192. return MultimeterValueRange.range_200mv
  193. case "2v":
  194. return MultimeterValueRange.range_2v
  195. case "20v":
  196. return MultimeterValueRange.range_20v
  197. case "200v":
  198. return MultimeterValueRange.range_200v
  199. case "1000v":
  200. return MultimeterValueRange.range_1000v
  201. case "default":
  202. return MultimeterValueRange.default
  203. case _:
  204. return MultimeterValueRange.default
  205. class DigitalMultimeterService(object):
  206. def __init__(self, scpi_instrument: ScpiInstrument, pubtools: ProgramPublicTools,
  207. config: InstrumentControllerConfig):
  208. self.scpi_instrument: ScpiInstrument = scpi_instrument
  209. self.__pubtools: ProgramPublicTools = pubtools
  210. self.__config: InstrumentControllerConfig = config
  211. self.__range: MultimeterValueRange = MultimeterValueRange.default
  212. self.realtime_value: float = float(0)
  213. self.__listening_thread: Optional[threading.Thread] = None
  214. self.__float_server_thread: Optional[threading.Thread] = None
  215. self.__whether_range_need_to_set: bool = True
  216. self.__error_count: int = 0
  217. self.name: str = "Digital Multimeter"
  218. self.__instrument_name: str = "multimeter"
  219. self.__float_server: FloatServer = FloatServer(self.__instrument_name)
  220. self.__is_listening: bool = False
  221. self.__server_url: str = ''
  222. def set_range(self, value_range: MultimeterValueRange):
  223. self.__range = value_range
  224. @staticmethod
  225. def get_range() -> list[str]:
  226. return MultimeterValueRangeTools.get_range_type()
  227. @staticmethod
  228. def solve_range_string(range_string: str) -> MultimeterValueRange:
  229. return MultimeterValueRangeTools.solve_range(range_string)
  230. def __float_server_main(self, float_server: FloatServer, server_host, server_port):
  231. self.__float_server.run(host=server_host, port=server_port)
  232. def keep_listening(self, realtime_terminal_output: bool = False,
  233. server_port: Optional[int] = None, server_host: Optional[str] = "0.0.0.0"):
  234. if self.__is_listening is False:
  235. self.__start_server_listening(realtime_terminal_output, server_port, server_host)
  236. self.__is_listening = True
  237. return self.__return_server_info()
  238. def __start_server_listening(self, realtime_terminal_output: bool = False,
  239. server_port: Optional[int] = None, server_host: Optional[str] = "0.0.0.0"):
  240. if server_port is None:
  241. server_port = self.__pubtools.find_available_server_port(number=1, address="0.0.0.0", start_port=8600)[0]
  242. listening_thread_args = (realtime_terminal_output,)
  243. self.__listening_thread = threading.Thread(target=self.__listening_main, args=listening_thread_args).start()
  244. float_server_args = (self.__float_server, server_host, server_port)
  245. self.__float_server_thread = threading.Thread(target=self.__float_server_main, args=float_server_args).start()
  246. server_root_name = self.__float_server.name
  247. self.__server_url = f"http://localhost:{server_port}{server_root_name}"
  248. return self.__return_server_info()
  249. def __return_server_info(self):
  250. return self.__listening_thread, self.__float_server_thread, self.__server_url
  251. def __listening_main(self, realtime_terminal_output: bool):
  252. threading.Thread(target=self.__listening_auto_clear)
  253. while True:
  254. self.realtime_value = self.__get_value()
  255. self.__float_server.push(self.realtime_value)
  256. if realtime_terminal_output is True:
  257. self.__pubtools.debug_output(f"Listening voltage from digital multimeter: {self.realtime_value}.")
  258. def __listening_auto_clear(self):
  259. while True:
  260. self.__whether_range_need_to_set = True
  261. time.sleep(5)
  262. def get_value_latest(self) -> float:
  263. return self.realtime_value
  264. def __get_value(self, use_range: Optional[MultimeterValueRange] = None):
  265. if use_range is None:
  266. use_range = self.__range
  267. else:
  268. self.__whether_range_need_to_set = True
  269. if (self.__whether_range_need_to_set is True) and (self.__error_count <= 3):
  270. try:
  271. cmd_set = f":MEASure:VOLTage:DC {use_range.value}"
  272. self.scpi_instrument.connection.query(cmd_set)
  273. except Exception as e:
  274. self.__pubtools.debug_output("Exception found when trying to set range of digital multimeter.")
  275. self.__pubtools.debug_output(f"Error info: {e}")
  276. self.__error_count += 1
  277. else:
  278. self.__whether_range_need_to_set = False
  279. self.__error_count = 0
  280. cmd_query = ":MEASure:VOLTage:DC?"
  281. multimeter_value = self.scpi_instrument.connection.query(cmd_query)
  282. return float(multimeter_value)
  283. class VideoStreamer:
  284. def __init__(self, buffer_size: int = 10, name: Optional[str] = None):
  285. self.name: Optional[str] = name
  286. self.__app = Flask(__name__)
  287. self.__frame_queue = deque(maxlen=buffer_size)
  288. self.__lock = threading.Lock() # 用于确保线程安全
  289. self.__setup_routes()
  290. def push(self, frame):
  291. """
  292. 推送一个帧到流。
  293. """
  294. with self.__lock:
  295. self.__frame_queue.append(frame)
  296. def __generate(self):
  297. while True:
  298. if len(self.__frame_queue) == 0:
  299. time.sleep(0.05) # 若队列为空,稍微等待
  300. continue
  301. with self.__lock:
  302. frame = self.__frame_queue.popleft() # 从队列左侧获取帧
  303. ret, jpeg = cv2.imencode('.jpg', frame)
  304. if ret:
  305. yield (b'--frame\r\n'
  306. b'Content-Type: image/jpeg\r\n\r\n' + jpeg.tobytes() + b'\r\n\r\n')
  307. def __video_feed(self):
  308. return Response(self.__generate(), mimetype='multipart/x-mixed-replace; boundary=frame')
  309. def __setup_routes(self):
  310. if self.name is None:
  311. self.name = f"/video_default_{time.time()}"
  312. else:
  313. self.name = f"/{self.name}"
  314. self.__app.add_url_rule(self.name, 'video_feed', self.__video_feed)
  315. def run(self, host, port):
  316. self.__app.run(threaded=True, host=host, port=port)
  317. return self.name
  318. class DigitalOscilloscopeService(object):
  319. def __init__(self, scpi_instrument: ScpiInstrument, pubtools: ProgramPublicTools,
  320. config: InstrumentControllerConfig):
  321. self.scpi_instrument: ScpiInstrument = scpi_instrument
  322. self.__pubtools: ProgramPublicTools = pubtools
  323. self.__config: InstrumentControllerConfig = config
  324. self.__instrument_name: str = "oscilloscope"
  325. self.realtime_screenshot: Optional[mat] = None
  326. self.__listening_thread: Optional[threading.Thread] = None
  327. self.__streamer_thread: Optional[threading.Thread] = None
  328. self.__windows_name: str = "Realtime Screenshot From Digital Oscilloscope"
  329. self.__video_streamer: VideoStreamer = VideoStreamer(name="oscilloscope")
  330. self.name: str = "Digital Oscilloscope"
  331. self.__is_listening: bool = False
  332. self.__stream_port: Optional[int] = None
  333. self.__stream_url: Optional[str] = None
  334. self.__screenshot_frame = None
  335. self.__img_empty = np.zeros((500, 500, 3), dtype=np.uint8)
  336. self.__img_empty[:, :] = [255, 255, 255] # BGR
  337. def keep_listening(self, realtime_terminal_output: bool = False,
  338. stream_port: Optional[int] = None, stream_host: str = "0.0.0.0"):
  339. if self.__is_listening is False:
  340. self.__start_listening_server(realtime_terminal_output, stream_port, stream_host)
  341. self.__is_listening = True
  342. return self.__return_stream_info()
  343. def __return_stream_info(self):
  344. return self.__listening_thread, self.__stream_port, self.__stream_url
  345. def __start_listening_server(self, realtime_terminal_output: bool = False,
  346. stream_port: Optional[int] = None, stream_host: str = "0.0.0.0"):
  347. if stream_port is None:
  348. stream_port = self.__pubtools.find_available_server_port(number=1, address="0.0.0.0", start_port=9000)[0]
  349. self.__stream_port = stream_port
  350. listening_thread_args = (realtime_terminal_output, True)
  351. self.__listening_thread = threading.Thread(target=self.__listening_main, args=listening_thread_args).start()
  352. streamer_thread_args = (self.__video_streamer, stream_port, stream_host)
  353. self.__streamer_thread = threading.Thread(target=self.__streamer_main, args=streamer_thread_args).start()
  354. stream_root_name = self.__video_streamer.name
  355. self.__stream_url = f"http://localhost:{stream_port}{stream_root_name}"
  356. return self.__return_stream_info()
  357. @staticmethod
  358. def __streamer_main(video_streamer: VideoStreamer,
  359. stream_port: int = None, stream_host: str = "0.0.0.0"):
  360. video_streamer.run(host=stream_host, port=stream_port)
  361. def save_img(self):
  362. get_time: str = time.strftime("%H:%M:%S")
  363. current_file_path = os.path.realpath(__file__)
  364. current_dir = os.path.dirname(current_file_path)
  365. parent_dir = os.path.dirname(current_dir)
  366. file_path = os.path.join(parent_dir, "export", f"OSC.{get_time.replace(':', '.')}.png")
  367. if self.__screenshot_frame is None:
  368. cv2.imwrite(file_path, self.__img_empty)
  369. else:
  370. cv2.imwrite(file_path, self.__screenshot_frame)
  371. return get_time
  372. def __listening_main(self, realtime_terminal_output: bool, enable_saving_file: bool = True):
  373. video_recode_service = None
  374. height, width = 512, 512
  375. screenshot_frame_last = np.zeros((height, width, 3), dtype=np.uint8)
  376. screenshot_frame_last[:, :] = [255, 255, 255] # BGR
  377. if enable_saving_file is True:
  378. folder_path = os.path.join(self.__config.buffer_path, self.__instrument_name)
  379. os.makedirs(folder_path, exist_ok=True)
  380. img_path = os.path.join(folder_path, f"{self.__instrument_name}.avi")
  381. fourcc = cv2.VideoWriter_fourcc('I', '4', '2', '0')
  382. video_recode_service = cv2.VideoWriter(img_path, fourcc, 25, (640, 480))
  383. if realtime_terminal_output is True:
  384. cv2.namedWindow(self.__windows_name, cv2.WINDOW_NORMAL)
  385. while True:
  386. try:
  387. screenshot_frame = self.get_screenshot()
  388. except Exception as e:
  389. self.__pubtools.debug_output(e)
  390. screenshot_frame = screenshot_frame_last
  391. else:
  392. screenshot_frame_last = screenshot_frame
  393. self.__screenshot_frame = screenshot_frame
  394. self.__video_streamer.push(screenshot_frame)
  395. if (enable_saving_file is True) and (video_recode_service is not None):
  396. video_recode_service.write(screenshot_frame)
  397. if realtime_terminal_output is True:
  398. cv2.imshow(self.__windows_name, self.realtime_screenshot)
  399. cv2.waitKey(1)
  400. def get_screenshot(self) -> mat:
  401. self.scpi_instrument.connection.write(':DISP:DATA? JPG')
  402. raw_data = self.scpi_instrument.connection.read_raw()
  403. image_data = self.__parse_data(raw_data)
  404. img_raw = np.asarray(bytearray(image_data), dtype="uint8")
  405. img_raw = cv2.imdecode(img_raw, cv2.IMREAD_COLOR)
  406. return img_raw
  407. @staticmethod
  408. def __parse_data(raw_data: bytes) -> bytes:
  409. start_index = raw_data.find(b'#')
  410. if start_index == -1:
  411. raise ValueError("Invalid data: TMC header not found.")
  412. length_digits = int(raw_data[start_index + 1:start_index + 2])
  413. data_length = int(raw_data[start_index + 2:start_index + 2 + length_digits])
  414. data_start = start_index + 2 + length_digits
  415. data_end = data_start + data_length
  416. image_data = raw_data[data_start:data_end]
  417. return image_data
  418. class WaveformType(Enum):
  419. Sine = "SIN"
  420. Square = "SQU"
  421. Ramp = "RAMP"
  422. Pulse = "PULS"
  423. Noise = "NOIS"
  424. class WaveformTypeTools(object):
  425. @staticmethod
  426. def get_all_waveform_types() -> str:
  427. return " ".join([waveform_type.name for waveform_type in WaveformType])
  428. @staticmethod
  429. def match_waveform_type(waveform_name: str) -> WaveformType:
  430. try:
  431. return WaveformType[waveform_name.strip().capitalize()]
  432. except KeyError:
  433. return WaveformType.Sine
  434. class WaveformFreqUnit(Enum):
  435. MHz = "MHz"
  436. kHz = "kHz"
  437. Hz = "Hz"
  438. mHz = "mHz"
  439. uHz = "uHz"
  440. class WaveformLevelUnit(Enum):
  441. V = "Vpp"
  442. mV = "mVpp"
  443. class WaveformConfig(object):
  444. def __init__(self):
  445. self.type: WaveformType = WaveformType.Sine
  446. self.type_string: str = "SIN"
  447. self.freq: float = 100
  448. self.freq_unit: WaveformFreqUnit = WaveformFreqUnit.kHz
  449. self.high_level: float = 3
  450. self.high_level_unit: WaveformLevelUnit = WaveformLevelUnit.V
  451. self.low_level: float = 3
  452. self.low_level_unit: WaveformLevelUnit = WaveformLevelUnit.mV
  453. @staticmethod
  454. def get_type_string() -> str:
  455. return WaveformTypeTools.get_all_waveform_types()
  456. @staticmethod
  457. def match_type_string(type_string: str) -> WaveformType:
  458. return WaveformTypeTools.match_waveform_type(type_string)
  459. def get_freq_unit_type_string(self) -> str:
  460. return self.__get_type_value_string(WaveformFreqUnit)
  461. def match_freq_unit_type_string(self, freq_unit_string: str) -> WaveformFreqUnit:
  462. return self.__match_type_value_string(WaveformFreqUnit, WaveformFreqUnit.kHz, freq_unit_string)
  463. def get_level_unit_type_string(self) -> str:
  464. return self.__get_type_value_string(WaveformLevelUnit)
  465. def match_level_unit_type_string(self, level_unit_string: str) -> WaveformLevelUnit:
  466. return self.__match_type_value_string(WaveformLevelUnit, WaveformLevelUnit.V, level_unit_string)
  467. @staticmethod
  468. def __get_type_value_string(type_enum) -> str:
  469. type_value_string: str = ""
  470. for type_enum_item in type_enum:
  471. type_value_string = type_value_string + " " + type_enum_item.value()
  472. return type_value_string
  473. @staticmethod
  474. def __match_type_value_string(type_enum, type_default, type_value_string: str):
  475. for item in type_enum:
  476. if item.value == type_value_string:
  477. return item
  478. return type_default
  479. def to_str(self) -> str:
  480. config = copy.deepcopy(self)
  481. parts = [
  482. config.type.value.lower(),
  483. str(config.freq),
  484. config.freq_unit.value,
  485. str(config.high_level),
  486. config.high_level_unit.value.lower(),
  487. str(config.low_level),
  488. config.low_level_unit.value.lower()
  489. ]
  490. if parts[2] == "MHz":
  491. parts[2] = "LMHz"
  492. return "|".join(parts)
  493. def read_str(self, config_str: str):
  494. parts = config_str.split("|")
  495. self.type = WaveformType(parts[0].capitalize())
  496. self.freq = float(parts[1])
  497. if parts[2].lower() == "lmhz":
  498. self.freq_unit = WaveformFreqUnit.MHz
  499. else:
  500. self.freq_unit = WaveformFreqUnit(parts[2].capitalize())
  501. self.high_level = float(parts[3])
  502. self.high_level_unit = WaveformLevelUnit(parts[4].upper())
  503. self.low_level = float(parts[5])
  504. self.low_level_unit = WaveformLevelUnit(parts[6].upper())
  505. class WaveformGeneratorService(object):
  506. def __init__(self, scpi_instrument: ScpiInstrument, pubtools: ProgramPublicTools,
  507. config: InstrumentControllerConfig):
  508. self.scpi_instrument: ScpiInstrument = scpi_instrument
  509. self.__pubtools: ProgramPublicTools = pubtools
  510. self.__config: InstrumentControllerConfig = config
  511. self.enable_channel_1: bool = True
  512. self.enable_channel_2: bool = True
  513. self.config_channel_1: WaveformConfig = WaveformConfig()
  514. self.config_channel_2: WaveformConfig = WaveformConfig()
  515. self.name: str = "Waveform Generator"
  516. def output_restart(self):
  517. self.output_pause()
  518. self.output_start()
  519. def output_start(self):
  520. if self.enable_channel_1:
  521. self.__output_start_channel(1)
  522. if self.enable_channel_2:
  523. self.__output_start_channel(2)
  524. def __output_start_channel(self, channel_number: int):
  525. cmd = f":OUTPut{channel_number} ON"
  526. self.scpi_instrument.connection.write(cmd)
  527. def output_pause(self):
  528. self.__output_pause_channel(1)
  529. self.__output_pause_channel(2)
  530. def __output_pause_channel(self, channel_number: int):
  531. cmd = f":OUTPut{channel_number} OFF"
  532. self.scpi_instrument.connection.write(cmd)
  533. def get_config(self, channel: str) -> tuple[int, WaveformConfig]:
  534. if channel == "2":
  535. return 2, self.config_channel_2
  536. else:
  537. return 1, self.config_channel_1
  538. def set_enable(self, channel: str, enable: bool) -> int:
  539. if channel == "2":
  540. self.enable_channel_2 = enable
  541. return 2
  542. else:
  543. self.enable_channel_1 = enable
  544. return 1
  545. def apply_config(self, immediate_start: bool = False):
  546. self.__apply_config_channel(1, self.enable_channel_1, self.config_channel_1, immediate_start)
  547. self.__apply_config_channel(2, self.enable_channel_2, self.config_channel_2, immediate_start)
  548. def __apply_config_channel(self, channel_number: int, channel_enable: bool, channel_config: WaveformConfig,
  549. immediate_start: bool = False):
  550. if channel_config.type_string != WaveformType.Noise.value:
  551. cmd = (f":SOURce{channel_number}:APPLy:{channel_config.type_string} " +
  552. f"{channel_config.freq}{channel_config.freq_unit.value}," +
  553. f"{channel_config.high_level}{channel_config.high_level_unit.value}")
  554. else:
  555. cmd = (f":SOURce{channel_number}:APPLy:{channel_config.type_string} " +
  556. f"{channel_config.high_level}{channel_config.high_level_unit.value}")
  557. self.scpi_instrument.connection.write(cmd)
  558. if (immediate_start is False) or (channel_enable is False):
  559. self.output_pause()
  560. else:
  561. self.output_start()
  562. class AnalogElectronicLoadService(object):
  563. def __init__(self, scpi_instrument: ScpiInstrument, pubtools: ProgramPublicTools,
  564. config: InstrumentControllerConfig):
  565. self.scpi_instrument: ScpiInstrument = scpi_instrument
  566. self.__pubtools: ProgramPublicTools = pubtools
  567. self.__config: InstrumentControllerConfig = config
  568. self.name: str = "Analog Electronic Load"
  569. self.__mode_set_time: int = 0
  570. def set_mode(self):
  571. if self.__mode_set_time <= 2:
  572. self.__mode_set_time += 1
  573. cmd_set = f":SOUR:FUNC RES"
  574. self.scpi_instrument.connection.write(cmd_set)
  575. def set_resistance(self, value: str):
  576. self.set_mode()
  577. cmd_set = f":SOUR:RES:LEV:IMM {value}"
  578. self.scpi_instrument.connection.write(cmd_set)
  579. self.__set_open()
  580. def __set_open(self):
  581. self.scpi_instrument.connection.write(":SOUR:INP:STAT 1")
  582. def __set_close(self):
  583. self.scpi_instrument.connection.write(":SOUR:INP:STAT 0")
  584. def run_as_main():
  585. program_pubtools: ProgramPublicTools = ProgramPublicTools()
  586. instrument_controller: InstrumentController = InstrumentController(program_pubtools)
  587. instrument_controller.auto_connect()
  588. # _, _, server_url_multimeter = instrument_controller.services.digital_multimeter.keep_listening()
  589. # print("digital_multimeter", server_url_multimeter)
  590. # _, _, server_url_oscilloscope = instrument_controller.services.digital_oscilloscope.keep_listening()
  591. # print("digital_oscilloscope", server_url_oscilloscope)
  592. while True:
  593. input_value = input("input: ")
  594. instrument_controller.services.analog_electronic_load.set_resistance(input_value)
  595. if __name__ == "__main__":
  596. run_as_main()