from __future__ import annotations from dataclasses import dataclass from typing import Dict, List, Optional @dataclass class PlotConfig: max_points: int = 300 class RealtimePlotter: """Very small matplotlib-based realtime plotter with per-series toggles.""" def __init__(self, series_names: List[str], *, config: Optional[PlotConfig] = None): self._series_names = list(series_names) self._config = config or PlotConfig() self._t: List[float] = [] self._y: Dict[str, List[float]] = {name: [] for name in self._series_names} self._plt = None self._fig = None self._ax = None self._lines = {} self._check = None def open(self) -> None: try: import matplotlib.pyplot as plt from matplotlib.widgets import CheckButtons except Exception as exc: # pragma: no cover raise RuntimeError( "GUI requires matplotlib. Install it (pip install matplotlib) or run without --gui." ) from exc self._plt = plt self._fig, self._ax = plt.subplots() self._fig.canvas.manager.set_window_title("Hopi – Realtime Readings") # Leave space on the right for checkboxes. self._fig.subplots_adjust(right=0.78) for name in self._series_names: (line,) = self._ax.plot([], [], label=name, linewidth=1) self._lines[name] = line self._ax.set_xlabel("t (s)") self._ax.set_ylabel("value") self._ax.grid(True, which="both", linewidth=0.5) # Checkboxes rax = self._fig.add_axes([0.80, 0.10, 0.19, 0.85]) visibility = [True] * len(self._series_names) self._check = CheckButtons(rax, self._series_names, visibility) def _toggle(label: str) -> None: line = self._lines.get(label) if line is None: return line.set_visible(not line.get_visible()) self._fig.canvas.draw_idle() self._check.on_clicked(_toggle) # Show legend for currently visible lines only; keep it minimal. self._ax.legend(loc="upper left", fontsize="small") plt.show(block=False) def is_open(self) -> bool: if self._plt is None or self._fig is None: return False return bool(self._plt.fignum_exists(self._fig.number)) def update(self, t_seconds: float, readings: Dict[str, float]) -> None: if not self.is_open() or self._ax is None: return self._t.append(t_seconds) for name in self._series_names: self._y[name].append(float(readings.get(name, 0.0))) # Rolling window if len(self._t) > self._config.max_points: self._t = self._t[-self._config.max_points :] for name in self._series_names: self._y[name] = self._y[name][-self._config.max_points :] for name in self._series_names: line = self._lines.get(name) if line is not None: line.set_data(self._t, self._y[name]) self._ax.relim() self._ax.autoscale_view() def process_events(self) -> None: if self._plt is None: return # Allow the GUI event loop to run. self._plt.pause(0.001)