Hopi/Hopi.py

82 lines
3.0 KiB
Python

import argparse
import time
from datetime import datetime, timezone
from hopi.data_handler import PowerMeterDataHandler
from hopi.serial_client import ModbusRtuSerialClient, SerialConfig
# --- Meter addressing -------------------------------------------------------
SLAVE_ID = 1    # slave/unit address passed to every read request
START_REG = 0   # first holding register of the block that is read
NUM_WORDS = 20  # number of 16-bit registers requested per read

# Upper bound on the reply size handed to the serial client. Each register
# contributes two bytes; the remaining five bytes match the framing of a
# Modbus RTU "read holding registers" reply (address, function code, byte
# count and a two-byte CRC).
# NOTE(review): assumes the serial client counts the whole frame, CRC included.
READ_MAX_BYTES = (NUM_WORDS * 2) + 5
def build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the power meter reader.

    Recognised options:
        --port      serial device path (default ``/dev/ttyUSB0``)
        --baud      baud rate as an int (default 9600)
        --timeout   serial timeout as a float (default 0.1)
        --daemon    flag: keep reading until interrupted
        --csv       CSV output path used in daemon mode
        --interval  seconds between reads in daemon mode (default 1.0)

    Returns:
        A configured ``argparse.ArgumentParser``; nothing is parsed here.
    """
    # (flag, add_argument keyword arguments), kept in the order the options
    # should appear in ``--help``.
    option_specs = (
        ("--port", {"default": "/dev/ttyUSB0"}),
        ("--baud", {"type": int, "default": 9600}),
        ("--timeout", {"type": float, "default": 0.1}),
        (
            "--daemon",
            {
                "action": "store_true",
                "help": "Run continuously until stopped (Ctrl+C)",
            },
        ),
        (
            "--csv",
            {
                "default": "hopi_readings.csv",
                "help": "CSV output path for daemon mode",
            },
        ),
        (
            "--interval",
            {
                "type": float,
                "default": 1.0,
                "help": "Seconds between reads in daemon mode",
            },
        ),
    )

    cli = argparse.ArgumentParser(
        description="Read and print power meter values over Modbus RTU."
    )
    for flag, options in option_specs:
        cli.add_argument(flag, **options)
    return cli
def _validate_daemon_args(args: argparse.Namespace) -> None:
    """Reject timing options that daemon mode cannot run with.

    Raises:
        ValueError: if ``--timeout`` or ``--interval`` is not positive, or
            if ``--interval`` is not strictly greater than ``--timeout``.
    """
    if args.timeout <= 0:
        raise ValueError("--timeout must be > 0")
    if args.interval <= 0:
        raise ValueError("--interval must be > 0")
    if args.interval <= args.timeout:
        raise ValueError("--interval must be > --timeout")


def _read_readings(client: "ModbusRtuSerialClient", handler: "PowerMeterDataHandler"):
    """Poll the meter once and return the decoded readings.

    Issues a single holding-register read for the configured register block,
    then hands the raw response to ``handler`` for parsing and decoding.
    The return value is whatever ``handler.decode_readings`` produces.
    """
    response = client.read_holding_registers(
        slave_id=SLAVE_ID,
        start_register=START_REG,
        register_count=NUM_WORDS,
        read_max_bytes=READ_MAX_BYTES,
        exact_response_length=True,
    )
    registers = handler.parse_read_holding_registers_response(response)
    return handler.decode_readings(registers)


def _run_daemon(
    client: "ModbusRtuSerialClient",
    handler: "PowerMeterDataHandler",
    csv_path: str,
    interval: float,
) -> None:
    """Read the meter every ``interval`` seconds and log each sample as a CSV row.

    Runs until interrupted with Ctrl+C, which ends the loop quietly. Any
    other exception propagates to the caller after the client and the CSV
    logger context managers have been exited.
    """
    # The schedule is anchored to a monotonic clock, so wall-clock
    # adjustments cannot stretch or shrink the interval.
    next_deadline = time.monotonic()
    with client, handler.open_csv_logger(csv_path) as logger:
        try:
            while True:
                readings = _read_readings(client, handler)

                # Each row starts with a timezone-aware UTC timestamp.
                row = {"timestamp": datetime.now(timezone.utc).isoformat()}
                row.update(handler.readings_to_flat_dict(readings))
                logger.log(row)

                # Advance by a fixed step rather than "now + interval" so the
                # time spent reading and logging does not accumulate as drift.
                # If an iteration overran its slot, the sleep is skipped and
                # the next read starts immediately.
                next_deadline += interval
                sleep_for = next_deadline - time.monotonic()
                if sleep_for > 0:
                    time.sleep(sleep_for)
        except KeyboardInterrupt:
            return


def main() -> None:
    """Command-line entry point: read the power meter once, or continuously.

    Without ``--daemon`` a single read is performed and the decoded readings
    are printed through ``handler.print_readings_json``. With ``--daemon``
    the meter is polled every ``--interval`` seconds and each sample is
    logged to the ``--csv`` file until Ctrl+C is pressed.

    Raises:
        ValueError: in daemon mode, when the timing options are invalid
            (see ``_validate_daemon_args``).
    """
    args = build_arg_parser().parse_args()

    # Fail fast: reject bad daemon options before any serial object is built.
    if args.daemon:
        _validate_daemon_args(args)

    config = SerialConfig(port=args.port, baudrate=args.baud, timeout=args.timeout)
    client = ModbusRtuSerialClient(config)
    handler = PowerMeterDataHandler()

    if args.daemon:
        _run_daemon(client, handler, args.csv, args.interval)
        return

    with client:
        readings = _read_readings(client, handler)
        handler.print_readings_json(readings)
# Run the command-line entry point only when this file is executed as a
# script; importing the module must not start a meter read.
if __name__ == "__main__":
    main()