instrument_controller.py 29 KB


  1. import copy
  2. import os
  3. import random
  4. import threading
  5. import time
  6. import typing
  7. from collections import deque
  8. from enum import Enum
  9. from typing import Optional
  10. import cv2
  11. import numpy as np
  12. import pyvisa
  13. from numpy import ndarray as mat
  14. from pyvisa import ResourceManager as scpiManager
  15. from pyvisa.resources import MessageBasedResource as scpiConnection
  16. from flask import Flask, Response
  17. from flask import Flask, jsonify
  18. from flask_cors import CORS
  19. from program_public_tools import ProgramPublicTools
  20. class ScpiInstrument(object):
  21. connection: scpiConnection
  22. name: str
  23. response: str
  24. class InstrumentResponseMark(object):
  25. def __init__(self):
  26. self.digital_multimeter: str = "DM3068"
  27. self.digital_oscilloscope: str = "DHO1204"
  28. self.waveform_generator: str = "DG5072"
  29. self.analog_electronic_load: str = "DL3021"
  30. self.__check_env_value()
  31. def __check_env_value(self):
  32. config_digital_multimeter = self.__get_env("PLC_SIM_SERVER_DIGITAL_MULTIMETER")
  33. config_digital_oscilloscope = self.__get_env("PLC_SIM_SERVER_DIGITAL_OSCILLOSCOPE")
  34. config_waveform_generator = self.__get_env("PLC_SIM_SERVER_WAVEFORM_GENERATOR")
  35. config_analog_electronic_load = self.__get_env("PLC_SIM_SERVER_ANALOG_ELECTRONIC_LOAD")
  36. print(f"[Instrument Controller] Using config: PLC_SIM_SERVER_DIGITAL_MULTIMETER = {config_digital_multimeter}")
  37. print(f"[Instrument Controller] Using config: PLC_SIM_SERVER_DIGITAL_OSCILLOSCOPE = {config_digital_oscilloscope}")
  38. print(f"[Instrument Controller] Using config: PLC_SIM_SERVER_WAVEFORM_GENERATOR = {config_waveform_generator}")
  39. print(f"[Instrument Controller] Using config: PLC_SIM_SERVER_ANALOG_ELECTRONIC_LOAD = {config_analog_electronic_load}")
  40. if config_digital_multimeter is not None:
  41. self.digital_multimeter = config_digital_multimeter
  42. if config_digital_oscilloscope is not None:
  43. self.digital_oscilloscope = config_digital_oscilloscope
  44. if config_waveform_generator is not None:
  45. self.waveform_generator = config_waveform_generator
  46. if config_analog_electronic_load is not None:
  47. self.analog_electronic_load = config_analog_electronic_load
  48. @staticmethod
  49. def __get_env(name: str) -> Optional[str]:
  50. return os.environ.get(name, None)
  51. class InstrumentControllerConfig(object):
  52. connection_timeout_ms: int = 5000
  53. response_mark: InstrumentResponseMark = InstrumentResponseMark()
  54. buffer_path: str = "./temp/"
  55. class InstrumentServices(object):
  56. def __init__(self):
  57. self.digital_multimeter: Optional[DigitalMultimeterService] = None
  58. self.digital_oscilloscope: Optional[DigitalOscilloscopeService] = None
  59. self.waveform_generator: Optional[WaveformGeneratorService] = None
  60. self.analog_electronic_load: Optional[AnalogElectronicLoadService] = None
  61. def check(self) -> list[str]:
  62. not_connect_list: list[str] = []
  63. for attr_name, attr_value in vars(self).items():
  64. if attr_value is None:
  65. not_connect_list.append(attr_name)
  66. return not_connect_list
  67. class InstrumentController(object):
  68. def __init__(self, pubtools: ProgramPublicTools):
  69. self.__scpi_manager: scpiManager = self.__init_scpi_manager()
  70. self.config: InstrumentControllerConfig = InstrumentControllerConfig()
  71. self.__pubtools: ProgramPublicTools = pubtools
  72. self.__scpi_instrument_list: list[ScpiInstrument] = []
  73. self.services: InstrumentServices = InstrumentServices()
  74. self.__retry_times: int = 1
  75. @staticmethod
  76. def __init_scpi_manager() -> scpiManager:
  77. return pyvisa.ResourceManager()
  78. def auto_connect(self) -> tuple[list[ScpiInstrument], list[str]]:
  79. self.__auto_disconnect()
  80. self.__scpi_instrument_list = self.__scan_and_connect_scpi_all()
  81. for instrument in self.__scpi_instrument_list:
  82. if self.config.response_mark.digital_multimeter in instrument.response:
  83. self.services.digital_multimeter = DigitalMultimeterService(instrument, self.__pubtools, self.config)
  84. elif self.config.response_mark.digital_oscilloscope in instrument.response:
  85. self.services.digital_oscilloscope = DigitalOscilloscopeService(instrument, self.__pubtools,
  86. self.config)
  87. elif self.config.response_mark.waveform_generator in instrument.response:
  88. self.services.waveform_generator = WaveformGeneratorService(instrument, self.__pubtools, self.config)
  89. elif self.config.response_mark.analog_electronic_load in instrument.response:
  90. self.services.analog_electronic_load = AnalogElectronicLoadService(instrument, self.__pubtools,
  91. self.config)
  92. else:
  93. pass
  94. not_connect_list = self.services.check()
  95. if len(not_connect_list) == 0:
  96. self.__pubtools.debug_output("All SCPI instruments connect successfully.")
  97. else:
  98. for instrument_name in not_connect_list:
  99. self.__pubtools.debug_output(f'Error: Cannot connect to instrument "{instrument_name}".')
  100. return self.__scpi_instrument_list, not_connect_list
  101. def __scan_and_connect_scpi_all(self) -> list[ScpiInstrument]:
  102. self.__pubtools.debug_output("Start to connect instruments...")
  103. device_name_list: tuple[str] = self.__scpi_manager.list_resources()
  104. scpi_instrument_list: list[ScpiInstrument] = []
  105. connect_number: int = 0
  106. if len(device_name_list) != 0:
  107. for device_name in device_name_list:
  108. scpi_connection, scpi_response = self.__scpi_connect(device_name)
  109. if scpi_connection is not None:
  110. scpi_instrument: ScpiInstrument = ScpiInstrument()
  111. scpi_instrument.connection = scpi_connection
  112. scpi_instrument.name = device_name
  113. scpi_instrument.response = scpi_response
  114. scpi_instrument_list.append(scpi_instrument)
  115. connect_number += 1
  116. self.__pubtools.debug_output(f"Connect completed! Connected to [{connect_number}] instruments.")
  117. return scpi_instrument_list
  118. def __auto_disconnect(self) -> list[ScpiInstrument]:
  119. new_scpi_instrument_list: list[ScpiInstrument] = []
  120. if len(self.__scpi_instrument_list) != 0:
  121. for scpi_instrument in self.__scpi_instrument_list:
  122. scpi_instrument.connection.close()
  123. self.__scpi_instrument_list = new_scpi_instrument_list
  124. return new_scpi_instrument_list
  125. def __scpi_connect(self, device_name: str) -> tuple[Optional[scpiConnection], Optional[str]]:
  126. for _ in range(0, self.__retry_times):
  127. try:
  128. scpi_connection = typing.cast(scpiConnection, self.__scpi_manager.open_resource(device_name))
  129. scpi_connection.timeout = self.config.connection_timeout_ms
  130. scpi_response: str = scpi_connection.query('*IDN?')
  131. scpi_response = scpi_response.strip()
  132. self.__pubtools.debug_output(f'Connect to SCPI device "{device_name}".')
  133. self.__pubtools.debug_output(f'Device response: "{scpi_response}"')
  134. except Exception as e:
  135. self.__pubtools.debug_output(f'Error when attempting to connect scpi device "{device_name}".')
  136. self.__pubtools.debug_output(f'Error information: {e}')
  137. self.__pubtools.debug_output(f'Start to retry......')
  138. else:
  139. return scpi_connection, scpi_response
  140. self.__pubtools.debug_output(f'Retrying failed for 3 times. Give up connecting.')
  141. return None, None
  142. class FloatServer:
  143. def __init__(self, name: Optional[str] = None):
  144. self.name: Optional[str] = name
  145. self.__app = Flask(__name__)
  146. CORS(self.__app)
  147. self.__value = 0.0
  148. self.__setup_routes()
  149. def __setup_routes(self):
  150. if self.name is None:
  151. self.name = f"/float_default_{time.time()}"
  152. else:
  153. self.name = f"/{self.name}"
  154. @self.__app.route(self.name, methods=['GET'])
  155. def get_value():
  156. return jsonify({'value': self.__value})
  157. def run(self, host, port):
  158. self.__app.run(host=host, port=port)
  159. def push(self, new_value):
  160. self.__value = new_value
  161. class MultimeterValueRange(Enum):
  162. range_200mv = "0"
  163. range_2v = "1"
  164. range_20v = "2"
  165. range_200v = "3"
  166. range_1000v = "4"
  167. default = "3"
  168. class MultimeterValueRangeTools(object):
  169. @staticmethod
  170. def get_range_type() -> list[str]:
  171. type_list: list[str] = ["200mv", "2v", "20v", "200v", "1000v", "default"]
  172. return type_list
  173. @staticmethod
  174. def solve_range(rang_string) -> MultimeterValueRange:
  175. match rang_string:
  176. case "200mv":
  177. return MultimeterValueRange.range_200mv
  178. case "2v":
  179. return MultimeterValueRange.range_2v
  180. case "20v":
  181. return MultimeterValueRange.range_20v
  182. case "200v":
  183. return MultimeterValueRange.range_200v
  184. case "1000v":
  185. return MultimeterValueRange.range_1000v
  186. case "default":
  187. return MultimeterValueRange.default
  188. case _:
  189. return MultimeterValueRange.default
  190. class DigitalMultimeterService(object):
  191. def __init__(self, scpi_instrument: ScpiInstrument, pubtools: ProgramPublicTools,
  192. config: InstrumentControllerConfig):
  193. self.scpi_instrument: ScpiInstrument = scpi_instrument
  194. self.__pubtools: ProgramPublicTools = pubtools
  195. self.__config: InstrumentControllerConfig = config
  196. self.__range: MultimeterValueRange = MultimeterValueRange.default
  197. self.realtime_value: float = float(0)
  198. self.__listening_thread: Optional[threading.Thread] = None
  199. self.__float_server_thread: Optional[threading.Thread] = None
  200. self.__whether_range_need_to_set: bool = True
  201. self.__error_count: int = 0
  202. self.name: str = "Digital Multimeter"
  203. self.__instrument_name: str = "multimeter"
  204. self.__float_server: FloatServer = FloatServer(self.__instrument_name)
  205. self.__is_listening: bool = False
  206. self.__server_url: str = ''
  207. def set_range(self, value_range: MultimeterValueRange):
  208. self.__range = value_range
  209. @staticmethod
  210. def get_range() -> list[str]:
  211. return MultimeterValueRangeTools.get_range_type()
  212. @staticmethod
  213. def solve_range_string(range_string: str) -> MultimeterValueRange:
  214. return MultimeterValueRangeTools.solve_range(range_string)
  215. def __float_server_main(self, float_server: FloatServer, server_host, server_port):
  216. self.__float_server.run(host=server_host, port=server_port)
  217. def keep_listening(self, realtime_terminal_output: bool = False,
  218. server_port: Optional[int] = None, server_host: Optional[str] = "0.0.0.0"):
  219. if self.__is_listening is False:
  220. self.__start_server_listening(realtime_terminal_output, server_port, server_host)
  221. self.__is_listening = True
  222. return self.__return_server_info()
  223. def __start_server_listening(self, realtime_terminal_output: bool = False,
  224. server_port: Optional[int] = None, server_host: Optional[str] = "0.0.0.0"):
  225. if server_port is None:
  226. server_port = self.__pubtools.find_available_server_port(number=1, address="0.0.0.0", start_port=8600)[0]
  227. listening_thread_args = (realtime_terminal_output,)
  228. self.__listening_thread = threading.Thread(target=self.__listening_main, args=listening_thread_args).start()
  229. float_server_args = (self.__float_server, server_host, server_port)
  230. self.__float_server_thread = threading.Thread(target=self.__float_server_main, args=float_server_args).start()
  231. server_root_name = self.__float_server.name
  232. self.__server_url = f"http://localhost:{server_port}{server_root_name}"
  233. return self.__return_server_info()
  234. def __return_server_info(self):
  235. return self.__listening_thread, self.__float_server_thread, self.__server_url
  236. def __listening_main(self, realtime_terminal_output: bool):
  237. threading.Thread(target=self.__listening_auto_clear)
  238. while True:
  239. self.realtime_value = self.__get_value()
  240. self.__float_server.push(self.realtime_value)
  241. if realtime_terminal_output is True:
  242. self.__pubtools.debug_output(f"Listening voltage from digital multimeter: {self.realtime_value}.")
  243. def __listening_auto_clear(self):
  244. while True:
  245. self.__whether_range_need_to_set = True
  246. time.sleep(5)
  247. def get_value_latest(self) -> float:
  248. return self.realtime_value
  249. def __get_value(self, use_range: Optional[MultimeterValueRange] = None):
  250. if use_range is None:
  251. use_range = self.__range
  252. else:
  253. self.__whether_range_need_to_set = True
  254. if (self.__whether_range_need_to_set is True) and (self.__error_count <= 3):
  255. try:
  256. cmd_set = f":MEASure:VOLTage:DC {use_range.value}"
  257. self.scpi_instrument.connection.query(cmd_set)
  258. except Exception as e:
  259. self.__pubtools.debug_output("Exception found when trying to set range of digital multimeter.")
  260. self.__pubtools.debug_output(f"Error info: {e}")
  261. self.__error_count += 1
  262. else:
  263. self.__whether_range_need_to_set = False
  264. self.__error_count = 0
  265. cmd_query = ":MEASure:VOLTage:DC?"
  266. multimeter_value = self.scpi_instrument.connection.query(cmd_query)
  267. return float(multimeter_value)
  268. class VideoStreamer:
  269. def __init__(self, buffer_size: int = 10, name: Optional[str] = None):
  270. self.name: Optional[str] = name
  271. self.__app = Flask(__name__)
  272. self.__frame_queue = deque(maxlen=buffer_size)
  273. self.__lock = threading.Lock() # 用于确保线程安全
  274. self.__setup_routes()
  275. def push(self, frame):
  276. """
  277. 推送一个帧到流。
  278. """
  279. with self.__lock:
  280. self.__frame_queue.append(frame)
  281. def __generate(self):
  282. while True:
  283. if len(self.__frame_queue) == 0:
  284. time.sleep(0.05) # 若队列为空,稍微等待
  285. continue
  286. with self.__lock:
  287. frame = self.__frame_queue.popleft() # 从队列左侧获取帧
  288. ret, jpeg = cv2.imencode('.jpg', frame)
  289. if ret:
  290. yield (b'--frame\r\n'
  291. b'Content-Type: image/jpeg\r\n\r\n' + jpeg.tobytes() + b'\r\n\r\n')
  292. def __video_feed(self):
  293. return Response(self.__generate(), mimetype='multipart/x-mixed-replace; boundary=frame')
  294. def __setup_routes(self):
  295. if self.name is None:
  296. self.name = f"/video_default_{time.time()}"
  297. else:
  298. self.name = f"/{self.name}"
  299. self.__app.add_url_rule(self.name, 'video_feed', self.__video_feed)
  300. def run(self, host, port):
  301. self.__app.run(threaded=True, host=host, port=port)
  302. return self.name
  303. class DigitalOscilloscopeService(object):
  304. def __init__(self, scpi_instrument: ScpiInstrument, pubtools: ProgramPublicTools,
  305. config: InstrumentControllerConfig):
  306. self.scpi_instrument: ScpiInstrument = scpi_instrument
  307. self.__pubtools: ProgramPublicTools = pubtools
  308. self.__config: InstrumentControllerConfig = config
  309. self.__instrument_name: str = "oscilloscope"
  310. self.realtime_screenshot: Optional[mat] = None
  311. self.__listening_thread: Optional[threading.Thread] = None
  312. self.__streamer_thread: Optional[threading.Thread] = None
  313. self.__windows_name: str = "Realtime Screenshot From Digital Oscilloscope"
  314. self.__video_streamer: VideoStreamer = VideoStreamer(name="oscilloscope")
  315. self.name: str = "Digital Oscilloscope"
  316. self.__is_listening: bool = False
  317. self.__stream_port: Optional[int] = None
  318. self.__stream_url: Optional[str] = None
  319. self.__screenshot_frame = None
  320. self.__img_empty = np.zeros((500, 500, 3), dtype=np.uint8)
  321. self.__img_empty[:, :] = [255, 255, 255] # BGR
  322. def keep_listening(self, realtime_terminal_output: bool = False,
  323. stream_port: Optional[int] = None, stream_host: str = "0.0.0.0"):
  324. if self.__is_listening is False:
  325. self.__start_listening_server(realtime_terminal_output, stream_port, stream_host)
  326. self.__is_listening = True
  327. return self.__return_stream_info()
  328. def __return_stream_info(self):
  329. return self.__listening_thread, self.__stream_port, self.__stream_url
  330. def __start_listening_server(self, realtime_terminal_output: bool = False,
  331. stream_port: Optional[int] = None, stream_host: str = "0.0.0.0"):
  332. if stream_port is None:
  333. stream_port = self.__pubtools.find_available_server_port(number=1, address="0.0.0.0", start_port=9000)[0]
  334. self.__stream_port = stream_port
  335. listening_thread_args = (realtime_terminal_output, True)
  336. self.__listening_thread = threading.Thread(target=self.__listening_main, args=listening_thread_args).start()
  337. streamer_thread_args = (self.__video_streamer, stream_port, stream_host)
  338. self.__streamer_thread = threading.Thread(target=self.__streamer_main, args=streamer_thread_args).start()
  339. stream_root_name = self.__video_streamer.name
  340. self.__stream_url = f"http://localhost:{stream_port}{stream_root_name}"
  341. return self.__return_stream_info()
  342. @staticmethod
  343. def __streamer_main(video_streamer: VideoStreamer,
  344. stream_port: int = None, stream_host: str = "0.0.0.0"):
  345. video_streamer.run(host=stream_host, port=stream_port)
  346. def save_img(self):
  347. get_time: str = time.strftime("%H:%M:%S")
  348. current_file_path = os.path.realpath(__file__)
  349. current_dir = os.path.dirname(current_file_path)
  350. parent_dir = os.path.dirname(current_dir)
  351. file_path = os.path.join(parent_dir, "export", f"OSC.{get_time.replace(':', '.')}.png")
  352. if self.__screenshot_frame is None:
  353. cv2.imwrite(file_path, self.__img_empty)
  354. else:
  355. cv2.imwrite(file_path, self.__screenshot_frame)
  356. return get_time
  357. def __listening_main(self, realtime_terminal_output: bool, enable_saving_file: bool = True):
  358. video_recode_service = None
  359. height, width = 512, 512
  360. screenshot_frame_last = np.zeros((height, width, 3), dtype=np.uint8)
  361. screenshot_frame_last[:, :] = [255, 255, 255] # BGR
  362. if enable_saving_file is True:
  363. folder_path = os.path.join(self.__config.buffer_path, self.__instrument_name)
  364. os.makedirs(folder_path, exist_ok=True)
  365. img_path = os.path.join(folder_path, f"{self.__instrument_name}.avi")
  366. fourcc = cv2.VideoWriter_fourcc('I', '4', '2', '0')
  367. video_recode_service = cv2.VideoWriter(img_path, fourcc, 25, (640, 480))
  368. if realtime_terminal_output is True:
  369. cv2.namedWindow(self.__windows_name, cv2.WINDOW_NORMAL)
  370. while True:
  371. try:
  372. screenshot_frame = self.get_screenshot()
  373. except Exception as e:
  374. self.__pubtools.debug_output(e)
  375. screenshot_frame = screenshot_frame_last
  376. else:
  377. screenshot_frame_last = screenshot_frame
  378. self.__screenshot_frame = screenshot_frame
  379. self.__video_streamer.push(screenshot_frame)
  380. if (enable_saving_file is True) and (video_recode_service is not None):
  381. video_recode_service.write(screenshot_frame)
  382. if realtime_terminal_output is True:
  383. cv2.imshow(self.__windows_name, self.realtime_screenshot)
  384. cv2.waitKey(1)
  385. def get_screenshot(self) -> mat:
  386. self.scpi_instrument.connection.write(':DISP:DATA? JPG')
  387. raw_data = self.scpi_instrument.connection.read_raw()
  388. image_data = self.__parse_data(raw_data)
  389. img_raw = np.asarray(bytearray(image_data), dtype="uint8")
  390. img_raw = cv2.imdecode(img_raw, cv2.IMREAD_COLOR)
  391. return img_raw
  392. @staticmethod
  393. def __parse_data(raw_data: bytes) -> bytes:
  394. start_index = raw_data.find(b'#')
  395. if start_index == -1:
  396. raise ValueError("Invalid data: TMC header not found.")
  397. length_digits = int(raw_data[start_index + 1:start_index + 2])
  398. data_length = int(raw_data[start_index + 2:start_index + 2 + length_digits])
  399. data_start = start_index + 2 + length_digits
  400. data_end = data_start + data_length
  401. image_data = raw_data[data_start:data_end]
  402. return image_data
  403. class WaveformType(Enum):
  404. Sine = "SIN"
  405. Square = "SQU"
  406. Ramp = "RAMP"
  407. Pulse = "PULS"
  408. Noise = "NOIS"
  409. class WaveformTypeTools(object):
  410. @staticmethod
  411. def get_all_waveform_types() -> str:
  412. return " ".join([waveform_type.name for waveform_type in WaveformType])
  413. @staticmethod
  414. def match_waveform_type(waveform_name: str) -> WaveformType:
  415. try:
  416. return WaveformType[waveform_name.strip().capitalize()]
  417. except KeyError:
  418. return WaveformType.Sine
  419. class WaveformFreqUnit(Enum):
  420. MHz = "MHz"
  421. kHz = "kHz"
  422. Hz = "Hz"
  423. mHz = "mHz"
  424. uHz = "uHz"
  425. class WaveformLevelUnit(Enum):
  426. V = "Vpp"
  427. mV = "mVpp"
  428. class WaveformConfig(object):
  429. def __init__(self):
  430. self.type: WaveformType = WaveformType.Sine
  431. self.freq: float = 100
  432. self.freq_unit: WaveformFreqUnit = WaveformFreqUnit.kHz
  433. self.high_level: float = 3
  434. self.high_level_unit: WaveformLevelUnit = WaveformLevelUnit.V
  435. self.low_level: float = 3
  436. self.low_level_unit: WaveformLevelUnit = WaveformLevelUnit.mV
  437. @staticmethod
  438. def get_type_string() -> str:
  439. return WaveformTypeTools.get_all_waveform_types()
  440. @staticmethod
  441. def match_type_string(type_string: str) -> WaveformType:
  442. return WaveformTypeTools.match_waveform_type(type_string)
  443. def get_freq_unit_type_string(self) -> str:
  444. return self.__get_type_value_string(WaveformFreqUnit)
  445. def match_freq_unit_type_string(self, freq_unit_string: str) -> WaveformFreqUnit:
  446. return self.__match_type_value_string(WaveformFreqUnit, WaveformFreqUnit.kHz, freq_unit_string)
  447. def get_level_unit_type_string(self) -> str:
  448. return self.__get_type_value_string(WaveformLevelUnit)
  449. def match_level_unit_type_string(self, level_unit_string: str) -> WaveformLevelUnit:
  450. return self.__match_type_value_string(WaveformLevelUnit, WaveformLevelUnit.V, level_unit_string)
  451. @staticmethod
  452. def __get_type_value_string(type_enum) -> str:
  453. type_value_string: str = ""
  454. for type_enum_item in type_enum:
  455. type_value_string = type_value_string + " " + type_enum_item.value()
  456. return type_value_string
  457. @staticmethod
  458. def __match_type_value_string(type_enum, type_default, type_value_string: str):
  459. for item in type_enum:
  460. if item.value == type_value_string:
  461. return item
  462. return type_default
  463. def to_str(self) -> str:
  464. config = copy.deepcopy(self)
  465. parts = [
  466. config.type.value.lower(),
  467. str(config.freq),
  468. config.freq_unit.value,
  469. str(config.high_level),
  470. config.high_level_unit.value.lower(),
  471. str(config.low_level),
  472. config.low_level_unit.value.lower()
  473. ]
  474. if parts[2] == "MHz":
  475. parts[2] = "LMHz"
  476. return "|".join(parts)
  477. def read_str(self, config_str: str):
  478. parts = config_str.split("|")
  479. self.type = WaveformType(parts[0].capitalize())
  480. self.freq = float(parts[1])
  481. if parts[2].lower() == "lmhz":
  482. self.freq_unit = WaveformFreqUnit.MHz
  483. else:
  484. self.freq_unit = WaveformFreqUnit(parts[2].capitalize())
  485. self.high_level = float(parts[3])
  486. self.high_level_unit = WaveformLevelUnit(parts[4].upper())
  487. self.low_level = float(parts[5])
  488. self.low_level_unit = WaveformLevelUnit(parts[6].upper())
  489. class WaveformGeneratorService(object):
  490. def __init__(self, scpi_instrument: ScpiInstrument, pubtools: ProgramPublicTools,
  491. config: InstrumentControllerConfig):
  492. self.scpi_instrument: ScpiInstrument = scpi_instrument
  493. self.__pubtools: ProgramPublicTools = pubtools
  494. self.__config: InstrumentControllerConfig = config
  495. self.enable_channel_1: bool = True
  496. self.enable_channel_2: bool = True
  497. self.config_channel_1: WaveformConfig = WaveformConfig()
  498. self.config_channel_2: WaveformConfig = WaveformConfig()
  499. self.name: str = "Waveform Generator"
  500. def output_restart(self):
  501. self.output_pause()
  502. self.output_start()
  503. def output_start(self):
  504. if self.enable_channel_1:
  505. self.__output_start_channel(1)
  506. if self.enable_channel_2:
  507. self.__output_start_channel(2)
  508. def __output_start_channel(self, channel_number: int):
  509. cmd = f":OUTPut{channel_number} ON"
  510. self.scpi_instrument.connection.write(cmd)
  511. def output_pause(self):
  512. self.__output_pause_channel(1)
  513. self.__output_pause_channel(2)
  514. def __output_pause_channel(self, channel_number: int):
  515. cmd = f":OUTPut{channel_number} OFF"
  516. self.scpi_instrument.connection.write(cmd)
  517. def get_config(self, channel: str) -> tuple[int, WaveformConfig]:
  518. if channel == "2":
  519. return 2, self.config_channel_2
  520. else:
  521. return 1, self.config_channel_1
  522. def set_enable(self, channel: str, enable: bool) -> int:
  523. if channel == "2":
  524. self.enable_channel_2 = enable
  525. return 2
  526. else:
  527. self.enable_channel_1 = enable
  528. return 1
  529. def apply_config(self, immediate_start: bool = False):
  530. self.__apply_config_channel(1, self.enable_channel_1, self.config_channel_1, immediate_start)
  531. self.__apply_config_channel(2, self.enable_channel_2, self.config_channel_2, immediate_start)
  532. def __apply_config_channel(self, channel_number: int, channel_enable: bool, channel_config: WaveformConfig,
  533. immediate_start: bool = False):
  534. if channel_config.type != WaveformType.Noise:
  535. cmd = (f":SOURce{channel_number}:APPLy:{channel_config.type.value} " +
  536. f"{channel_config.freq}{channel_config.freq_unit.value}," +
  537. f"{channel_config.high_level}{channel_config.high_level_unit.value}")
  538. else:
  539. cmd = (f":SOURce{channel_number}:APPLy:{channel_config.type.value} " +
  540. f"{channel_config.high_level}{channel_config.high_level_unit.value}")
  541. self.scpi_instrument.connection.write(cmd)
  542. if (immediate_start is False) or (channel_enable is False):
  543. self.output_pause()
  544. else:
  545. self.output_start()
  546. class AnalogElectronicLoadService(object):
  547. def __init__(self, scpi_instrument: ScpiInstrument, pubtools: ProgramPublicTools,
  548. config: InstrumentControllerConfig):
  549. self.scpi_instrument: ScpiInstrument = scpi_instrument
  550. self.__pubtools: ProgramPublicTools = pubtools
  551. self.__config: InstrumentControllerConfig = config
  552. self.name: str = "Analog Electronic Load"
  553. self.__mode_set_time: int = 0
  554. def set_mode(self):
  555. if self.__mode_set_time <= 2:
  556. self.__mode_set_time += 1
  557. cmd_set = f":SOUR:FUNC RES"
  558. self.scpi_instrument.connection.write(cmd_set)
  559. def set_resistance(self, value: str):
  560. self.set_mode()
  561. cmd_set = f":SOUR:RES:LEV:IMM {value}"
  562. self.scpi_instrument.connection.write(cmd_set)
  563. def run_as_main():
  564. program_pubtools: ProgramPublicTools = ProgramPublicTools()
  565. instrument_controller: InstrumentController = InstrumentController(program_pubtools)
  566. instrument_controller.auto_connect()
  567. # _, _, server_url_multimeter = instrument_controller.services.digital_multimeter.keep_listening()
  568. # print("digital_multimeter", server_url_multimeter)
  569. # _, _, server_url_oscilloscope = instrument_controller.services.digital_oscilloscope.keep_listening()
  570. # print("digital_oscilloscope", server_url_oscilloscope)
  571. while True:
  572. input_value = input("input: ")
  573. instrument_controller.services.analog_electronic_load.set_resistance(input_value)
  574. if __name__ == "__main__":
  575. run_as_main()