278 lines
11 KiB
Python
278 lines
11 KiB
Python
import csv
|
|
import json
|
|
import os
|
|
import math
|
|
import struct
|
|
from typing import List, Optional
|
|
|
|
from .models import PowerMeterReadings
|
|
|
|
|
|
class ReadingsCsvLogger:
    """Context manager that appends dict rows to a CSV file.

    The file is opened in append mode on ``__enter__``. A header row is
    written only when the file does not exist yet or is empty, so repeated
    sessions keep extending the same file under a single header.
    """

    def __init__(self, file_path: str, fieldnames: List[str]):
        """Remember the target path and column order; no I/O happens here.

        Args:
            file_path: Path of the CSV file to append to.
            fieldnames: Column names, in the order they are written.
        """
        self._file_path = file_path
        # Copy so later mutation of the caller's list cannot reorder columns.
        self._fieldnames = list(fieldnames)
        self._fp = None
        self._writer: Optional[csv.DictWriter] = None

    def __enter__(self) -> "ReadingsCsvLogger":
        """Open the file for appending and write the header if it is new.

        Returns:
            This logger, ready for ``log`` calls.
        """
        is_new_file = (
            not os.path.exists(self._file_path)
            or os.path.getsize(self._file_path) == 0
        )
        fp = open(self._file_path, "a", newline="", encoding="utf-8")
        try:
            writer = csv.DictWriter(fp, fieldnames=self._fieldnames)
            if is_new_file:
                writer.writeheader()
                fp.flush()
        except BaseException:
            # Do not leak the handle when the header cannot be written.
            fp.close()
            raise
        self._fp = fp
        self._writer = writer
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Flush and close the file; exceptions from the body are not suppressed."""
        fp, self._fp, self._writer = self._fp, None, None
        if fp is None:
            return
        try:
            fp.flush()
        finally:
            # Close even if the final flush fails.
            fp.close()

    def log(self, row: dict) -> None:
        """Append one row and flush it to disk immediately.

        Args:
            row: Mapping of column name to value for a single CSV line.

        Raises:
            RuntimeError: If the logger is used outside its ``with`` block.
        """
        if self._writer is None or self._fp is None:
            raise RuntimeError("CSV logger is not open")
        self._writer.writerow(row)
        self._fp.flush()
|
|
|
|
|
|
class PowerMeterDataHandler:
    """Parses Modbus RTU responses and computes derived power metrics.

    Readings can be decoded either from a list of holding registers
    (``decode_readings``) or from an MQTT JSON dict
    (``decode_readings_from_mqtt_payload``). Both paths attach the same
    derived values before building ``PowerMeterReadings``.
    """

    # Number of 16-bit registers consumed by ``decode_readings``
    # (nine two-register floats at 0..17 plus two integers at 18 and 19).
    _REGISTER_COUNT = 20

    def decode_readings_from_mqtt_payload(self, payload: dict) -> "PowerMeterReadings":
        """Decode a JSON dict coming from MQTT into PowerMeterReadings.

        Accepts both this app's internal naming as well as the device's shorter keys.
        Example device payload keys:
        - annual_kwh, active_kwh, reactive_kwh, load_time_h, hours_day, device_addr

        Missing, ``None`` or unparsable values fall back to ``0.0`` / ``0``.
        """
        return self._build_readings(
            active_power=self._get_float(payload, "active_power_w"),
            rms_current=self._get_float(payload, "rms_current_a"),
            voltage=self._get_float(payload, "voltage_v"),
            frequency=self._get_float(payload, "frequency_hz"),
            power_factor=self._get_float(payload, "power_factor"),
            annual_power_consumption=self._get_float(
                payload, "annual_power_consumption_kwh", "annual_kwh"
            ),
            active_consumption=self._get_float(
                payload, "active_consumption_kwh", "active_kwh"
            ),
            reactive_consumption=self._get_float(
                payload, "reactive_consumption_kwh", "reactive_kwh"
            ),
            load_time_hours=self._get_float(payload, "load_time_hours", "load_time_h"),
            work_hours_per_day=self._get_int(payload, "work_hours_per_day", "hours_day"),
            device_address=self._get_int(payload, "device_address", "device_addr"),
        )

    def parse_read_holding_registers_response(self, response: bytes) -> List[int]:
        """Extract the 16-bit register values from a read-holding-registers reply.

        Response layout: [slave][func][byte_count][data...][crc_lo][crc_hi]
        Validation is intentionally lightweight to match prior script
        behavior; the CRC is not verified here.

        Args:
            response: Raw frame bytes as received from the device.

        Returns:
            The data bytes decoded as big-endian 16-bit registers.

        Raises:
            RuntimeError: If the frame is too short, is a Modbus exception
                response, is truncated, or declares an odd byte count.
        """
        if len(response) < 5:
            raise RuntimeError("No valid response")

        # A set high bit in the function code marks an exception response:
        # byte 2 is then an exception code, not a byte count, and the bytes
        # after it are the CRC rather than register data.
        if response[1] & 0x80:
            raise RuntimeError(
                f"Modbus exception response (code 0x{response[2]:02X})"
            )

        byte_count = response[2]
        data_start = 3
        data_end = data_start + byte_count
        if len(response) < data_end:
            raise RuntimeError("Truncated response")

        data = response[data_start:data_end]
        if len(data) % 2 != 0:
            raise RuntimeError("Invalid byte_count (must be even)")

        return self._bytes_to_registers_be(data)

    def decode_readings(self, registers: List[int]) -> "PowerMeterReadings":
        """Decode a block of holding registers into PowerMeterReadings.

        Floats occupy two registers each and start at the even indices
        0..16; indices 18 and 19 are read as plain integers. The float at
        index 16 is divided by 60 before being stored as ``load_time_hours``
        (presumably the device reports minutes -- confirm against the
        register map).

        Args:
            registers: At least 20 register values, index 0 first.

        Raises:
            IndexError: If fewer than 20 registers are supplied.
        """
        if len(registers) < self._REGISTER_COUNT:
            raise IndexError(
                f"expected at least {self._REGISTER_COUNT} registers, "
                f"got {len(registers)}"
            )

        read_float = self.float_dcba
        return self._build_readings(
            active_power=read_float(registers, 0),
            rms_current=read_float(registers, 2),
            voltage=read_float(registers, 4),
            frequency=read_float(registers, 6),
            power_factor=read_float(registers, 8),
            annual_power_consumption=read_float(registers, 10),
            active_consumption=read_float(registers, 12),
            reactive_consumption=read_float(registers, 14),
            load_time_hours=read_float(registers, 16) / 60.0,
            work_hours_per_day=int(registers[18]),
            device_address=int(registers[19]),
        )

    def _build_readings(
        self,
        *,
        active_power: float,
        rms_current: float,
        voltage: float,
        frequency: float,
        power_factor: float,
        annual_power_consumption: float,
        active_consumption: float,
        reactive_consumption: float,
        load_time_hours: float,
        work_hours_per_day: int,
        device_address: int,
    ) -> "PowerMeterReadings":
        """Attach the derived metrics to the measured values and build the result."""
        s_from_vi = self.apparent_from_vi(voltage, rms_current)
        s_from_pf = self.apparent_from_pf(active_power, power_factor)
        # Reactive power is derived from the power-factor based apparent power.
        q_calculated = self.reactive_from_p_s(active_power, s_from_pf)
        s_from_pq = self.apparent_from_p_q(active_consumption, reactive_consumption)

        return PowerMeterReadings(
            active_power_w=active_power,
            rms_current_a=rms_current,
            voltage_v=voltage,
            frequency_hz=frequency,
            power_factor=power_factor,
            annual_power_consumption_kwh=annual_power_consumption,
            active_consumption_kwh=active_consumption,
            reactive_consumption_kwh=reactive_consumption,
            load_time_hours=load_time_hours,
            work_hours_per_day=work_hours_per_day,
            device_address=device_address,
            apparent_power_vi_va=s_from_vi,
            apparent_power_pf_va=s_from_pf,
            reactive_power_var=q_calculated,
            apparent_consumption_kvah=s_from_pq,
        )

    def readings_to_flat_dict(self, readings: "PowerMeterReadings") -> dict:
        """Return the readings as a flat ``{field_name: value}`` dict.

        dataclasses.asdict() is fine, but we keep this local to the data
        layer so the entrypoint stays minimal. Keys and their order come
        from ``_flat_readings_keys`` so the dict and the CSV columns cannot
        drift apart.
        """
        return {key: getattr(readings, key) for key in self._flat_readings_keys()}

    def readings_to_json(self, readings: "PowerMeterReadings", *, ensure_ascii: bool = False) -> str:
        """Serialize the flat readings dict to a JSON string."""
        return json.dumps(self.readings_to_flat_dict(readings), ensure_ascii=ensure_ascii)

    def print_readings_json(self, readings: "PowerMeterReadings") -> None:
        """Print the readings as a single JSON document on stdout."""
        print(self.readings_to_json(readings, ensure_ascii=False))

    def open_csv_logger(self, file_path: str) -> "ReadingsCsvLogger":
        """Create a CSV logger whose columns are ``timestamp`` plus all reading keys.

        The returned logger is not opened yet; use it as a context manager.
        """
        fieldnames = ["timestamp"] + self.flat_readings_keys()
        return ReadingsCsvLogger(file_path=file_path, fieldnames=fieldnames)

    def flat_readings_keys(self) -> List[str]:
        """Return the reading field names in their CSV column order."""
        return self._flat_readings_keys()

    @staticmethod
    def _flat_readings_keys() -> List[str]:
        """Return a fresh list of reading field names."""
        # Keep order stable for CSV columns.
        return [
            "active_power_w",
            "rms_current_a",
            "voltage_v",
            "frequency_hz",
            "power_factor",
            "annual_power_consumption_kwh",
            "active_consumption_kwh",
            "reactive_consumption_kwh",
            "load_time_hours",
            "work_hours_per_day",
            "device_address",
            "apparent_power_vi_va",
            "apparent_power_pf_va",
            "reactive_power_var",
            "apparent_consumption_kvah",
        ]

    @staticmethod
    def _bytes_to_registers_be(data: bytes) -> List[int]:
        """Combine consecutive byte pairs into big-endian 16-bit integers.

        ``data`` must have an even length; an odd length raises IndexError.
        """
        return [(data[i] << 8) | data[i + 1] for i in range(0, len(data), 2)]

    @staticmethod
    def float_dcba(regs: List[int], index: int) -> float:
        """Decode a 32-bit float stored in ``regs[index]`` and ``regs[index + 1]``.

        The four bytes of the two registers are taken in fully reversed
        order (second register low byte first, first register high byte
        last) and read as a big-endian IEEE-754 single. Only the low 16
        bits of each register value are used.
        """
        first = regs[index] & 0xFFFF
        second = regs[index + 1] & 0xFFFF
        # Packing the words big-endian and reading the result little-endian
        # yields exactly the reversed byte order described above.
        return struct.unpack("<f", struct.pack(">HH", first, second))[0]

    @staticmethod
    def apparent_from_vi(voltage_v: float, current_a: float) -> float:
        """Return voltage times current, or 0.0 if either operand is zero."""
        if voltage_v == 0 or current_a == 0:
            return 0.0
        return voltage_v * current_a

    @staticmethod
    def apparent_from_pf(active_power_w: float, power_factor: float) -> float:
        """Return active power divided by power factor, or 0.0 for a zero factor."""
        if power_factor == 0:
            return 0.0
        return active_power_w / power_factor

    @staticmethod
    def reactive_from_p_s(active_power_w: float, apparent_power_va: float) -> float:
        """Return ``sqrt(S**2 - P**2)``, or 0.0 when that is not computable.

        0.0 is returned when the apparent power is below the active power,
        and also when the squared difference is negative. The latter happens
        with signed inputs such as P = -100, S = -99 (a power factor
        slightly above 1), which would otherwise raise a math domain error.
        """
        if apparent_power_va < active_power_w:
            return 0.0
        radicand = apparent_power_va**2 - active_power_w**2
        if radicand < 0:
            return 0.0
        return math.sqrt(radicand)

    @staticmethod
    def apparent_from_p_q(active_kwh: float, reactive_kwh: float) -> float:
        """Return ``sqrt(active**2 + reactive**2)``; 0.0 when both are zero."""
        if active_kwh == 0 and reactive_kwh == 0:
            return 0.0
        return math.sqrt(active_kwh**2 + reactive_kwh**2)

    @staticmethod
    def _get_float(payload: dict, *keys: str, default: float = 0.0) -> float:
        """Return the first value among ``keys`` that converts to float.

        Keys that are absent, ``None`` or not convertible (including
        integers too large for a float) are skipped, so an alias key is
        still tried when the primary one holds garbage. ``default`` is
        returned when no key yields a value.
        """
        for key in keys:
            value = payload.get(key)
            if value is None:
                continue
            try:
                return float(value)
            except (TypeError, ValueError, OverflowError):
                continue
        return default

    @staticmethod
    def _get_int(payload: dict, *keys: str, default: int = 0) -> int:
        """Return the first value among ``keys`` that converts to int.

        Keys that are absent, ``None`` or not convertible (including NaN
        and infinity, which ``int()`` rejects) are skipped. ``default`` is
        returned when no key yields a value.
        """
        for key in keys:
            value = payload.get(key)
            if value is None:
                continue
            try:
                return int(value)
            except (TypeError, ValueError, OverflowError):
                continue
        return default
|