From 8d8e1cc5eb19f7290a1ca825e01af4b8afcf57c6 Mon Sep 17 00:00:00 2001 From: osiu97 Date: Mon, 5 Jan 2026 11:34:05 +0100 Subject: [PATCH] add crude mqtt read capabality --- Hopi.py | 106 +++++++++++++++++++++++++++------ hopi/data_handler.py | 79 ++++++++++++++++++++++++ hopi/mqtt_client.py | 139 +++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 1 + 4 files changed, 307 insertions(+), 18 deletions(-) create mode 100644 hopi/mqtt_client.py diff --git a/Hopi.py b/Hopi.py index 3cc001e..37a56a9 100644 --- a/Hopi.py +++ b/Hopi.py @@ -4,6 +4,7 @@ from datetime import datetime, timezone from hopi.data_handler import PowerMeterDataHandler from hopi.gui_plotter import PlotConfig, RealtimePlotter +from hopi.mqtt_client import MqttConfig, MqttJsonSubscriber from hopi.serial_client import ModbusRtuSerialClient, SerialConfig @@ -15,9 +16,23 @@ READ_MAX_BYTES = 5 + (2 * NUM_WORDS) def build_arg_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Read and print power meter values over Modbus RTU.") + parser.add_argument( + "--source", + choices=["usb", "mqtt"], + default="usb", + help="Data source: 'usb' (Modbus RTU over serial) or 'mqtt' (JSON over MQTT)", + ) + parser.add_argument("--port", default="/dev/ttyUSB0") parser.add_argument("--baud", type=int, default=9600) parser.add_argument("--timeout", type=float, default=0.1) + + parser.add_argument("--mqtt-server", default="localhost:1883", help="MQTT broker host or host:port") + parser.add_argument("--mqtt-topic", default="pico-usb-host-modbus/modbus/readings", help="MQTT topic to subscribe to") + parser.add_argument("--mqtt-username", default=None) + parser.add_argument("--mqtt-password", default=None) + parser.add_argument("--mqtt-client-id", default=None) + parser.add_argument("--daemon", action="store_true", help="Run continuously until stopped (Ctrl+C)") parser.add_argument("--csv", default="hopi_readings.csv", help="CSV output path for daemon mode") parser.add_argument("--interval", type=float, default=1.0, help="Seconds between reads in daemon mode") @@ -29,16 +44,20 @@ def build_arg_parser() -> argparse.ArgumentParser: def main() -> None: args = build_arg_parser().parse_args() - config = SerialConfig(port=args.port, baudrate=args.baud, timeout=args.timeout) - client = ModbusRtuSerialClient(config) handler = PowerMeterDataHandler() + if args.source == "usb": + config = SerialConfig(port=args.port, baudrate=args.baud, timeout=args.timeout) + client = ModbusRtuSerialClient(config) + else: + client = None + if args.daemon: if args.timeout <= 0: raise ValueError("--timeout must be > 0") if args.interval <= 0: raise ValueError("--interval must be > 0") - if args.interval <= args.timeout: + if args.source == "usb" and args.interval <= args.timeout: raise ValueError("--interval must be > --timeout") t0 = time.monotonic() @@ -50,22 +69,61 @@ def main() -> None: plotter = RealtimePlotter(series_names, config=PlotConfig(max_points=max(10, args.max_points))) plotter.open() - with client, handler.open_csv_logger(args.csv) as logger: + if args.source == "usb": + assert client is not None + with client, handler.open_csv_logger(args.csv) as logger: + try: + while True: + if plotter is not None and not plotter.is_open(): + return + + response = client.read_holding_registers( + slave_id=SLAVE_ID, + start_register=START_REG, + register_count=NUM_WORDS, + read_max_bytes=READ_MAX_BYTES, + exact_response_length=True, + ) + registers = handler.parse_read_holding_registers_response(response) + readings = handler.decode_readings(registers) + + row = {"timestamp": datetime.now(timezone.utc).isoformat()} + flat = handler.readings_to_flat_dict(readings) + row.update(flat) + logger.log(row) + + if plotter is not None: + plotter.update(time.monotonic() - t0, flat) + plotter.process_events() + + next_deadline += args.interval + sleep_for = next_deadline - time.monotonic() + if sleep_for > 0: + time.sleep(sleep_for) + except KeyboardInterrupt: + return + + mqtt_cfg = MqttConfig( + server=args.mqtt_server, + topic=args.mqtt_topic, + username=args.mqtt_username, + password=args.mqtt_password, + client_id=args.mqtt_client_id, + ) + + with MqttJsonSubscriber(mqtt_cfg) as sub, handler.open_csv_logger(args.csv) as logger: try: while True: if plotter is not None and not plotter.is_open(): return - response = client.read_holding_registers( - slave_id=SLAVE_ID, - start_register=START_REG, - register_count=NUM_WORDS, - read_max_bytes=READ_MAX_BYTES, - exact_response_length=True, - ) - registers = handler.parse_read_holding_registers_response(response) - readings = handler.decode_readings(registers) + payload = sub.get(timeout=args.interval) + if payload is None: + if plotter is not None: + plotter.process_events() + continue + readings = handler.decode_readings_from_mqtt_payload(payload) row = {"timestamp": datetime.now(timezone.utc).isoformat()} flat = handler.readings_to_flat_dict(readings) row.update(flat) @@ -74,14 +132,26 @@ def main() -> None: if plotter is not None: plotter.update(time.monotonic() - t0, flat) plotter.process_events() - - next_deadline += args.interval - sleep_for = next_deadline - time.monotonic() - if sleep_for > 0: - time.sleep(sleep_for) except KeyboardInterrupt: return + if args.source == "mqtt": + mqtt_cfg = MqttConfig( + server=args.mqtt_server, + topic=args.mqtt_topic, + username=args.mqtt_username, + password=args.mqtt_password, + client_id=args.mqtt_client_id, + ) + with MqttJsonSubscriber(mqtt_cfg) as sub: + payload = sub.get(timeout=max(15.0, args.timeout)) + if payload is None: + raise RuntimeError("No MQTT message received (timeout)") + readings = handler.decode_readings_from_mqtt_payload(payload) + handler.print_readings_json(readings) + return + + assert client is not None with client: response = client.read_holding_registers( slave_id=SLAVE_ID, diff --git a/hopi/data_handler.py b/hopi/data_handler.py index eae16ab..ddda42a 100644 --- a/hopi/data_handler.py +++ b/hopi/data_handler.py @@ -43,6 +43,63 @@ class ReadingsCsvLogger: class PowerMeterDataHandler: """Parses Modbus RTU responses and computes derived power metrics.""" + def decode_readings_from_mqtt_payload(self, payload: dict) -> PowerMeterReadings: + """Decode a JSON dict coming from MQTT into PowerMeterReadings. + + Accepts both this app's internal naming as well as the device's shorter keys. + Example device payload keys: + - annual_kwh, active_kwh, reactive_kwh, load_time_h, hours_day, device_addr + """ + + active_power = self._get_float(payload, "active_power_w") + rms_current = self._get_float(payload, "rms_current_a") + voltage = self._get_float(payload, "voltage_v") + frequency = self._get_float(payload, "frequency_hz") + power_factor = self._get_float(payload, "power_factor") + + annual_power_consumption = self._get_float( + payload, + "annual_power_consumption_kwh", + "annual_kwh", + ) + active_consumption = self._get_float( + payload, + "active_consumption_kwh", + "active_kwh", + ) + reactive_consumption = self._get_float( + payload, + "reactive_consumption_kwh", + "reactive_kwh", + ) + + load_time_hours = self._get_float(payload, "load_time_hours", "load_time_h") + work_hours_per_day = self._get_int(payload, "work_hours_per_day", "hours_day") + device_address = self._get_int(payload, "device_address", "device_addr") + + s_from_vi = self.apparent_from_vi(voltage, rms_current) + s_from_pf = self.apparent_from_pf(active_power, power_factor) + q_calculated = self.reactive_from_p_s(active_power, s_from_pf) + s_from_pq = self.apparent_from_p_q(active_consumption, reactive_consumption) + + return PowerMeterReadings( + active_power_w=active_power, + rms_current_a=rms_current, + voltage_v=voltage, + frequency_hz=frequency, + power_factor=power_factor, + annual_power_consumption_kwh=annual_power_consumption, + active_consumption_kwh=active_consumption, + reactive_consumption_kwh=reactive_consumption, + load_time_hours=load_time_hours, + work_hours_per_day=work_hours_per_day, + device_address=device_address, + apparent_power_vi_va=s_from_vi, + apparent_power_pf_va=s_from_pf, + reactive_power_var=q_calculated, + apparent_consumption_kvah=s_from_pq, + ) + def parse_read_holding_registers_response(self, response: bytes) -> List[int]: # Response layout: [slave][func][byte_count][data...][crc_lo][crc_hi] # We keep validation intentionally lightweight to match prior script behavior. @@ -196,3 +253,25 @@ class PowerMeterDataHandler: if active_kwh == 0 and reactive_kwh == 0: return 0.0 return math.sqrt(active_kwh**2 + reactive_kwh**2) + + @staticmethod + def _get_float(payload: dict, *keys: str, default: float = 0.0) -> float: + for key in keys: + if key in payload and payload[key] is not None: + value = payload[key] + try: + return float(value) + except (TypeError, ValueError): + return default + return default + + @staticmethod + def _get_int(payload: dict, *keys: str, default: int = 0) -> int: + for key in keys: + if key in payload and payload[key] is not None: + value = payload[key] + try: + return int(value) + except (TypeError, ValueError): + return default + return default diff --git a/hopi/mqtt_client.py b/hopi/mqtt_client.py new file mode 100644 index 0000000..314d83a --- /dev/null +++ b/hopi/mqtt_client.py @@ -0,0 +1,139 @@ +from __future__ import annotations + +import json +import queue +import threading +from dataclasses import dataclass +from typing import Any, Dict, Optional + + +@dataclass(frozen=True) +class MqttConfig: + server: str # host or host:port + topic: str + username: Optional[str] = None + password: Optional[str] = None + client_id: Optional[str] = None + keepalive: int = 60 + + +def _parse_server(server: str) -> tuple[str, int]: + server = server.strip() + if not server: + raise ValueError("MQTT server must not be empty") + + # Accept host or host:port. + if ":" in server: + host, port_s = server.rsplit(":", 1) + host = host.strip() + if not host: + raise ValueError("MQTT server host must not be empty") + try: + port = int(port_s) + except ValueError as exc: + raise ValueError("MQTT server port must be an integer") from exc + return host, port + + return server, 1883 + + +class MqttJsonSubscriber: + """Subscribes to a topic and yields decoded JSON payloads.""" + + def __init__(self, config: MqttConfig): + self._config = config + self._queue: queue.Queue[Dict[str, Any]] = queue.Queue(maxsize=1000) + self._client = None + + def __enter__(self) -> "MqttJsonSubscriber": + self.open() + return self + + def __exit__(self, exc_type, exc, tb) -> None: + self.close() + + def open(self) -> None: + if self._client is not None: + return + + try: + import paho.mqtt.client as mqtt + except Exception as exc: # pragma: no cover + raise RuntimeError( + "MQTT requires paho-mqtt. Install it (pip install paho-mqtt) or run with --source usb." + ) from exc + + host, port = _parse_server(self._config.server) + + client = mqtt.Client(client_id=self._config.client_id) + if self._config.username is not None: + client.username_pw_set(self._config.username, self._config.password) + + connected = threading.Event() + connect_rc: dict[str, int] = {"rc": -1} + + def on_connect(_client, _userdata, _flags, rc): + connect_rc["rc"] = int(rc) + connected.set() + if rc != 0: + return + _client.subscribe(self._config.topic) + + def on_message(_client, _userdata, msg): + try: + payload = msg.payload.decode("utf-8", errors="strict") + data = json.loads(payload) + if isinstance(data, dict): + try: + self._queue.put_nowait(data) + except queue.Full: + # Drop oldest to keep latest values moving. + try: + _ = self._queue.get_nowait() + except queue.Empty: + pass + try: + self._queue.put_nowait(data) + except queue.Full: + pass + except Exception: + # Ignore invalid JSON messages. + return + + client.on_connect = on_connect + client.on_message = on_message + + client.connect(host, port, keepalive=self._config.keepalive) + client.loop_start() + + if not connected.wait(timeout=5.0): + client.loop_stop() + client.disconnect() + raise RuntimeError("MQTT connect timed out") + + if connect_rc["rc"] != 0: + client.loop_stop() + client.disconnect() + raise RuntimeError(f"MQTT connect failed (rc={connect_rc['rc']})") + + self._client = client + + def close(self) -> None: + client = self._client + self._client = None + if client is None: + return + try: + client.loop_stop() + finally: + try: + client.disconnect() + except Exception: + pass + + def get(self, *, timeout: Optional[float] = None) -> Optional[Dict[str, Any]]: + """Get the next JSON message dict, or None on timeout.""" + try: + return self._queue.get(timeout=timeout) + except queue.Empty: + return None diff --git a/requirements.txt b/requirements.txt index c6798ed..8846d0e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ pyserial>=3.5 matplotlib>=3.5 +paho-mqtt>=1.6