import copy import os import random import threading import time import typing from collections import deque from enum import Enum from typing import Optional import cv2 import numpy as np import pyvisa from numpy import ndarray as mat from pyvisa import ResourceManager as scpiManager from pyvisa.resources import MessageBasedResource as scpiConnection from flask import Flask, Response from flask import Flask, jsonify from flask_cors import CORS from program_public_tools import ProgramPublicTools class ScpiInstrument(object): connection: scpiConnection name: str response: str class InstrumentResponseMark(object): def __init__(self): self.digital_multimeter: str = "DM3068" self.digital_oscilloscope: str = "DHO1204" self.waveform_generator: str = "DG5072" self.analog_electronic_load: str = "DL3021" self.__check_env_value() def __check_env_value(self): config_digital_multimeter = self.__get_env("PLC_SIM_SERVER_DIGITAL_MULTIMETER") config_digital_oscilloscope = self.__get_env("PLC_SIM_SERVER_DIGITAL_OSCILLOSCOPE") config_waveform_generator = self.__get_env("PLC_SIM_SERVER_WAVEFORM_GENERATOR") config_analog_electronic_load = self.__get_env("PLC_SIM_SERVER_ANALOG_ELECTRONIC_LOAD") print(f"[Instrument Controller] Using config: PLC_SIM_SERVER_DIGITAL_MULTIMETER = {config_digital_multimeter}") print(f"[Instrument Controller] Using config: PLC_SIM_SERVER_DIGITAL_OSCILLOSCOPE = {config_digital_oscilloscope}") print(f"[Instrument Controller] Using config: PLC_SIM_SERVER_WAVEFORM_GENERATOR = {config_waveform_generator}") print(f"[Instrument Controller] Using config: PLC_SIM_SERVER_ANALOG_ELECTRONIC_LOAD = {config_analog_electronic_load}") if config_digital_multimeter is not None: self.digital_multimeter = config_digital_multimeter if config_digital_oscilloscope is not None: self.digital_oscilloscope = config_digital_oscilloscope if config_waveform_generator is not None: self.waveform_generator = config_waveform_generator if config_analog_electronic_load is not None: self.analog_electronic_load = config_analog_electronic_load @staticmethod def __get_env(name: str) -> Optional[str]: return os.environ.get(name, None) class InstrumentControllerConfig(object): connection_timeout_ms: int = 5000 response_mark: InstrumentResponseMark = InstrumentResponseMark() buffer_path: str = "./temp/" class InstrumentServices(object): def __init__(self): self.digital_multimeter: Optional[DigitalMultimeterService] = None self.digital_oscilloscope: Optional[DigitalOscilloscopeService] = None self.waveform_generator: Optional[WaveformGeneratorService] = None self.analog_electronic_load: Optional[AnalogElectronicLoadService] = None def check(self) -> list[str]: not_connect_list: list[str] = [] for attr_name, attr_value in vars(self).items(): if attr_value is None: not_connect_list.append(attr_name) return not_connect_list class InstrumentController(object): def __init__(self, pubtools: ProgramPublicTools): self.__scpi_manager: scpiManager = self.__init_scpi_manager() self.config: InstrumentControllerConfig = InstrumentControllerConfig() self.__pubtools: ProgramPublicTools = pubtools self.__scpi_instrument_list: list[ScpiInstrument] = [] self.services: InstrumentServices = InstrumentServices() self.__retry_times: int = 1 @staticmethod def __init_scpi_manager() -> scpiManager: return pyvisa.ResourceManager() def auto_connect(self) -> tuple[list[ScpiInstrument], list[str]]: self.__auto_disconnect() self.__scpi_instrument_list = self.__scan_and_connect_scpi_all() for instrument in self.__scpi_instrument_list: if self.config.response_mark.digital_multimeter in instrument.response: self.services.digital_multimeter = DigitalMultimeterService(instrument, self.__pubtools, self.config) elif self.config.response_mark.digital_oscilloscope in instrument.response: self.services.digital_oscilloscope = DigitalOscilloscopeService(instrument, self.__pubtools, self.config) elif self.config.response_mark.waveform_generator in instrument.response: self.services.waveform_generator = WaveformGeneratorService(instrument, self.__pubtools, self.config) elif self.config.response_mark.analog_electronic_load in instrument.response: self.services.analog_electronic_load = AnalogElectronicLoadService(instrument, self.__pubtools, self.config) else: pass not_connect_list = self.services.check() if len(not_connect_list) == 0: self.__pubtools.debug_output("All SCPI instruments connect successfully.") else: for instrument_name in not_connect_list: self.__pubtools.debug_output(f'Error: Cannot connect to instrument "{instrument_name}".') return self.__scpi_instrument_list, not_connect_list def __scan_and_connect_scpi_all(self) -> list[ScpiInstrument]: self.__pubtools.debug_output("Start to connect instruments...") device_name_list: tuple[str] = self.__scpi_manager.list_resources() scpi_instrument_list: list[ScpiInstrument] = [] connect_number: int = 0 if len(device_name_list) != 0: for device_name in device_name_list: scpi_connection, scpi_response = self.__scpi_connect(device_name) if scpi_connection is not None: scpi_instrument: ScpiInstrument = ScpiInstrument() scpi_instrument.connection = scpi_connection scpi_instrument.name = device_name scpi_instrument.response = scpi_response scpi_instrument_list.append(scpi_instrument) connect_number += 1 self.__pubtools.debug_output(f"Connect completed! Connected to [{connect_number}] instruments.") return scpi_instrument_list def __auto_disconnect(self) -> list[ScpiInstrument]: new_scpi_instrument_list: list[ScpiInstrument] = [] if len(self.__scpi_instrument_list) != 0: for scpi_instrument in self.__scpi_instrument_list: scpi_instrument.connection.close() self.__scpi_instrument_list = new_scpi_instrument_list return new_scpi_instrument_list def __scpi_connect(self, device_name: str) -> tuple[Optional[scpiConnection], Optional[str]]: for _ in range(0, self.__retry_times): try: scpi_connection = typing.cast(scpiConnection, self.__scpi_manager.open_resource(device_name)) scpi_connection.timeout = self.config.connection_timeout_ms scpi_response: str = scpi_connection.query('*IDN?') scpi_response = scpi_response.strip() self.__pubtools.debug_output(f'Connect to SCPI device "{device_name}".') self.__pubtools.debug_output(f'Device response: "{scpi_response}"') except Exception as e: self.__pubtools.debug_output(f'Error when attempting to connect scpi device "{device_name}".') self.__pubtools.debug_output(f'Error information: {e}') self.__pubtools.debug_output(f'Start to retry......') else: return scpi_connection, scpi_response self.__pubtools.debug_output(f'Retrying failed for 3 times. Give up connecting.') return None, None class FloatServer: def __init__(self, name: Optional[str] = None): self.name: Optional[str] = name self.__app = Flask(__name__) CORS(self.__app) self.__value = 0.0 self.__setup_routes() def __setup_routes(self): if self.name is None: self.name = f"/float_default_{time.time()}" else: self.name = f"/{self.name}" @self.__app.route(self.name, methods=['GET']) def get_value(): return jsonify({'value': self.__value}) def run(self, host, port): self.__app.run(host=host, port=port) def push(self, new_value): self.__value = new_value class MultimeterValueRange(Enum): range_200mv = "0" range_2v = "1" range_20v = "2" range_200v = "3" range_1000v = "4" default = "3" class MultimeterValueRangeTools(object): @staticmethod def get_range_type() -> list[str]: type_list: list[str] = ["200mv", "2v", "20v", "200v", "1000v", "default"] return type_list @staticmethod def solve_range(rang_string) -> MultimeterValueRange: match rang_string: case "200mv": return MultimeterValueRange.range_200mv case "2v": return MultimeterValueRange.range_2v case "20v": return MultimeterValueRange.range_20v case "200v": return MultimeterValueRange.range_200v case "1000v": return MultimeterValueRange.range_1000v case "default": return MultimeterValueRange.default case _: return MultimeterValueRange.default class DigitalMultimeterService(object): def __init__(self, scpi_instrument: ScpiInstrument, pubtools: ProgramPublicTools, config: InstrumentControllerConfig): self.scpi_instrument: ScpiInstrument = scpi_instrument self.__pubtools: ProgramPublicTools = pubtools self.__config: InstrumentControllerConfig = config self.__range: MultimeterValueRange = MultimeterValueRange.default self.realtime_value: float = float(0) self.__listening_thread: Optional[threading.Thread] = None self.__float_server_thread: Optional[threading.Thread] = None self.__whether_range_need_to_set: bool = True self.__error_count: int = 0 self.name: str = "Digital Multimeter" self.__instrument_name: str = "multimeter" self.__float_server: FloatServer = FloatServer(self.__instrument_name) self.__is_listening: bool = False self.__server_url: str = '' def set_range(self, value_range: MultimeterValueRange): self.__range = value_range @staticmethod def get_range() -> list[str]: return MultimeterValueRangeTools.get_range_type() @staticmethod def solve_range_string(range_string: str) -> MultimeterValueRange: return MultimeterValueRangeTools.solve_range(range_string) def __float_server_main(self, float_server: FloatServer, server_host, server_port): self.__float_server.run(host=server_host, port=server_port) def keep_listening(self, realtime_terminal_output: bool = False, server_port: Optional[int] = None, server_host: Optional[str] = "0.0.0.0"): if self.__is_listening is False: self.__start_server_listening(realtime_terminal_output, server_port, server_host) self.__is_listening = True return self.__return_server_info() def __start_server_listening(self, realtime_terminal_output: bool = False, server_port: Optional[int] = None, server_host: Optional[str] = "0.0.0.0"): if server_port is None: server_port = self.__pubtools.find_available_server_port(number=1, address="0.0.0.0", start_port=8600)[0] listening_thread_args = (realtime_terminal_output,) self.__listening_thread = threading.Thread(target=self.__listening_main, args=listening_thread_args).start() float_server_args = (self.__float_server, server_host, server_port) self.__float_server_thread = threading.Thread(target=self.__float_server_main, args=float_server_args).start() server_root_name = self.__float_server.name self.__server_url = f"http://localhost:{server_port}{server_root_name}" return self.__return_server_info() def __return_server_info(self): return self.__listening_thread, self.__float_server_thread, self.__server_url def __listening_main(self, realtime_terminal_output: bool): threading.Thread(target=self.__listening_auto_clear) while True: self.realtime_value = self.__get_value() self.__float_server.push(self.realtime_value) if realtime_terminal_output is True: self.__pubtools.debug_output(f"Listening voltage from digital multimeter: {self.realtime_value}.") def __listening_auto_clear(self): while True: self.__whether_range_need_to_set = True time.sleep(5) def get_value_latest(self) -> float: return self.realtime_value def __get_value(self, use_range: Optional[MultimeterValueRange] = None): if use_range is None: use_range = self.__range else: self.__whether_range_need_to_set = True if (self.__whether_range_need_to_set is True) and (self.__error_count <= 3): try: cmd_set = f":MEASure:VOLTage:DC {use_range.value}" self.scpi_instrument.connection.query(cmd_set) except Exception as e: self.__pubtools.debug_output("Exception found when trying to set range of digital multimeter.") self.__pubtools.debug_output(f"Error info: {e}") self.__error_count += 1 else: self.__whether_range_need_to_set = False self.__error_count = 0 cmd_query = ":MEASure:VOLTage:DC?" multimeter_value = self.scpi_instrument.connection.query(cmd_query) return float(multimeter_value) class VideoStreamer: def __init__(self, buffer_size: int = 10, name: Optional[str] = None): self.name: Optional[str] = name self.__app = Flask(__name__) self.__frame_queue = deque(maxlen=buffer_size) self.__lock = threading.Lock() # 用于确保线程安全 self.__setup_routes() def push(self, frame): """ 推送一个帧到流。 """ with self.__lock: self.__frame_queue.append(frame) def __generate(self): while True: if len(self.__frame_queue) == 0: time.sleep(0.05) # 若队列为空,稍微等待 continue with self.__lock: frame = self.__frame_queue.popleft() # 从队列左侧获取帧 ret, jpeg = cv2.imencode('.jpg', frame) if ret: yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + jpeg.tobytes() + b'\r\n\r\n') def __video_feed(self): return Response(self.__generate(), mimetype='multipart/x-mixed-replace; boundary=frame') def __setup_routes(self): if self.name is None: self.name = f"/video_default_{time.time()}" else: self.name = f"/{self.name}" self.__app.add_url_rule(self.name, 'video_feed', self.__video_feed) def run(self, host, port): self.__app.run(threaded=True, host=host, port=port) return self.name class DigitalOscilloscopeService(object): def __init__(self, scpi_instrument: ScpiInstrument, pubtools: ProgramPublicTools, config: InstrumentControllerConfig): self.scpi_instrument: ScpiInstrument = scpi_instrument self.__pubtools: ProgramPublicTools = pubtools self.__config: InstrumentControllerConfig = config self.__instrument_name: str = "oscilloscope" self.realtime_screenshot: Optional[mat] = None self.__listening_thread: Optional[threading.Thread] = None self.__streamer_thread: Optional[threading.Thread] = None self.__windows_name: str = "Realtime Screenshot From Digital Oscilloscope" self.__video_streamer: VideoStreamer = VideoStreamer(name="oscilloscope") self.name: str = "Digital Oscilloscope" self.__is_listening: bool = False self.__stream_port: Optional[int] = None self.__stream_url: Optional[str] = None self.__screenshot_frame = None self.__img_empty = np.zeros((500, 500, 3), dtype=np.uint8) self.__img_empty[:, :] = [255, 255, 255] # BGR def keep_listening(self, realtime_terminal_output: bool = False, stream_port: Optional[int] = None, stream_host: str = "0.0.0.0"): if self.__is_listening is False: self.__start_listening_server(realtime_terminal_output, stream_port, stream_host) self.__is_listening = True return self.__return_stream_info() def __return_stream_info(self): return self.__listening_thread, self.__stream_port, self.__stream_url def __start_listening_server(self, realtime_terminal_output: bool = False, stream_port: Optional[int] = None, stream_host: str = "0.0.0.0"): if stream_port is None: stream_port = self.__pubtools.find_available_server_port(number=1, address="0.0.0.0", start_port=9000)[0] self.__stream_port = stream_port listening_thread_args = (realtime_terminal_output, True) self.__listening_thread = threading.Thread(target=self.__listening_main, args=listening_thread_args).start() streamer_thread_args = (self.__video_streamer, stream_port, stream_host) self.__streamer_thread = threading.Thread(target=self.__streamer_main, args=streamer_thread_args).start() stream_root_name = self.__video_streamer.name self.__stream_url = f"http://localhost:{stream_port}{stream_root_name}" return self.__return_stream_info() @staticmethod def __streamer_main(video_streamer: VideoStreamer, stream_port: int = None, stream_host: str = "0.0.0.0"): video_streamer.run(host=stream_host, port=stream_port) def save_img(self): get_time: str = time.strftime("%H:%M:%S") current_file_path = os.path.realpath(__file__) current_dir = os.path.dirname(current_file_path) parent_dir = os.path.dirname(current_dir) file_path = os.path.join(parent_dir, "export", f"OSC.{get_time.replace(':', '.')}.png") if self.__screenshot_frame is None: cv2.imwrite(file_path, self.__img_empty) else: cv2.imwrite(file_path, self.__screenshot_frame) return get_time def __listening_main(self, realtime_terminal_output: bool, enable_saving_file: bool = True): video_recode_service = None height, width = 512, 512 screenshot_frame_last = np.zeros((height, width, 3), dtype=np.uint8) screenshot_frame_last[:, :] = [255, 255, 255] # BGR if enable_saving_file is True: folder_path = os.path.join(self.__config.buffer_path, self.__instrument_name) os.makedirs(folder_path, exist_ok=True) img_path = os.path.join(folder_path, f"{self.__instrument_name}.avi") fourcc = cv2.VideoWriter_fourcc('I', '4', '2', '0') video_recode_service = cv2.VideoWriter(img_path, fourcc, 25, (640, 480)) if realtime_terminal_output is True: cv2.namedWindow(self.__windows_name, cv2.WINDOW_NORMAL) while True: try: screenshot_frame = self.get_screenshot() except Exception as e: self.__pubtools.debug_output(e) screenshot_frame = screenshot_frame_last else: screenshot_frame_last = screenshot_frame self.__screenshot_frame = screenshot_frame self.__video_streamer.push(screenshot_frame) if (enable_saving_file is True) and (video_recode_service is not None): video_recode_service.write(screenshot_frame) if realtime_terminal_output is True: cv2.imshow(self.__windows_name, self.realtime_screenshot) cv2.waitKey(1) def get_screenshot(self) -> mat: self.scpi_instrument.connection.write(':DISP:DATA? JPG') raw_data = self.scpi_instrument.connection.read_raw() image_data = self.__parse_data(raw_data) img_raw = np.asarray(bytearray(image_data), dtype="uint8") img_raw = cv2.imdecode(img_raw, cv2.IMREAD_COLOR) return img_raw @staticmethod def __parse_data(raw_data: bytes) -> bytes: start_index = raw_data.find(b'#') if start_index == -1: raise ValueError("Invalid data: TMC header not found.") length_digits = int(raw_data[start_index + 1:start_index + 2]) data_length = int(raw_data[start_index + 2:start_index + 2 + length_digits]) data_start = start_index + 2 + length_digits data_end = data_start + data_length image_data = raw_data[data_start:data_end] return image_data class WaveformType(Enum): Sine = "SIN" Square = "SQU" Ramp = "RAMP" Pulse = "PULS" Noise = "NOIS" class WaveformTypeTools(object): @staticmethod def get_all_waveform_types() -> str: return " ".join([waveform_type.name for waveform_type in WaveformType]) @staticmethod def match_waveform_type(waveform_name: str) -> WaveformType: try: return WaveformType[waveform_name.strip().capitalize()] except KeyError: return WaveformType.Sine class WaveformFreqUnit(Enum): MHz = "MHz" kHz = "kHz" Hz = "Hz" mHz = "mHz" uHz = "uHz" class WaveformLevelUnit(Enum): V = "Vpp" mV = "mVpp" class WaveformConfig(object): def __init__(self): self.type: WaveformType = WaveformType.Sine self.type_string: str = "SIN" self.freq: float = 100 self.freq_unit: WaveformFreqUnit = WaveformFreqUnit.kHz self.high_level: float = 3 self.high_level_unit: WaveformLevelUnit = WaveformLevelUnit.V self.low_level: float = 3 self.low_level_unit: WaveformLevelUnit = WaveformLevelUnit.mV @staticmethod def get_type_string() -> str: return WaveformTypeTools.get_all_waveform_types() @staticmethod def match_type_string(type_string: str) -> WaveformType: return WaveformTypeTools.match_waveform_type(type_string) def get_freq_unit_type_string(self) -> str: return self.__get_type_value_string(WaveformFreqUnit) def match_freq_unit_type_string(self, freq_unit_string: str) -> WaveformFreqUnit: return self.__match_type_value_string(WaveformFreqUnit, WaveformFreqUnit.kHz, freq_unit_string) def get_level_unit_type_string(self) -> str: return self.__get_type_value_string(WaveformLevelUnit) def match_level_unit_type_string(self, level_unit_string: str) -> WaveformLevelUnit: return self.__match_type_value_string(WaveformLevelUnit, WaveformLevelUnit.V, level_unit_string) @staticmethod def __get_type_value_string(type_enum) -> str: type_value_string: str = "" for type_enum_item in type_enum: type_value_string = type_value_string + " " + type_enum_item.value() return type_value_string @staticmethod def __match_type_value_string(type_enum, type_default, type_value_string: str): for item in type_enum: if item.value == type_value_string: return item return type_default def to_str(self) -> str: config = copy.deepcopy(self) parts = [ config.type.value.lower(), str(config.freq), config.freq_unit.value, str(config.high_level), config.high_level_unit.value.lower(), str(config.low_level), config.low_level_unit.value.lower() ] if parts[2] == "MHz": parts[2] = "LMHz" return "|".join(parts) def read_str(self, config_str: str): parts = config_str.split("|") self.type = WaveformType(parts[0].capitalize()) self.freq = float(parts[1]) if parts[2].lower() == "lmhz": self.freq_unit = WaveformFreqUnit.MHz else: self.freq_unit = WaveformFreqUnit(parts[2].capitalize()) self.high_level = float(parts[3]) self.high_level_unit = WaveformLevelUnit(parts[4].upper()) self.low_level = float(parts[5]) self.low_level_unit = WaveformLevelUnit(parts[6].upper()) class WaveformGeneratorService(object): def __init__(self, scpi_instrument: ScpiInstrument, pubtools: ProgramPublicTools, config: InstrumentControllerConfig): self.scpi_instrument: ScpiInstrument = scpi_instrument self.__pubtools: ProgramPublicTools = pubtools self.__config: InstrumentControllerConfig = config self.enable_channel_1: bool = True self.enable_channel_2: bool = True self.config_channel_1: WaveformConfig = WaveformConfig() self.config_channel_2: WaveformConfig = WaveformConfig() self.name: str = "Waveform Generator" def output_restart(self): self.output_pause() self.output_start() def output_start(self): if self.enable_channel_1: self.__output_start_channel(1) if self.enable_channel_2: self.__output_start_channel(2) def __output_start_channel(self, channel_number: int): cmd = f":OUTPut{channel_number} ON" self.scpi_instrument.connection.write(cmd) def output_pause(self): self.__output_pause_channel(1) self.__output_pause_channel(2) def __output_pause_channel(self, channel_number: int): cmd = f":OUTPut{channel_number} OFF" self.scpi_instrument.connection.write(cmd) def get_config(self, channel: str) -> tuple[int, WaveformConfig]: if channel == "2": return 2, self.config_channel_2 else: return 1, self.config_channel_1 def set_enable(self, channel: str, enable: bool) -> int: if channel == "2": self.enable_channel_2 = enable return 2 else: self.enable_channel_1 = enable return 1 def apply_config(self, immediate_start: bool = False): self.__apply_config_channel(1, self.enable_channel_1, self.config_channel_1, immediate_start) self.__apply_config_channel(2, self.enable_channel_2, self.config_channel_2, immediate_start) def __apply_config_channel(self, channel_number: int, channel_enable: bool, channel_config: WaveformConfig, immediate_start: bool = False): if channel_config.type_string != WaveformType.Noise.value: cmd = (f":SOURce{channel_number}:APPLy:{channel_config.type_string} " + f"{channel_config.freq}{channel_config.freq_unit.value}," + f"{channel_config.high_level}{channel_config.high_level_unit.value}") else: cmd = (f":SOURce{channel_number}:APPLy:{channel_config.type_string} " + f"{channel_config.high_level}{channel_config.high_level_unit.value}") self.scpi_instrument.connection.write(cmd) if (immediate_start is False) or (channel_enable is False): self.output_pause() else: self.output_start() class AnalogElectronicLoadService(object): def __init__(self, scpi_instrument: ScpiInstrument, pubtools: ProgramPublicTools, config: InstrumentControllerConfig): self.scpi_instrument: ScpiInstrument = scpi_instrument self.__pubtools: ProgramPublicTools = pubtools self.__config: InstrumentControllerConfig = config self.name: str = "Analog Electronic Load" self.__mode_set_time: int = 0 def set_mode(self): if self.__mode_set_time <= 2: self.__mode_set_time += 1 cmd_set = f":SOUR:FUNC RES" self.scpi_instrument.connection.write(cmd_set) def set_resistance(self, value: str): self.set_mode() cmd_set = f":SOUR:RES:LEV:IMM {value}" self.scpi_instrument.connection.write(cmd_set) def run_as_main(): program_pubtools: ProgramPublicTools = ProgramPublicTools() instrument_controller: InstrumentController = InstrumentController(program_pubtools) instrument_controller.auto_connect() # _, _, server_url_multimeter = instrument_controller.services.digital_multimeter.keep_listening() # print("digital_multimeter", server_url_multimeter) # _, _, server_url_oscilloscope = instrument_controller.services.digital_oscilloscope.keep_listening() # print("digital_oscilloscope", server_url_oscilloscope) while True: input_value = input("input: ") instrument_controller.services.analog_electronic_load.set_resistance(input_value) if __name__ == "__main__": run_as_main()