import serial import struct import math PORT = "/dev/ttyUSB0" BAUD = 9600 SLAVE_ID = 1 START_REG = 0 NUM_WORDS = 20 # --- build Modbus RTU request manually --- # 01 03 00 00 00 14 CRC_LO CRC_HI request = bytes([ 0x01, # slave 0x03, # function 0x00, 0x00, # start register 0x00, 0x14, # number of registers (20) 0x45, 0xC5 # CRC (we don't care if it's right here) ]) # --- open serial port --- ser = serial.Serial( port=PORT, baudrate=BAUD, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=0.1 # read timeout (seconds) ) # flush buffers ser.reset_input_buffer() ser.reset_output_buffer() # send request ser.write(request) ser.flush() # small delay to allow response to arrive #time.sleep(0.1) # read whatever is available response = ser.read(64) ser.close() #print("Raw response bytes:") #print(" ".join(f"{b:02X}" for b in response)) # --- basic sanity check --- if len(response) < 3: raise RuntimeError("No valid response") slave = response[0] func = response[1] byte_count = response[2] # data bytes start at index 3 data = response[3:3 + byte_count] # convert data to 16-bit registers (big endian) registers = [] for i in range(0, len(data), 2): registers.append((data[i] << 8) | data[i+1]) def float_dcba(regs, index): w1 = regs[index] w2 = regs[index + 1] b = bytes([ (w2 & 0xFF), (w2 >> 8), (w1 & 0xFF), (w1 >> 8) ]) return struct.unpack(">f", b)[0] def apparent_from_VI(V, I): """ Calculate apparent power (S) from voltage and current. Handles zero current edge case. """ if V == 0 or I == 0: return 0.0 return V * I def apparent_from_PF(P, PF): """ Calculate apparent power (S) from active power and power factor. Handles zero or near-zero PF edge case. """ if PF == 0: return 0.0 return P / PF def reactive_from_P_S(P, S): """ Calculate reactive power (Q) from active power and apparent power. Handles S smaller than P (possible due to rounding errors). """ if S < P: return 0.0 return math.sqrt(S**2 - P**2) def apparent_from_P_Q(P, Q): """ Calculate apparent power (S) from active and reactive power. Handles edge case where both P and Q are zero. """ if P == 0 and Q == 0: return 0.0 return math.sqrt(P**2 + Q**2) active_power = float_dcba(registers, 0) # W (float) rms_current = float_dcba(registers, 2) # A (float) voltage = float_dcba(registers, 4) # V (float) frequency = float_dcba(registers, 6) # Hz (float) power_factor = float_dcba(registers, 8) # pf (float) annual_power_consumption = float_dcba(registers, 10) # KWH (float) active_consumption = float_dcba(registers, 12) # KWH (float) reactive_consumption = float_dcba(registers, 14) # KWH (float) load_time_hours = float_dcba(registers, 16) / 60.0 # Hrs (float) work_hours_per_day = int(registers[18]) # Hrs (int) device_address = int(registers[19]) # Device Address (int) S_from_VI = apparent_from_VI(voltage, rms_current) S_from_PF = apparent_from_PF(active_power, power_factor) Q_calculated = reactive_from_P_S(active_power, S_from_PF) S_from_PQ = apparent_from_P_Q(active_consumption, reactive_consumption) # Now print using the stored variables print(f"{active_power:10.5f} W Active Power") print(f"{S_from_VI:10.5f} VA Apparent Power (V*I)") print(f"{S_from_PF:10.5f} VA Apparent Power (P/PF)") print(f"{Q_calculated:10.5f} VAR Reactive Power (from P & S)") print(f"{rms_current:10.5f} A RMS Current") print(f"{voltage:10.5f} V Voltage") print(f"{frequency:10.5f} Hz Frequency") print(f"{power_factor:10.5f} pf Power Factor") print(f"{annual_power_consumption:10.5f} KWH Annual Power Consumption") print(f"{active_consumption:10.5f} KWH Active Consumption") print(f"{reactive_consumption:10.5f} KWH Reactive Consumption") print(f"{S_from_PQ:10.5f} KVAh Apparent Power (from P & Q consumption)") print(f"{load_time_hours:10.5f} Hrs Load Time") print(f"{work_hours_per_day:10d} Hrs Work Hours per Day") print(f"{device_address:10d} Device Address")