123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676 |
- import copy
- import os
- import random
- import threading
- import time
- import typing
- from collections import deque
- from enum import Enum
- from typing import Optional
- import cv2
- import numpy as np
- import pyvisa
- from numpy import ndarray as mat
- from pyvisa import ResourceManager as scpiManager
- from pyvisa.resources import MessageBasedResource as scpiConnection
- from flask import Flask, Response
- from flask import Flask, jsonify
- from flask_cors import CORS
- from program_public_tools import ProgramPublicTools
- class ScpiInstrument(object):
- connection: scpiConnection
- name: str
- response: str
- class InstrumentResponseMark(object):
- def __init__(self):
- self.digital_multimeter: str = "DM3068"
- self.digital_oscilloscope: str = "DHO1204"
- self.waveform_generator: str = "DG5072"
- self.analog_electronic_load: str = "DL3021"
- self.__check_env_value()
- def __check_env_value(self):
- config_digital_multimeter = self.__get_env("PLC_SIM_SERVER_DIGITAL_MULTIMETER")
- config_digital_oscilloscope = self.__get_env("PLC_SIM_SERVER_DIGITAL_OSCILLOSCOPE")
- config_waveform_generator = self.__get_env("PLC_SIM_SERVER_WAVEFORM_GENERATOR")
- config_analog_electronic_load = self.__get_env("PLC_SIM_SERVER_ANALOG_ELECTRONIC_LOAD")
- print(f"[Instrument Controller] Using config: PLC_SIM_SERVER_DIGITAL_MULTIMETER = {config_digital_multimeter}")
- print(f"[Instrument Controller] Using config: PLC_SIM_SERVER_DIGITAL_OSCILLOSCOPE = {config_digital_oscilloscope}")
- print(f"[Instrument Controller] Using config: PLC_SIM_SERVER_WAVEFORM_GENERATOR = {config_waveform_generator}")
- print(f"[Instrument Controller] Using config: PLC_SIM_SERVER_ANALOG_ELECTRONIC_LOAD = {config_analog_electronic_load}")
- if config_digital_multimeter is not None:
- self.digital_multimeter = config_digital_multimeter
- if config_digital_oscilloscope is not None:
- self.digital_oscilloscope = config_digital_oscilloscope
- if config_waveform_generator is not None:
- self.waveform_generator = config_waveform_generator
- if config_analog_electronic_load is not None:
- self.analog_electronic_load = config_analog_electronic_load
- @staticmethod
- def __get_env(name: str) -> Optional[str]:
- return os.environ.get(name, None)
- class InstrumentControllerConfig(object):
- connection_timeout_ms: int = 5000
- response_mark: InstrumentResponseMark = InstrumentResponseMark()
- buffer_path: str = "./temp/"
- class InstrumentServices(object):
- def __init__(self):
- self.digital_multimeter: Optional[DigitalMultimeterService] = None
- self.digital_oscilloscope: Optional[DigitalOscilloscopeService] = None
- self.waveform_generator: Optional[WaveformGeneratorService] = None
- self.analog_electronic_load: Optional[AnalogElectronicLoadService] = None
- def check(self) -> list[str]:
- not_connect_list: list[str] = []
- for attr_name, attr_value in vars(self).items():
- if attr_value is None:
- not_connect_list.append(attr_name)
- return not_connect_list
- class InstrumentController(object):
- def __init__(self, pubtools: ProgramPublicTools):
- self.__scpi_manager: scpiManager = self.__init_scpi_manager()
- self.config: InstrumentControllerConfig = InstrumentControllerConfig()
- self.__pubtools: ProgramPublicTools = pubtools
- self.__scpi_instrument_list: list[ScpiInstrument] = []
- self.services: InstrumentServices = InstrumentServices()
- self.__retry_times: int = 1
- @staticmethod
- def __init_scpi_manager() -> scpiManager:
- return pyvisa.ResourceManager()
- def auto_connect(self) -> tuple[list[ScpiInstrument], list[str]]:
- self.__auto_disconnect()
- self.__scpi_instrument_list = self.__scan_and_connect_scpi_all()
- for instrument in self.__scpi_instrument_list:
- if self.config.response_mark.digital_multimeter in instrument.response:
- self.services.digital_multimeter = DigitalMultimeterService(instrument, self.__pubtools, self.config)
- elif self.config.response_mark.digital_oscilloscope in instrument.response:
- self.services.digital_oscilloscope = DigitalOscilloscopeService(instrument, self.__pubtools,
- self.config)
- elif self.config.response_mark.waveform_generator in instrument.response:
- self.services.waveform_generator = WaveformGeneratorService(instrument, self.__pubtools, self.config)
- elif self.config.response_mark.analog_electronic_load in instrument.response:
- self.services.analog_electronic_load = AnalogElectronicLoadService(instrument, self.__pubtools,
- self.config)
- else:
- pass
- not_connect_list = self.services.check()
- if len(not_connect_list) == 0:
- self.__pubtools.debug_output("All SCPI instruments connect successfully.")
- else:
- for instrument_name in not_connect_list:
- self.__pubtools.debug_output(f'Error: Cannot connect to instrument "{instrument_name}".')
- return self.__scpi_instrument_list, not_connect_list
- def __scan_and_connect_scpi_all(self) -> list[ScpiInstrument]:
- self.__pubtools.debug_output("Start to connect instruments...")
- device_name_list: tuple[str] = self.__scpi_manager.list_resources()
- scpi_instrument_list: list[ScpiInstrument] = []
- connect_number: int = 0
- if len(device_name_list) != 0:
- for device_name in device_name_list:
- scpi_connection, scpi_response = self.__scpi_connect(device_name)
- if scpi_connection is not None:
- scpi_instrument: ScpiInstrument = ScpiInstrument()
- scpi_instrument.connection = scpi_connection
- scpi_instrument.name = device_name
- scpi_instrument.response = scpi_response
- scpi_instrument_list.append(scpi_instrument)
- connect_number += 1
- self.__pubtools.debug_output(f"Connect completed! Connected to [{connect_number}] instruments.")
- return scpi_instrument_list
- def __auto_disconnect(self) -> list[ScpiInstrument]:
- new_scpi_instrument_list: list[ScpiInstrument] = []
- if len(self.__scpi_instrument_list) != 0:
- for scpi_instrument in self.__scpi_instrument_list:
- scpi_instrument.connection.close()
- self.__scpi_instrument_list = new_scpi_instrument_list
- return new_scpi_instrument_list
- def __scpi_connect(self, device_name: str) -> tuple[Optional[scpiConnection], Optional[str]]:
- for _ in range(0, self.__retry_times):
- try:
- scpi_connection = typing.cast(scpiConnection, self.__scpi_manager.open_resource(device_name))
- scpi_connection.timeout = self.config.connection_timeout_ms
- scpi_response: str = scpi_connection.query('*IDN?')
- scpi_response = scpi_response.strip()
- self.__pubtools.debug_output(f'Connect to SCPI device "{device_name}".')
- self.__pubtools.debug_output(f'Device response: "{scpi_response}"')
- except Exception as e:
- self.__pubtools.debug_output(f'Error when attempting to connect scpi device "{device_name}".')
- self.__pubtools.debug_output(f'Error information: {e}')
- self.__pubtools.debug_output(f'Start to retry......')
- else:
- return scpi_connection, scpi_response
- self.__pubtools.debug_output(f'Retrying failed for 3 times. Give up connecting.')
- return None, None
- class FloatServer:
- def __init__(self, name: Optional[str] = None):
- self.name: Optional[str] = name
- self.__app = Flask(__name__)
- CORS(self.__app)
- self.__value = 0.0
- self.__setup_routes()
- def __setup_routes(self):
- if self.name is None:
- self.name = f"/float_default_{time.time()}"
- else:
- self.name = f"/{self.name}"
- @self.__app.route(self.name, methods=['GET'])
- def get_value():
- return jsonify({'value': self.__value})
- def run(self, host, port):
- self.__app.run(host=host, port=port)
- def push(self, new_value):
- self.__value = new_value
- class MultimeterValueRange(Enum):
- range_200mv = "0"
- range_2v = "1"
- range_20v = "2"
- range_200v = "3"
- range_1000v = "4"
- default = "3"
- class MultimeterValueRangeTools(object):
- @staticmethod
- def get_range_type() -> list[str]:
- type_list: list[str] = ["200mv", "2v", "20v", "200v", "1000v", "default"]
- return type_list
- @staticmethod
- def solve_range(rang_string) -> MultimeterValueRange:
- match rang_string:
- case "200mv":
- return MultimeterValueRange.range_200mv
- case "2v":
- return MultimeterValueRange.range_2v
- case "20v":
- return MultimeterValueRange.range_20v
- case "200v":
- return MultimeterValueRange.range_200v
- case "1000v":
- return MultimeterValueRange.range_1000v
- case "default":
- return MultimeterValueRange.default
- case _:
- return MultimeterValueRange.default
- class DigitalMultimeterService(object):
- def __init__(self, scpi_instrument: ScpiInstrument, pubtools: ProgramPublicTools,
- config: InstrumentControllerConfig):
- self.scpi_instrument: ScpiInstrument = scpi_instrument
- self.__pubtools: ProgramPublicTools = pubtools
- self.__config: InstrumentControllerConfig = config
- self.__range: MultimeterValueRange = MultimeterValueRange.default
- self.realtime_value: float = float(0)
- self.__listening_thread: Optional[threading.Thread] = None
- self.__float_server_thread: Optional[threading.Thread] = None
- self.__whether_range_need_to_set: bool = True
- self.__error_count: int = 0
- self.name: str = "Digital Multimeter"
- self.__instrument_name: str = "multimeter"
- self.__float_server: FloatServer = FloatServer(self.__instrument_name)
- self.__is_listening: bool = False
- self.__server_url: str = ''
- def set_range(self, value_range: MultimeterValueRange):
- self.__range = value_range
- @staticmethod
- def get_range() -> list[str]:
- return MultimeterValueRangeTools.get_range_type()
- @staticmethod
- def solve_range_string(range_string: str) -> MultimeterValueRange:
- return MultimeterValueRangeTools.solve_range(range_string)
- def __float_server_main(self, float_server: FloatServer, server_host, server_port):
- self.__float_server.run(host=server_host, port=server_port)
- def keep_listening(self, realtime_terminal_output: bool = False,
- server_port: Optional[int] = None, server_host: Optional[str] = "0.0.0.0"):
- if self.__is_listening is False:
- self.__start_server_listening(realtime_terminal_output, server_port, server_host)
- self.__is_listening = True
- return self.__return_server_info()
- def __start_server_listening(self, realtime_terminal_output: bool = False,
- server_port: Optional[int] = None, server_host: Optional[str] = "0.0.0.0"):
- if server_port is None:
- server_port = self.__pubtools.find_available_server_port(number=1, address="0.0.0.0", start_port=8600)[0]
- listening_thread_args = (realtime_terminal_output,)
- self.__listening_thread = threading.Thread(target=self.__listening_main, args=listening_thread_args).start()
- float_server_args = (self.__float_server, server_host, server_port)
- self.__float_server_thread = threading.Thread(target=self.__float_server_main, args=float_server_args).start()
- server_root_name = self.__float_server.name
- self.__server_url = f"http://localhost:{server_port}{server_root_name}"
- return self.__return_server_info()
- def __return_server_info(self):
- return self.__listening_thread, self.__float_server_thread, self.__server_url
- def __listening_main(self, realtime_terminal_output: bool):
- threading.Thread(target=self.__listening_auto_clear)
- while True:
- self.realtime_value = self.__get_value()
- self.__float_server.push(self.realtime_value)
- if realtime_terminal_output is True:
- self.__pubtools.debug_output(f"Listening voltage from digital multimeter: {self.realtime_value}.")
- def __listening_auto_clear(self):
- while True:
- self.__whether_range_need_to_set = True
- time.sleep(5)
- def get_value_latest(self) -> float:
- return self.realtime_value
- def __get_value(self, use_range: Optional[MultimeterValueRange] = None):
- if use_range is None:
- use_range = self.__range
- else:
- self.__whether_range_need_to_set = True
- if (self.__whether_range_need_to_set is True) and (self.__error_count <= 3):
- try:
- cmd_set = f":MEASure:VOLTage:DC {use_range.value}"
- self.scpi_instrument.connection.query(cmd_set)
- except Exception as e:
- self.__pubtools.debug_output("Exception found when trying to set range of digital multimeter.")
- self.__pubtools.debug_output(f"Error info: {e}")
- self.__error_count += 1
- else:
- self.__whether_range_need_to_set = False
- self.__error_count = 0
- cmd_query = ":MEASure:VOLTage:DC?"
- multimeter_value = self.scpi_instrument.connection.query(cmd_query)
- return float(multimeter_value)
- class VideoStreamer:
- def __init__(self, buffer_size: int = 10, name: Optional[str] = None):
- self.name: Optional[str] = name
- self.__app = Flask(__name__)
- self.__frame_queue = deque(maxlen=buffer_size)
- self.__lock = threading.Lock() # 用于确保线程安全
- self.__setup_routes()
- def push(self, frame):
- """
- 推送一个帧到流。
- """
- with self.__lock:
- self.__frame_queue.append(frame)
- def __generate(self):
- while True:
- if len(self.__frame_queue) == 0:
- time.sleep(0.05) # 若队列为空,稍微等待
- continue
- with self.__lock:
- frame = self.__frame_queue.popleft() # 从队列左侧获取帧
- ret, jpeg = cv2.imencode('.jpg', frame)
- if ret:
- yield (b'--frame\r\n'
- b'Content-Type: image/jpeg\r\n\r\n' + jpeg.tobytes() + b'\r\n\r\n')
- def __video_feed(self):
- return Response(self.__generate(), mimetype='multipart/x-mixed-replace; boundary=frame')
- def __setup_routes(self):
- if self.name is None:
- self.name = f"/video_default_{time.time()}"
- else:
- self.name = f"/{self.name}"
- self.__app.add_url_rule(self.name, 'video_feed', self.__video_feed)
- def run(self, host, port):
- self.__app.run(threaded=True, host=host, port=port)
- return self.name
- class DigitalOscilloscopeService(object):
- def __init__(self, scpi_instrument: ScpiInstrument, pubtools: ProgramPublicTools,
- config: InstrumentControllerConfig):
- self.scpi_instrument: ScpiInstrument = scpi_instrument
- self.__pubtools: ProgramPublicTools = pubtools
- self.__config: InstrumentControllerConfig = config
- self.__instrument_name: str = "oscilloscope"
- self.realtime_screenshot: Optional[mat] = None
- self.__listening_thread: Optional[threading.Thread] = None
- self.__streamer_thread: Optional[threading.Thread] = None
- self.__windows_name: str = "Realtime Screenshot From Digital Oscilloscope"
- self.__video_streamer: VideoStreamer = VideoStreamer(name="oscilloscope")
- self.name: str = "Digital Oscilloscope"
- self.__is_listening: bool = False
- self.__stream_port: Optional[int] = None
- self.__stream_url: Optional[str] = None
- self.__screenshot_frame = None
- self.__img_empty = np.zeros((500, 500, 3), dtype=np.uint8)
- self.__img_empty[:, :] = [255, 255, 255] # BGR
- def keep_listening(self, realtime_terminal_output: bool = False,
- stream_port: Optional[int] = None, stream_host: str = "0.0.0.0"):
- if self.__is_listening is False:
- self.__start_listening_server(realtime_terminal_output, stream_port, stream_host)
- self.__is_listening = True
- return self.__return_stream_info()
- def __return_stream_info(self):
- return self.__listening_thread, self.__stream_port, self.__stream_url
- def __start_listening_server(self, realtime_terminal_output: bool = False,
- stream_port: Optional[int] = None, stream_host: str = "0.0.0.0"):
- if stream_port is None:
- stream_port = self.__pubtools.find_available_server_port(number=1, address="0.0.0.0", start_port=9000)[0]
- self.__stream_port = stream_port
- listening_thread_args = (realtime_terminal_output, True)
- self.__listening_thread = threading.Thread(target=self.__listening_main, args=listening_thread_args).start()
- streamer_thread_args = (self.__video_streamer, stream_port, stream_host)
- self.__streamer_thread = threading.Thread(target=self.__streamer_main, args=streamer_thread_args).start()
- stream_root_name = self.__video_streamer.name
- self.__stream_url = f"http://localhost:{stream_port}{stream_root_name}"
- return self.__return_stream_info()
- @staticmethod
- def __streamer_main(video_streamer: VideoStreamer,
- stream_port: int = None, stream_host: str = "0.0.0.0"):
- video_streamer.run(host=stream_host, port=stream_port)
- def save_img(self):
- get_time: str = time.strftime("%H:%M:%S")
- current_file_path = os.path.realpath(__file__)
- current_dir = os.path.dirname(current_file_path)
- parent_dir = os.path.dirname(current_dir)
- file_path = os.path.join(parent_dir, "export", f"OSC.{get_time.replace(':', '.')}.png")
- if self.__screenshot_frame is None:
- cv2.imwrite(file_path, self.__img_empty)
- else:
- cv2.imwrite(file_path, self.__screenshot_frame)
- return get_time
- def __listening_main(self, realtime_terminal_output: bool, enable_saving_file: bool = True):
- video_recode_service = None
- height, width = 512, 512
- screenshot_frame_last = np.zeros((height, width, 3), dtype=np.uint8)
- screenshot_frame_last[:, :] = [255, 255, 255] # BGR
- if enable_saving_file is True:
- folder_path = os.path.join(self.__config.buffer_path, self.__instrument_name)
- os.makedirs(folder_path, exist_ok=True)
- img_path = os.path.join(folder_path, f"{self.__instrument_name}.avi")
- fourcc = cv2.VideoWriter_fourcc('I', '4', '2', '0')
- video_recode_service = cv2.VideoWriter(img_path, fourcc, 25, (640, 480))
- if realtime_terminal_output is True:
- cv2.namedWindow(self.__windows_name, cv2.WINDOW_NORMAL)
- while True:
- try:
- screenshot_frame = self.get_screenshot()
- except Exception as e:
- self.__pubtools.debug_output(e)
- screenshot_frame = screenshot_frame_last
- else:
- screenshot_frame_last = screenshot_frame
- self.__screenshot_frame = screenshot_frame
- self.__video_streamer.push(screenshot_frame)
- if (enable_saving_file is True) and (video_recode_service is not None):
- video_recode_service.write(screenshot_frame)
- if realtime_terminal_output is True:
- cv2.imshow(self.__windows_name, self.realtime_screenshot)
- cv2.waitKey(1)
- def get_screenshot(self) -> mat:
- self.scpi_instrument.connection.write(':DISP:DATA? JPG')
- raw_data = self.scpi_instrument.connection.read_raw()
- image_data = self.__parse_data(raw_data)
- img_raw = np.asarray(bytearray(image_data), dtype="uint8")
- img_raw = cv2.imdecode(img_raw, cv2.IMREAD_COLOR)
- return img_raw
- @staticmethod
- def __parse_data(raw_data: bytes) -> bytes:
- start_index = raw_data.find(b'#')
- if start_index == -1:
- raise ValueError("Invalid data: TMC header not found.")
- length_digits = int(raw_data[start_index + 1:start_index + 2])
- data_length = int(raw_data[start_index + 2:start_index + 2 + length_digits])
- data_start = start_index + 2 + length_digits
- data_end = data_start + data_length
- image_data = raw_data[data_start:data_end]
- return image_data
- class WaveformType(Enum):
- Sine = "SIN"
- Square = "SQU"
- Ramp = "RAMP"
- Pulse = "PULS"
- Noise = "NOIS"
- class WaveformTypeTools(object):
- @staticmethod
- def get_all_waveform_types() -> str:
- return " ".join([waveform_type.name for waveform_type in WaveformType])
- @staticmethod
- def match_waveform_type(waveform_name: str) -> WaveformType:
- try:
- return WaveformType[waveform_name.strip().capitalize()]
- except KeyError:
- return WaveformType.Sine
- class WaveformFreqUnit(Enum):
- MHz = "MHz"
- kHz = "kHz"
- Hz = "Hz"
- mHz = "mHz"
- uHz = "uHz"
- class WaveformLevelUnit(Enum):
- V = "Vpp"
- mV = "mVpp"
- class WaveformConfig(object):
- def __init__(self):
- self.type: WaveformType = WaveformType.Sine
- self.freq: float = 100
- self.freq_unit: WaveformFreqUnit = WaveformFreqUnit.kHz
- self.high_level: float = 3
- self.high_level_unit: WaveformLevelUnit = WaveformLevelUnit.V
- self.low_level: float = 3
- self.low_level_unit: WaveformLevelUnit = WaveformLevelUnit.mV
- @staticmethod
- def get_type_string() -> str:
- return WaveformTypeTools.get_all_waveform_types()
- @staticmethod
- def match_type_string(type_string: str) -> WaveformType:
- return WaveformTypeTools.match_waveform_type(type_string)
- def get_freq_unit_type_string(self) -> str:
- return self.__get_type_value_string(WaveformFreqUnit)
- def match_freq_unit_type_string(self, freq_unit_string: str) -> WaveformFreqUnit:
- return self.__match_type_value_string(WaveformFreqUnit, WaveformFreqUnit.kHz, freq_unit_string)
- def get_level_unit_type_string(self) -> str:
- return self.__get_type_value_string(WaveformLevelUnit)
- def match_level_unit_type_string(self, level_unit_string: str) -> WaveformLevelUnit:
- return self.__match_type_value_string(WaveformLevelUnit, WaveformLevelUnit.V, level_unit_string)
- @staticmethod
- def __get_type_value_string(type_enum) -> str:
- type_value_string: str = ""
- for type_enum_item in type_enum:
- type_value_string = type_value_string + " " + type_enum_item.value()
- return type_value_string
- @staticmethod
- def __match_type_value_string(type_enum, type_default, type_value_string: str):
- for item in type_enum:
- if item.value == type_value_string:
- return item
- return type_default
- def to_str(self) -> str:
- config = copy.deepcopy(self)
- parts = [
- config.type.value.lower(),
- str(config.freq),
- config.freq_unit.value,
- str(config.high_level),
- config.high_level_unit.value.lower(),
- str(config.low_level),
- config.low_level_unit.value.lower()
- ]
- if parts[2] == "MHz":
- parts[2] = "LMHz"
- return "|".join(parts)
- def read_str(self, config_str: str):
- parts = config_str.split("|")
- self.type = WaveformType(parts[0].capitalize())
- self.freq = float(parts[1])
- if parts[2].lower() == "lmhz":
- self.freq_unit = WaveformFreqUnit.MHz
- else:
- self.freq_unit = WaveformFreqUnit(parts[2].capitalize())
- self.high_level = float(parts[3])
- self.high_level_unit = WaveformLevelUnit(parts[4].upper())
- self.low_level = float(parts[5])
- self.low_level_unit = WaveformLevelUnit(parts[6].upper())
- class WaveformGeneratorService(object):
- def __init__(self, scpi_instrument: ScpiInstrument, pubtools: ProgramPublicTools,
- config: InstrumentControllerConfig):
- self.scpi_instrument: ScpiInstrument = scpi_instrument
- self.__pubtools: ProgramPublicTools = pubtools
- self.__config: InstrumentControllerConfig = config
- self.enable_channel_1: bool = True
- self.enable_channel_2: bool = True
- self.config_channel_1: WaveformConfig = WaveformConfig()
- self.config_channel_2: WaveformConfig = WaveformConfig()
- self.name: str = "Waveform Generator"
- def output_restart(self):
- self.output_pause()
- self.output_start()
- def output_start(self):
- if self.enable_channel_1:
- self.__output_start_channel(1)
- if self.enable_channel_2:
- self.__output_start_channel(2)
- def __output_start_channel(self, channel_number: int):
- cmd = f":OUTPut{channel_number} ON"
- self.scpi_instrument.connection.write(cmd)
- def output_pause(self):
- self.__output_pause_channel(1)
- self.__output_pause_channel(2)
- def __output_pause_channel(self, channel_number: int):
- cmd = f":OUTPut{channel_number} OFF"
- self.scpi_instrument.connection.write(cmd)
- def get_config(self, channel: str) -> tuple[int, WaveformConfig]:
- if channel == "2":
- return 2, self.config_channel_2
- else:
- return 1, self.config_channel_1
- def set_enable(self, channel: str, enable: bool) -> int:
- if channel == "2":
- self.enable_channel_2 = enable
- return 2
- else:
- self.enable_channel_1 = enable
- return 1
- def apply_config(self, immediate_start: bool = False):
- self.__apply_config_channel(1, self.enable_channel_1, self.config_channel_1, immediate_start)
- self.__apply_config_channel(2, self.enable_channel_2, self.config_channel_2, immediate_start)
- def __apply_config_channel(self, channel_number: int, channel_enable: bool, channel_config: WaveformConfig,
- immediate_start: bool = False):
- if channel_config.type != WaveformType.Noise:
- cmd = (f":SOURce{channel_number}:APPLy:{channel_config.type.value} " +
- f"{channel_config.freq}{channel_config.freq_unit.value}," +
- f"{channel_config.high_level}{channel_config.high_level_unit.value}")
- else:
- cmd = (f":SOURce{channel_number}:APPLy:{channel_config.type.value} " +
- f"{channel_config.high_level}{channel_config.high_level_unit.value}")
- self.scpi_instrument.connection.write(cmd)
- if (immediate_start is False) or (channel_enable is False):
- self.output_pause()
- else:
- self.output_start()
- class AnalogElectronicLoadService(object):
- def __init__(self, scpi_instrument: ScpiInstrument, pubtools: ProgramPublicTools,
- config: InstrumentControllerConfig):
- self.scpi_instrument: ScpiInstrument = scpi_instrument
- self.__pubtools: ProgramPublicTools = pubtools
- self.__config: InstrumentControllerConfig = config
- self.name: str = "Analog Electronic Load"
- self.__mode_set_time: int = 0
- def set_mode(self):
- if self.__mode_set_time <= 2:
- self.__mode_set_time += 1
- cmd_set = f":SOUR:FUNC RES"
- self.scpi_instrument.connection.write(cmd_set)
- def set_resistance(self, value: str):
- self.set_mode()
- cmd_set = f":SOUR:RES:LEV:IMM {value}"
- self.scpi_instrument.connection.write(cmd_set)
- def run_as_main():
- program_pubtools: ProgramPublicTools = ProgramPublicTools()
- instrument_controller: InstrumentController = InstrumentController(program_pubtools)
- instrument_controller.auto_connect()
- # _, _, server_url_multimeter = instrument_controller.services.digital_multimeter.keep_listening()
- # print("digital_multimeter", server_url_multimeter)
- # _, _, server_url_oscilloscope = instrument_controller.services.digital_oscilloscope.keep_listening()
- # print("digital_oscilloscope", server_url_oscilloscope)
- while True:
- input_value = input("input: ")
- instrument_controller.services.analog_electronic_load.set_resistance(input_value)
- if __name__ == "__main__":
- run_as_main()
|