Hopi/hopi/serial_client.py

116 lines
3.3 KiB
Python

from dataclasses import dataclass
from typing import Optional
import serial
@dataclass(frozen=True)
class SerialConfig:
port: str = "/dev/ttyUSB0"
baudrate: int = 9600
timeout: float = 0.1
bytesize: int = serial.EIGHTBITS
parity: str = serial.PARITY_NONE
stopbits: int = serial.STOPBITS_ONE
class ModbusRtuSerialClient:
"""Minimal Modbus RTU client over a serial port.
Only implements function 0x03 (Read Holding Registers).
"""
def __init__(self, config: SerialConfig):
self._config = config
self._ser: Optional[serial.Serial] = None
def __enter__(self) -> "ModbusRtuSerialClient":
self.open()
return self
def __exit__(self, exc_type, exc, tb) -> None:
self.close()
@property
def is_open(self) -> bool:
return self._ser is not None and self._ser.is_open
def open(self) -> None:
if self.is_open:
return
self._ser = serial.Serial(
port=self._config.port,
baudrate=self._config.baudrate,
bytesize=self._config.bytesize,
parity=self._config.parity,
stopbits=self._config.stopbits,
timeout=self._config.timeout,
)
self._ser.reset_input_buffer()
self._ser.reset_output_buffer()
def close(self) -> None:
if self._ser is not None:
try:
self._ser.close()
finally:
self._ser = None
def read_holding_registers(
self,
slave_id: int,
start_register: int,
register_count: int,
*,
read_max_bytes: int = 64,
exact_response_length: bool = True,
) -> bytes:
if not self.is_open:
raise RuntimeError("Serial port is not open")
if not (0 <= slave_id <= 247):
raise ValueError("slave_id must be in range 0..247")
if not (0 <= start_register <= 0xFFFF):
raise ValueError("start_register must be in range 0..65535")
if not (1 <= register_count <= 125):
raise ValueError("register_count must be in range 1..125")
request_wo_crc = bytes(
[
slave_id,
0x03,
(start_register >> 8) & 0xFF,
start_register & 0xFF,
(register_count >> 8) & 0xFF,
register_count & 0xFF,
]
)
request = request_wo_crc + self._crc16_modbus_le(request_wo_crc)
ser = self._ser
assert ser is not None
ser.reset_input_buffer()
ser.reset_output_buffer()
ser.write(request)
ser.flush()
if exact_response_length:
# Expected RTU response length for 0x03:
# slave(1) + func(1) + byte_count(1) + data(2*N) + crc(2)
expected_len = 5 + (2 * register_count)
return ser.read(expected_len)
return ser.read(read_max_bytes)
@staticmethod
def _crc16_modbus_le(payload: bytes) -> bytes:
"""Compute Modbus CRC16 and return as little-endian 2 bytes."""
crc = 0xFFFF
for byte in payload:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return bytes([crc & 0xFF, (crc >> 8) & 0xFF])