from dataclasses import dataclass
from typing import Optional

import serial


@dataclass(frozen=True)
class SerialConfig:
    """Immutable serial-port settings forwarded to ``serial.Serial``."""

    # Device name/path of the serial port.
    port: str = "/dev/ttyUSB0"
    baudrate: int = 9600
    # Forwarded as the ``timeout`` argument of ``serial.Serial``
    # (pyserial documents it as a read timeout in seconds).
    timeout: float = 0.1
    bytesize: int = serial.EIGHTBITS
    parity: str = serial.PARITY_NONE
    # Annotated as float because pyserial also defines a 1.5 stop-bit constant.
    stopbits: float = serial.STOPBITS_ONE


class ModbusRtuSerialClient:
    """Minimal Modbus RTU client over a serial port.

    Only implements function 0x03 (Read Holding Registers).

    The client can be used as a context manager: the port is opened on
    entry and closed on exit.
    """

    def __init__(self, config: SerialConfig):
        """Store *config*; the port is not opened until ``open()`` is called."""
        self._config = config
        self._ser: Optional[serial.Serial] = None

    def __enter__(self) -> "ModbusRtuSerialClient":
        """Open the port and return this client."""
        self.open()
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the port; exceptions from the ``with`` body propagate."""
        self.close()

    @property
    def is_open(self) -> bool:
        """Whether a port object exists and reports itself as open."""
        return self._ser is not None and self._ser.is_open

    def open(self) -> None:
        """Open the configured port and clear both serial buffers.

        Does nothing when the port is already open. If clearing the
        buffers fails, the freshly opened port is closed again before the
        error propagates, so a failed ``open()`` never leaves a port open.
        """
        if self.is_open:
            return

        cfg = self._config
        ser = serial.Serial(
            port=cfg.port,
            baudrate=cfg.baudrate,
            bytesize=cfg.bytesize,
            parity=cfg.parity,
            stopbits=cfg.stopbits,
            timeout=cfg.timeout,
        )
        try:
            ser.reset_input_buffer()
            ser.reset_output_buffer()
        except Exception:
            # When open() is called from __enter__, __exit__ will not run
            # after a failure here, so release the port ourselves.
            ser.close()
            raise
        # Publish the port only once it is fully initialised.
        self._ser = ser

    def close(self) -> None:
        """Close the port if one exists; safe to call repeatedly."""
        if self._ser is not None:
            try:
                self._ser.close()
            finally:
                # Drop the reference even if close() raised.
                self._ser = None

    def read_holding_registers(
        self,
        slave_id: int,
        start_register: int,
        register_count: int,
        *,
        read_max_bytes: int = 64,
        exact_response_length: bool = True,
    ) -> bytes:
        """Send a 0x03 request and return the raw bytes read in response.

        The response is returned exactly as read from the port: it is not
        parsed and its CRC is not checked. It may be shorter than requested
        (presumably when the read timeout elapses first), so callers must
        validate the frame themselves.

        Args:
            slave_id: Target slave address, 0..247.
            start_register: Address of the first register, 0..65535.
            register_count: Number of registers to read, 1..125.
            read_max_bytes: Number of bytes to read when
                ``exact_response_length`` is false.
            exact_response_length: When true, read exactly the length of a
                normal 0x03 response for ``register_count`` registers;
                otherwise read up to ``read_max_bytes``.

        Returns:
            The bytes read from the port (possibly empty).

        Raises:
            RuntimeError: If the port is not open.
            ValueError: If an argument is outside its allowed range.
        """
        ser = self._ser
        if ser is None or not ser.is_open:
            raise RuntimeError("Serial port is not open")

        # NOTE(review): 0 is the Modbus broadcast address; slaves are not
        # expected to answer it, so a read to 0 will presumably time out.
        if not (0 <= slave_id <= 247):
            raise ValueError("slave_id must be in range 0..247")
        if not (0 <= start_register <= 0xFFFF):
            raise ValueError("start_register must be in range 0..65535")
        if not (1 <= register_count <= 125):
            raise ValueError("register_count must be in range 1..125")

        # PDU fields are big-endian; only the trailing CRC is little-endian.
        request_wo_crc = bytes(
            [
                slave_id,
                0x03,
                (start_register >> 8) & 0xFF,
                start_register & 0xFF,
                (register_count >> 8) & 0xFF,
                register_count & 0xFF,
            ]
        )
        request = request_wo_crc + self._crc16_modbus_le(request_wo_crc)

        # Clear both buffers so leftovers from an earlier exchange are not
        # mistaken for this response.
        ser.reset_input_buffer()
        ser.reset_output_buffer()

        ser.write(request)
        ser.flush()

        if exact_response_length:
            # Expected RTU response length for 0x03:
            # slave(1) + func(1) + byte_count(1) + data(2*N) + crc(2)
            expected_len = 5 + (2 * register_count)
            return ser.read(expected_len)

        return ser.read(read_max_bytes)

    @staticmethod
    def _crc16_modbus_le(payload: bytes) -> bytes:
        """Compute Modbus CRC16 and return as little-endian 2 bytes."""
        crc = 0xFFFF
        for byte in payload:
            crc ^= byte
            for _ in range(8):
                # Bitwise CRC, LSB first, using the constant 0xA001.
                if crc & 0x0001:
                    crc = (crc >> 1) ^ 0xA001
                else:
                    crc >>= 1
        return bytes([crc & 0xFF, (crc >> 8) & 0xFF])