116 lines
3.3 KiB
Python
116 lines
3.3 KiB
Python
from dataclasses import dataclass
|
|
from typing import Optional
|
|
|
|
import serial
|
|
|
|
|
|
@dataclass(frozen=True)
class SerialConfig:
    """Immutable bundle of serial-port settings.

    Each field is forwarded unchanged to the ``serial.Serial`` keyword
    argument of the same name when the client opens the port. The defaults
    select ``/dev/ttyUSB0`` at 9600 baud with eight data bits, no parity
    and one stop bit.
    """

    # Device path / port name handed to serial.Serial(port=...).
    port: str = "/dev/ttyUSB0"
    # Line speed handed to serial.Serial(baudrate=...).
    baudrate: int = 9600
    # Read timeout handed to serial.Serial(timeout=...).
    timeout: float = 0.1
    # Framing parameters, expressed with the constants exported by `serial`.
    bytesize: int = serial.EIGHTBITS
    parity: str = serial.PARITY_NONE
    stopbits: int = serial.STOPBITS_ONE
|
|
|
|
|
|
class ModbusRtuSerialClient:
    """Minimal Modbus RTU client over a serial port.

    Only implements function 0x03 (Read Holding Registers).

    The client can be used as a context manager: the port is opened on
    entry and closed on exit. Responses are returned as raw bytes; no CRC
    or frame validation is performed on them.
    """

    # Function code placed in the second byte of every request.
    _READ_HOLDING_REGISTERS = 0x03

    def __init__(self, config: "SerialConfig"):
        """Store *config*; the port is not opened until ``open()`` is called."""
        self._config = config
        self._ser: "Optional[serial.Serial]" = None

    def __enter__(self) -> "ModbusRtuSerialClient":
        self.open()
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    @property
    def is_open(self) -> bool:
        """True when a port object exists and reports itself as open."""
        port = self._ser
        return port is not None and port.is_open

    def open(self) -> None:
        """Open the configured port and discard stale buffered data.

        Does nothing when the port is already open.
        """
        if self.is_open:
            return
        cfg = self._config
        # Assign before resetting the buffers so that close() can still
        # release the port if one of the resets raises.
        self._ser = serial.Serial(
            port=cfg.port,
            baudrate=cfg.baudrate,
            bytesize=cfg.bytesize,
            parity=cfg.parity,
            stopbits=cfg.stopbits,
            timeout=cfg.timeout,
        )
        self._ser.reset_input_buffer()
        self._ser.reset_output_buffer()

    def close(self) -> None:
        """Close the port if one exists; the reference is always dropped."""
        port = self._ser
        if port is None:
            return
        try:
            port.close()
        finally:
            self._ser = None

    def read_holding_registers(
        self,
        slave_id: int,
        start_register: int,
        register_count: int,
        *,
        read_max_bytes: int = 64,
        exact_response_length: bool = True,
    ) -> bytes:
        """Send a Read Holding Registers request and return the raw reply.

        Args:
            slave_id: Target device address, 0..247.
            start_register: Address of the first register, 0..65535.
            register_count: Number of registers to read, 1..125.
            read_max_bytes: Upper bound on the reply size when
                ``exact_response_length`` is false.
            exact_response_length: When true, read up to the length of a
                normal 0x03 reply (``5 + 2 * register_count`` bytes).

        Returns:
            The bytes received, unvalidated. The result is shorter than
            requested (possibly empty) when the device stops sending before
            the limit is reached.

        Raises:
            RuntimeError: The serial port is not open.
            ValueError: An argument is outside its allowed range.
        """
        ser = self._ser
        if ser is None or not ser.is_open:
            raise RuntimeError("Serial port is not open")
        if not 0 <= slave_id <= 247:
            raise ValueError("slave_id must be in range 0..247")
        if not 0 <= start_register <= 0xFFFF:
            raise ValueError("start_register must be in range 0..65535")
        if not 1 <= register_count <= 125:
            raise ValueError("register_count must be in range 1..125")

        # Address and quantity are sent big-endian; the CRC is appended
        # low byte first.
        body = bytes(
            [
                slave_id,
                self._READ_HOLDING_REGISTERS,
                (start_register >> 8) & 0xFF,
                start_register & 0xFF,
                (register_count >> 8) & 0xFF,
                register_count & 0xFF,
            ]
        )
        frame = body + self._crc16_modbus_le(body)

        # Drop anything left over from a previous exchange before talking.
        ser.reset_input_buffer()
        ser.reset_output_buffer()
        ser.write(frame)
        ser.flush()

        if exact_response_length:
            # slave(1) + func(1) + byte_count(1) + data(2*N) + crc(2)
            limit = 5 + (2 * register_count)
        else:
            limit = read_max_bytes
        return self._read_up_to(ser, limit)

    @staticmethod
    def _read_up_to(ser, size: int) -> bytes:
        """Collect up to *size* bytes, stopping once the line goes quiet.

        A single ``read()`` is bounded by the port timeout and may return
        fewer bytes than requested, so a long reply on a slow line would be
        cut short. Reading continues until *size* bytes have arrived or one
        ``read()`` call returns nothing.
        """
        received = bytearray()
        while len(received) < size:
            chunk = ser.read(size - len(received))
            if not chunk:
                break
            received += chunk
        return bytes(received)

    @staticmethod
    def _crc16_modbus_le(payload: bytes) -> bytes:
        """Compute Modbus CRC16 and return as little-endian 2 bytes."""
        crc = 0xFFFF
        for byte in payload:
            crc ^= byte
            for _ in range(8):
                carry = crc & 0x0001
                crc >>= 1
                if carry:
                    # Reflected form of the CRC-16 polynomial.
                    crc ^= 0xA001
        return crc.to_bytes(2, "little")
|