#!/usr/bin/env python3
"""
monitor.py — Linux system stats → MQTT with Home Assistant auto-discovery.
Mirrors the functionality of IOTLink / HASS.Agent on Windows.
Dependencies: paho-mqtt, psutil, pyyaml
Optional: nvidia-smi in PATH for GPU stats
"""
import json
import logging
import os
import signal
import socket
import subprocess
import sys
import time
from pathlib import Path
from typing import Any
import psutil
import paho.mqtt.client as mqtt
import yaml
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
# Root logger: timestamped, level-tagged lines at INFO and above.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
logging.basicConfig(
    level=logging.INFO,
    format=_LOG_FORMAT,
    datefmt=_LOG_DATE_FORMAT,
)

# Module-level logger used throughout this file.
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Default config location: a "config.yaml" sitting next to this script.
CONFIG_PATH = Path(__file__).parent / "config.yaml"
# Reported to Home Assistant as the device's "sw_version" in discovery payloads.
SCRIPT_VERSION = "1.0.0"
def load_config(path: Path = CONFIG_PATH) -> dict:
    """Read the YAML config at *path* and return it with every default filled in.

    The returned dict always contains the ``mqtt``, ``monitor`` and ``sensors``
    sections, and ``sensors.disk`` / ``sensors.network`` are always mappings
    with an ``enabled`` flag, so callers can index keys without guarding.

    A section that is present but empty in the YAML (``mqtt:`` with nothing
    under it parses to ``None``) is treated the same as a missing section.

    Args:
        path: Location of the YAML file.

    Returns:
        The parsed configuration, mutated in place with defaults.

    Raises:
        OSError: if *path* cannot be opened.
        ValueError: if the document root or a top-level section is present
            but is not a mapping.
        yaml.YAMLError: propagated unchanged when the file is not valid YAML.
    """
    # Explicit encoding so parsing does not depend on the process locale.
    with open(path, encoding="utf-8") as fh:
        raw = yaml.safe_load(fh)

    # An empty file parses to None; treat it as "no overrides".
    cfg = raw or {}
    if not isinstance(cfg, dict):
        raise ValueError(
            f"Config root must be a mapping, got {type(cfg).__name__}"
        )

    mqtt_cfg = _config_section(cfg, "mqtt")
    mqtt_cfg.setdefault("host", "localhost")
    mqtt_cfg.setdefault("port", 1883)
    mqtt_cfg.setdefault("username", "")
    mqtt_cfg.setdefault("password", "")
    mqtt_cfg.setdefault("topic_prefix", "pc_monitor")
    mqtt_cfg.setdefault("client_id", "")

    monitor_cfg = _config_section(cfg, "monitor")
    monitor_cfg.setdefault("interval", 30)
    monitor_cfg.setdefault("device_name", "")
    monitor_cfg.setdefault("node_id", "")

    sensors = _config_section(cfg, "sensors")
    sensors.setdefault("cpu", True)
    sensors.setdefault("gpu", True)
    sensors.setdefault("ram", True)

    disk = _toggle_section(sensors, "disk")
    disk.setdefault("partitions", [])
    disk.setdefault("io_stats", True)

    network = _toggle_section(sensors, "network")
    network.setdefault("interfaces", [])

    sensors.setdefault("system", True)
    sensors.setdefault("idle_time", True)
    return cfg


def _config_section(parent: dict, key: str) -> dict:
    """Return ``parent[key]`` as a dict, creating it when missing or empty."""
    value = parent.get(key)
    if value is None:
        value = parent[key] = {}
    elif not isinstance(value, dict):
        raise ValueError(
            f"Config section '{key}' must be a mapping, got {type(value).__name__}"
        )
    return value


def _toggle_section(sensors: dict, key: str) -> dict:
    """Normalise a sensor entry that may be a bare on/off value or a mapping.

    A bare scalar such as ``disk: false`` becomes ``{"enabled": False}``; a
    missing or empty entry becomes an enabled section with default options.
    """
    value = sensors.get(key)
    if value is None:
        value = {}
    elif not isinstance(value, dict):
        value = {"enabled": bool(value)}
    sensors[key] = value
    value.setdefault("enabled", True)
    return value
# ---------------------------------------------------------------------------
# Collectors
# ---------------------------------------------------------------------------
class CpuCollector:
"""CPU usage, frequency, temperature, load averages."""
# Ordered list of sensor keys to try for CPU package / aggregate temp
_TEMP_KEYS = ("coretemp", "k10temp", "zenpower", "acpitz", "cpu_thermal")
def collect(self) -> dict[str, Any]:
data: dict[str, Any] = {}
# Usage & frequency
data["cpu_usage"] = round(psutil.cpu_percent(interval=None), 1)
freq = psutil.cpu_freq()
data["cpu_freq"] = round(freq.current, 0) if freq else None
# Load averages
load = psutil.getloadavg()
data["load_1m"] = round(load[0], 2)
data["load_5m"] = round(load[1], 2)
data["load_15m"] = round(load[2], 2)
# Temperature
data["cpu_temp"] = self._get_temp()
return data
def _get_temp(self) -> float | None:
try:
temps = psutil.sensors_temperatures()
except AttributeError:
return None # platform doesn't support it
if not temps:
return None
for key in self._TEMP_KEYS:
if key in temps:
entries = temps[key]
if not entries:
continue
# Prefer "Package id 0" or "Tctl" (AMD); fall back to first entry
for entry in entries:
lbl = (entry.label or "").lower()
if "package" in lbl or "tctl" in lbl or "tdie" in lbl:
return round(entry.current, 1)
return round(entries[0].current, 1)
# Last resort: any sensor with "cpu" in the key name
for key, entries in temps.items():
if "cpu" in key.lower() and entries:
return round(entries[0].current, 1)
return None
@staticmethod
def sensor_definitions(hostname: str) -> list[dict]:
prefix = hostname
return [
{
"key": "cpu_usage",
"name": "CPU Usage",
"unit": "%",
"device_class": None,
"state_class": "measurement",
"icon": "mdi:cpu-64-bit",
},
{
"key": "cpu_freq",
"name": "CPU Frequency",
"unit": "MHz",
"device_class": None,
"state_class": "measurement",
"icon": "mdi:gauge",
},
{
"key": "cpu_temp",
"name": "CPU Temperature",
"unit": "°C",
"device_class": "temperature",
"state_class": "measurement",
"icon": None,
},
{
"key": "load_1m",
"name": "Load Average 1m",
"unit": None,
"device_class": None,
"state_class": "measurement",
"icon": "mdi:chart-line",
},
{
"key": "load_5m",
"name": "Load Average 5m",
"unit": None,
"device_class": None,
"state_class": "measurement",
"icon": "mdi:chart-line",
},
{
"key": "load_15m",
"name": "Load Average 15m",
"unit": None,
"device_class": None,
"state_class": "measurement",
"icon": "mdi:chart-line",
},
]
class GpuCollector:
"""NVIDIA GPU stats via nvidia-smi subprocess. Fails gracefully."""
_QUERY = (
"utilization.gpu,temperature.gpu,memory.used,memory.total,power.draw"
)
_available: bool | None = None # cached after first attempt
def _nvidia_available(self) -> bool:
if self._available is None:
try:
subprocess.run(
["nvidia-smi", "-L"],
capture_output=True,
timeout=5,
check=True,
)
self._available = True
except Exception:
self._available = False
log.warning("nvidia-smi not available; GPU monitoring disabled.")
return self._available
def collect(self) -> dict[str, Any]:
empty: dict[str, Any] = {
"gpu_usage": None,
"gpu_temp": None,
"gpu_mem_used": None,
"gpu_mem_total": None,
"gpu_mem_percent": None,
"gpu_power": None,
}
if not self._nvidia_available():
return empty
try:
result = subprocess.run(
[
"nvidia-smi",
f"--query-gpu={self._QUERY}",
"--format=csv,noheader,nounits",
],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode != 0:
return empty
parts = [p.strip() for p in result.stdout.strip().split(",")]
if len(parts) < 5:
return empty
gpu_usage = self._parse_float(parts[0])
gpu_temp = self._parse_float(parts[1])
mem_used = self._parse_float(parts[2])
mem_total = self._parse_float(parts[3])
power = self._parse_float(parts[4])
mem_pct = (
round(mem_used / mem_total * 100, 1)
if mem_used is not None and mem_total and mem_total > 0
else None
)
return {
"gpu_usage": gpu_usage,
"gpu_temp": gpu_temp,
"gpu_mem_used": mem_used,
"gpu_mem_total": mem_total,
"gpu_mem_percent": mem_pct,
"gpu_power": power,
}
except Exception as exc:
log.warning("GPU collection error: %s", exc)
return empty
@staticmethod
def _parse_float(val: str) -> float | None:
try:
return round(float(val), 1)
except (ValueError, TypeError):
return None
@staticmethod
def sensor_definitions(hostname: str) -> list[dict]:
return [
{
"key": "gpu_usage",
"name": "GPU Usage",
"unit": "%",
"device_class": None,
"state_class": "measurement",
"icon": "mdi:expansion-card",
},
{
"key": "gpu_temp",
"name": "GPU Temperature",
"unit": "°C",
"device_class": "temperature",
"state_class": "measurement",
"icon": None,
},
{
"key": "gpu_mem_used",
"name": "GPU Memory Used",
"unit": "MiB",
"device_class": "data_size",
"state_class": "measurement",
"icon": None,
},
{
"key": "gpu_mem_total",
"name": "GPU Memory Total",
"unit": "MiB",
"device_class": "data_size",
"state_class": "measurement",
"icon": None,
},
{
"key": "gpu_mem_percent",
"name": "GPU Memory Usage",
"unit": "%",
"device_class": None,
"state_class": "measurement",
"icon": "mdi:expansion-card",
},
{
"key": "gpu_power",
"name": "GPU Power Draw",
"unit": "W",
"device_class": "power",
"state_class": "measurement",
"icon": None,
},
]
class RamCollector:
    """Reports physical memory and swap utilisation."""

    # Column order for the rows in _SENSORS.
    _FIELDS = ("key", "name", "unit", "device_class", "state_class", "icon")
    _SENSORS = (
        ("ram_percent", "RAM Usage", "%", None, "measurement", "mdi:memory"),
        ("ram_used", "RAM Used", "GiB", "data_size", "measurement", None),
        ("ram_available", "RAM Available", "GiB", "data_size", "measurement", None),
        ("ram_total", "RAM Total", "GiB", "data_size", "measurement", None),
        ("swap_percent", "Swap Usage", "%", None, "measurement", "mdi:harddisk"),
        ("swap_used", "Swap Used", "GiB", "data_size", "measurement", None),
    )

    @staticmethod
    def _gib(num_bytes: float) -> float:
        """Convert a byte count to GiB, rounded to two decimals."""
        return round(num_bytes / 2**30, 2)

    def collect(self) -> dict[str, Any]:
        """Return percentages (one decimal) and sizes in GiB for RAM and swap."""
        memory = psutil.virtual_memory()
        swap = psutil.swap_memory()
        reading: dict[str, Any] = {}
        reading["ram_percent"] = round(memory.percent, 1)
        reading["ram_used"] = self._gib(memory.used)
        reading["ram_available"] = self._gib(memory.available)
        reading["ram_total"] = self._gib(memory.total)
        reading["swap_percent"] = round(swap.percent, 1)
        reading["swap_used"] = self._gib(swap.used)
        return reading

    @staticmethod
    def sensor_definitions(hostname: str) -> list[dict]:
        """Return the discovery metadata for each memory sensor (*hostname* unused)."""
        return [
            dict(zip(RamCollector._FIELDS, row)) for row in RamCollector._SENSORS
        ]
def _mount_to_id(mountpoint: str) -> str:
    """Convert a mountpoint path to a safe sensor key suffix.

    Leading/trailing slashes are dropped and every character that is not an
    ASCII letter or digit becomes ``_``, so the result can be embedded in a
    ``value_json.<key>`` template and in a unique_id. The filesystem root
    (or an empty path) maps to ``"root"``.
    """
    trimmed = mountpoint.strip("/")
    # Characters such as "." would otherwise be read as attribute access by
    # the template that looks the key up.
    safe = "".join(
        ch if ch.isascii() and ch.isalnum() else "_" for ch in trimmed
    )
    return safe or "root"
class DiskCollector:
    """Disk usage per partition plus aggregate read/write throughput.

    Args:
        cfg: The ``sensors.disk`` config mapping. ``partitions`` (optional
            whitelist of mountpoints) and ``io_stats`` (bool, default True)
            are read from it.
    """

    # Filesystem types that never represent real storage worth reporting.
    _SKIP_FSTYPES = ("squashfs", "tmpfs", "devtmpfs", "overlay", "")
    # (key suffix, name suffix, unit, device_class, icon) for each per-mount sensor.
    _MOUNT_SENSORS = (
        ("percent", "Usage", "%", None, "mdi:harddisk"),
        ("used", "Used", "GiB", "data_size", None),
        ("free", "Free", "GiB", "data_size", None),
        ("total", "Total", "GiB", "data_size", None),
    )
    # (key, name) for the aggregate I/O sensors.
    _IO_SENSORS = (
        ("disk_read_speed", "Disk Read Speed"),
        ("disk_write_speed", "Disk Write Speed"),
    )

    def __init__(self, cfg: dict):
        self._cfg = cfg
        # Previous disk_io_counters() sample; None until the first collect().
        self._prev_io: Any = None
        self._prev_time: float = time.monotonic()
        self._mounts: list[str] = []  # populated on first use

    def _get_mounts(self) -> list[str]:
        """Return the mountpoints to report, honouring the optional whitelist."""
        whitelist = [
            m.strip() for m in (self._cfg.get("partitions") or []) if m.strip()
        ]
        mounts = []
        for part in psutil.disk_partitions(all=False):
            # Skip loop devices, tmpfs, and other virtual filesystems.
            if part.fstype in self._SKIP_FSTYPES or "loop" in part.device:
                continue
            if whitelist and part.mountpoint not in whitelist:
                continue
            mounts.append(part.mountpoint)
        return mounts

    def collect(self) -> dict[str, Any]:
        """Return usage figures for each mount and, if enabled, I/O rates in MiB/s.

        A mount that cannot be queried is skipped for this cycle; the other
        mounts are still reported.
        """
        data: dict[str, Any] = {}
        if not self._mounts:
            self._mounts = self._get_mounts()

        for mount in self._mounts:
            mid = _mount_to_id(mount)
            try:
                usage = psutil.disk_usage(mount)
            except OSError as exc:
                # Covers permission problems as well as a mount that went away.
                log.debug("Skipping disk usage for %s: %s", mount, exc)
                continue
            data[f"disk_{mid}_percent"] = round(usage.percent, 1)
            data[f"disk_{mid}_used"] = round(usage.used / 2**30, 2)
            data[f"disk_{mid}_free"] = round(usage.free / 2**30, 2)
            data[f"disk_{mid}_total"] = round(usage.total / 2**30, 2)

        if self._cfg.get("io_stats", True):
            now = time.monotonic()
            elapsed = now - self._prev_time
            try:
                cur_io = psutil.disk_io_counters()
                if self._prev_io is not None and elapsed > 0:
                    # Clamp at zero so a counter reset cannot yield a negative
                    # rate (same treatment as the network rates).
                    read_delta = max(cur_io.read_bytes - self._prev_io.read_bytes, 0)
                    write_delta = max(cur_io.write_bytes - self._prev_io.write_bytes, 0)
                    data["disk_read_speed"] = round(read_delta / elapsed / 2**20, 2)
                    data["disk_write_speed"] = round(write_delta / elapsed / 2**20, 2)
                else:
                    # First sample: no baseline to diff against yet.
                    data["disk_read_speed"] = 0.0
                    data["disk_write_speed"] = 0.0
                self._prev_io = cur_io
            except Exception as exc:
                log.debug("Disk I/O collection failed: %s", exc)
                data["disk_read_speed"] = None
                data["disk_write_speed"] = None
            self._prev_time = now
        return data

    def sensor_definitions(self) -> list[dict]:
        """Build sensor defs based on discovered mounts."""
        if not self._mounts:
            self._mounts = self._get_mounts()

        defs = []
        for mount in self._mounts:
            mid = _mount_to_id(mount)
            for suffix, title, unit, device_class, icon in self._MOUNT_SENSORS:
                defs.append(
                    {
                        "key": f"disk_{mid}_{suffix}",
                        # The raw mountpoint doubles as the human-readable label.
                        "name": f"Disk {mount} {title}",
                        "unit": unit,
                        "device_class": device_class,
                        "state_class": "measurement",
                        "icon": icon,
                    }
                )

        if self._cfg.get("io_stats", True):
            for key, name in self._IO_SENSORS:
                defs.append(
                    {
                        "key": key,
                        "name": name,
                        "unit": "MiB/s",
                        "device_class": "data_rate",
                        "state_class": "measurement",
                        "icon": None,
                    }
                )
        return defs
class NetworkCollector:
    """Measures receive/transmit throughput for each network interface.

    Args:
        cfg: The ``sensors.network`` config mapping; ``interfaces`` is an
            optional whitelist of interface names.
    """

    def __init__(self, cfg: dict):
        self._cfg = cfg
        self._prev: dict[str, Any] = {}  # last counter sample per interface
        self._prev_time: float = time.monotonic()
        self._ifaces: list[str] = []  # resolved lazily on first use

    @staticmethod
    def _safe_name(iface: str) -> str:
        """Return *iface* with '-' and '.' replaced so it can be used in a key."""
        return iface.replace("-", "_").replace(".", "_")

    def _get_ifaces(self) -> list[str]:
        """List the interfaces to report: everything but loopback, filtered by the whitelist."""
        allowed = [
            name.strip()
            for name in (self._cfg.get("interfaces") or [])
            if name.strip()
        ]
        return [
            name
            for name in psutil.net_io_counters(pernic=True)
            if name != "lo" and (not allowed or name in allowed)
        ]

    def collect(self) -> dict[str, Any]:
        """Return in/out rates in MiB/s per interface since the previous call.

        An interface without a previous sample reports 0.0 for both
        directions. Interfaces missing from the current counters are skipped.
        """
        if not self._ifaces:
            self._ifaces = self._get_ifaces()

        readings: dict[str, Any] = {}
        now = time.monotonic()
        elapsed = now - self._prev_time
        counters = psutil.net_io_counters(pernic=True)

        for iface in self._ifaces:
            if iface not in counters:
                continue
            current = counters[iface]
            tag = self._safe_name(iface)
            rate_in, rate_out = 0.0, 0.0
            if iface in self._prev and elapsed > 0:
                previous = self._prev[iface]
                rx = (current.bytes_recv - previous.bytes_recv) / elapsed
                tx = (current.bytes_sent - previous.bytes_sent) / elapsed
                # Never report a negative rate.
                rate_in = round(max(rx, 0) / 2**20, 3)
                rate_out = round(max(tx, 0) / 2**20, 3)
            readings[f"net_{tag}_in"] = rate_in
            readings[f"net_{tag}_out"] = rate_out
            self._prev[iface] = current

        self._prev_time = now
        return readings

    def sensor_definitions(self) -> list[dict]:
        """Return a download and an upload sensor definition per interface."""
        if not self._ifaces:
            self._ifaces = self._get_ifaces()

        directions = (
            ("in", "Download", "mdi:download-network"),
            ("out", "Upload", "mdi:upload-network"),
        )
        return [
            {
                "key": f"net_{self._safe_name(iface)}_{suffix}",
                "name": f"{iface} {title}",
                "unit": "MiB/s",
                "device_class": "data_rate",
                "state_class": "measurement",
                "icon": icon,
            }
            for iface in self._ifaces
            for suffix, title, icon in directions
        ]
class SystemCollector:
    """Reports system uptime and the set of logged-in users."""

    def collect(self) -> dict[str, Any]:
        """Return uptime in whole seconds and a comma-separated user list.

        User names are de-duplicated and sorted; ``"none"`` is reported when
        nobody is logged in.
        """
        seconds_up = int(time.time() - psutil.boot_time())
        sessions = psutil.users() or []
        names = sorted({session.name for session in sessions})
        if names:
            who = ", ".join(names)
        else:
            who = "none"
        return {"uptime": seconds_up, "logged_in_users": who}

    @staticmethod
    def sensor_definitions(hostname: str) -> list[dict]:
        """Return the discovery metadata for the system sensors (*hostname* unused)."""
        uptime = {
            "key": "uptime",
            "name": "Uptime",
            "unit": "s",
            "device_class": "duration",
            "state_class": "measurement",
            "icon": None,
        }
        # Free-text sensor: no unit, device class or state class.
        users = {
            "key": "logged_in_users",
            "name": "Logged In Users",
            "unit": None,
            "device_class": None,
            "state_class": None,
            "icon": "mdi:account-multiple",
        }
        return [uptime, users]
class IdleTimeCollector:
"""Session idle time via xprintidle. Requires X11 and xprintidle in PATH."""
_available: bool | None = None # cached after first probe
def _check_available(self) -> bool:
if self._available is None:
try:
subprocess.run(
["xprintidle"],
capture_output=True,
timeout=3,
)
self._available = True
except FileNotFoundError:
self._available = False
log.warning(
"xprintidle not found; idle_time sensor disabled. "
"Install it (e.g. 'pacman -S xprintidle' or via AUR)."
)
except Exception as exc:
self._available = False
log.warning("xprintidle probe failed: %s", exc)
return self._available
def collect(self) -> dict[str, Any]:
if not os.environ.get("DISPLAY"):
log.debug("DISPLAY not set; skipping idle time collection.")
return {"session_idle": None}
if not self._check_available():
return {"session_idle": None}
try:
result = subprocess.run(
["xprintidle"],
capture_output=True,
text=True,
timeout=3,
)
if result.returncode == 0:
ms = int(result.stdout.strip())
return {"session_idle": round(ms / 1000, 1)}
except Exception as exc:
log.debug("xprintidle runtime error: %s", exc)
return {"session_idle": None}
@staticmethod
def sensor_definitions(hostname: str) -> list[dict]:
return [
{
"key": "session_idle",
"name": "Session Idle Time",
"unit": "s",
"device_class": "duration",
"state_class": "measurement",
"icon": "mdi:timer-outline",
}
]
# ---------------------------------------------------------------------------
# Discovery publisher
# ---------------------------------------------------------------------------
class DiscoveryPublisher:
    """Publishes one retained Home Assistant discovery config per sensor.

    Args:
        client: MQTT client used for publishing.
        node_id: Identifier used in unique_ids and in the discovery topic.
        device_name: Display name of the device grouping all sensors.
        state_topic: Topic carrying the JSON state payload.
        availability_topic: Topic carrying the availability payload.
        discovery_prefix: Root of the discovery topic tree.
    """

    # (sensor-definition field, discovery payload field); copied only when truthy.
    _OPTIONAL_FIELDS = (
        ("unit", "unit_of_measurement"),
        ("device_class", "device_class"),
        ("state_class", "state_class"),
        ("icon", "icon"),
    )

    def __init__(
        self,
        client: mqtt.Client,
        node_id: str,
        device_name: str,
        state_topic: str,
        availability_topic: str,
        discovery_prefix: str = "homeassistant",
    ):
        self._client = client
        self._node_id = node_id
        self._device_name = device_name
        self._state_topic = state_topic
        self._availability_topic = availability_topic
        self._discovery_prefix = discovery_prefix

    def _device_block(self) -> dict:
        """Return the ``device`` mapping shared by every sensor config."""
        return dict(
            identifiers=[f"{self._node_id}_monitor"],
            name=self._device_name,
            manufacturer="monitor.py",
            model="Linux System Monitor",
            sw_version=SCRIPT_VERSION,
        )

    def publish_all(self, sensor_defs: list[dict]) -> None:
        """Publish a discovery config for every entry in *sensor_defs*."""
        for definition in sensor_defs:
            self._publish_one(definition)
        log.info("Published %d discovery configs.", len(sensor_defs))

    def _publish_one(self, defn: dict) -> None:
        """Build and publish the retained discovery config for one sensor."""
        key = defn["key"]
        unique_id = f"{self._node_id}_{key}"

        payload: dict[str, Any] = {
            "name": defn["name"],
            "unique_id": unique_id,
            "state_topic": self._state_topic,
            # Renders as "{{ value_json.<key> }}": picks this sensor's field
            # out of the shared JSON state payload.
            "value_template": "{{ value_json." + key + " }}",
            "availability_topic": self._availability_topic,
            "device": self._device_block(),
        }
        for source, target in self._OPTIONAL_FIELDS:
            value = defn.get(source)
            if value:
                payload[target] = value

        # The unique_id doubles as the object id in the discovery topic.
        topic = "/".join(
            (self._discovery_prefix, "sensor", self._node_id, unique_id, "config")
        )
        self._client.publish(topic, json.dumps(payload), qos=1, retain=True)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def collect_all(collectors: list) -> dict[str, Any]:
    """Merge the readings of every collector into one dict.

    A collector that raises is logged and skipped, so one failing source
    does not prevent the others from being reported. Later collectors
    overwrite earlier ones on key clashes.
    """
    merged: dict[str, Any] = {}
    for collector in collectors:
        try:
            reading = collector.collect()
            merged.update(reading)
        except Exception as exc:
            log.warning("Collector %s failed: %s", type(collector).__name__, exc)
    return merged
def all_sensor_defs(
    collectors: list,
    hostname: str,
) -> list[dict]:
    """Gather the sensor definitions of every collector into one flat list.

    Collectors expose ``sensor_definitions`` either as a static method taking
    the hostname or as an instance method taking no argument. The signature
    is inspected to pick the right call, so a ``TypeError`` raised *inside* a
    ``sensor_definitions`` implementation propagates instead of being
    mistaken for an arity mismatch and triggering a second call.

    Args:
        collectors: Collector instances; those without a
            ``sensor_definitions`` attribute are skipped.
        hostname: Passed to implementations that accept an argument.

    Returns:
        The concatenated definitions, in collector order.
    """
    import inspect

    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.VAR_POSITIONAL,
    )

    defs: list[dict] = []
    for col in collectors:
        sd = getattr(col, "sensor_definitions", None)
        if sd is None:
            continue
        try:
            params = inspect.signature(sd).parameters.values()
        except (TypeError, ValueError):
            # Signature not introspectable: fall back to trying both call forms.
            try:
                result = sd(hostname)
            except TypeError:
                result = sd()
        else:
            # For bound methods the signature already excludes ``self``.
            takes_hostname = any(p.kind in positional_kinds for p in params)
            result = sd(hostname) if takes_hostname else sd()
        defs.extend(result)
    return defs
# ---------------------------------------------------------------------------
# MQTT setup
# ---------------------------------------------------------------------------
def make_topics(prefix: str, hostname: str) -> tuple[str, str]:
    """Build the topic pair for a device: ``(state_topic, availability_topic)``.

    Both topics live under ``<prefix>/<hostname>/``.
    """
    state_topic = "/".join((prefix, hostname, "state"))
    availability_topic = "/".join((prefix, hostname, "availability"))
    return state_topic, availability_topic
def build_client(cfg: dict, hostname: str) -> mqtt.Client:
    """Create a configured, not-yet-connected MQTT client.

    The client gets its id, optional credentials, and a retained "offline"
    last-will on the availability topic.

    Args:
        cfg: Full configuration; the ``mqtt`` section is read.
        hostname: Used for the default client id and for the topic names.

    Returns:
        The configured client; callbacks and connection are left to the caller.
    """
    mqtt_cfg = cfg["mqtt"]
    client_id = mqtt_cfg["client_id"] or f"pc_monitor_{hostname}"

    # paho-mqtt 2.x takes a callback API version as the first constructor
    # argument; 1.x has no such enum. The callbacks in this file use the v1
    # signatures, so request VERSION1 whenever the enum exists.
    api_versions = getattr(mqtt, "CallbackAPIVersion", None)
    if api_versions is None:
        client = mqtt.Client(client_id=client_id)
    else:
        client = mqtt.Client(api_versions.VERSION1, client_id=client_id)

    if mqtt_cfg["username"]:
        # An empty password is sent as "no password".
        client.username_pw_set(mqtt_cfg["username"], mqtt_cfg["password"] or None)

    _, availability_topic = make_topics(mqtt_cfg["topic_prefix"], hostname)
    client.will_set(availability_topic, payload="offline", qos=1, retain=True)
    return client
# ---------------------------------------------------------------------------
# Main application
# ---------------------------------------------------------------------------
class MonitorApp:
    """Ties collectors, the MQTT client and discovery together and runs the loop.

    Args:
        cfg: Configuration dict containing ``mqtt``, ``monitor`` and
            ``sensors`` sections.
    """

    def __init__(self, cfg: dict):
        self._cfg = cfg
        self._hostname = socket.gethostname()
        self._device_name = cfg["monitor"].get("device_name") or self._hostname
        raw_node_id = cfg["monitor"].get("node_id", "").strip()
        self._node_id = self._make_node_id(raw_node_id or self._hostname)
        # Never poll more often than every 5 seconds.
        self._interval = max(5, int(cfg["monitor"].get("interval", 30)))
        prefix = cfg["mqtt"]["topic_prefix"]
        self._state_topic, self._avail_topic = make_topics(prefix, self._node_id)
        self._collectors = self._build_collectors()
        self._sensor_defs = all_sensor_defs(self._collectors, self._hostname)
        self._client = build_client(cfg, self._node_id)
        self._client.on_connect = self._on_connect
        self._client.on_disconnect = self._on_disconnect
        self._client.on_message = self._on_message
        self._discovery = DiscoveryPublisher(
            self._client,
            self._node_id,
            self._device_name,
            self._state_topic,
            self._avail_topic,
        )
        self._running = True
        # Set by the first stop() so that later calls are no-ops.
        self._stopped = False
        # Warm up cpu_percent (first call always returns 0.0)
        psutil.cpu_percent(interval=None)

    @staticmethod
    def _make_node_id(name: str) -> str:
        """Sanitize a string for use in MQTT topics and HA unique_ids.

        Lower-cases *name*, replaces every character other than alphanumerics,
        ``-`` and ``_`` with ``-``, collapses runs of hyphens and strips
        leading/trailing separators. Falls back to ``"monitor"`` when nothing
        is left.
        """
        safe = "".join(c if c.isalnum() or c in "-_" else "-" for c in name.lower())
        while "--" in safe:
            safe = safe.replace("--", "-")
        return safe.strip("-_") or "monitor"

    @staticmethod
    def _section_settings(sensors: dict, name: str) -> tuple[Any, dict]:
        """Return ``(enabled, options)`` for a sensor entry that may be a dict or a scalar.

        A missing entry is enabled with no options; a mapping is enabled per
        its ``enabled`` key (default True); any other value is used as the
        on/off flag itself.
        """
        if name not in sensors:
            return True, {}
        value = sensors[name]
        if isinstance(value, dict):
            return value.get("enabled", True), value
        return value, {}

    def _build_collectors(self) -> list:
        """Instantiate a collector for every sensor group enabled in the config."""
        s = self._cfg["sensors"]
        collectors = []
        if s.get("cpu", True):
            collectors.append(CpuCollector())
        if s.get("gpu", True):
            collectors.append(GpuCollector())
        if s.get("ram", True):
            collectors.append(RamCollector())
        disk_enabled, disk_cfg = self._section_settings(s, "disk")
        if disk_enabled:
            collectors.append(DiskCollector(disk_cfg))
        net_enabled, net_cfg = self._section_settings(s, "network")
        if net_enabled:
            collectors.append(NetworkCollector(net_cfg))
        if s.get("system", True):
            collectors.append(SystemCollector())
        if s.get("idle_time", True):
            collectors.append(IdleTimeCollector())
        return collectors

    # ------------------------------------------------------------------
    # MQTT callbacks
    # ------------------------------------------------------------------
    def _on_connect(self, client, userdata, flags, rc):
        """On a successful connect: announce availability, discovery and state."""
        if rc != 0:
            log.error("MQTT connection failed with code %d", rc)
            return
        log.info("Connected to MQTT broker.")
        client.publish(self._avail_topic, "online", qos=1, retain=True)
        self._discovery.publish_all(self._sensor_defs)
        self._publish_state()
        # Subscribe to HA birth messages so we can resend discovery after HA restart
        client.subscribe("homeassistant/status", qos=1)

    def _on_disconnect(self, client, userdata, rc):
        """Log disconnects that were not requested (non-zero *rc*)."""
        if rc != 0:
            log.warning("Unexpected MQTT disconnect (rc=%d). Will auto-reconnect.", rc)

    def _on_message(self, client, userdata, msg):
        """Resend discovery and state when Home Assistant announces it is online."""
        try:
            payload = msg.payload.decode()
        except Exception:
            return  # undecodable payload: nothing to act on
        if msg.topic == "homeassistant/status" and payload == "online":
            log.info("Home Assistant came online; resending discovery configs.")
            self._discovery.publish_all(self._sensor_defs)
            self._publish_state()

    # ------------------------------------------------------------------
    # State publishing
    # ------------------------------------------------------------------
    def _publish_state(self):
        """Collect from every collector and publish one JSON state message."""
        data = collect_all(self._collectors)
        self._client.publish(
            self._state_topic,
            json.dumps(data),
            qos=1,
            retain=False,
        )
        log.debug("State published: %d keys", len(data))

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def start(self):
        """Connect to the broker and start the client's background network loop."""
        mqtt_cfg = self._cfg["mqtt"]
        log.info(
            "Connecting to %s:%d as device '%s' (interval=%ds)...",
            mqtt_cfg["host"],
            mqtt_cfg["port"],
            self._device_name,
            self._interval,
        )
        self._client.connect(
            mqtt_cfg["host"],
            port=mqtt_cfg["port"],
            keepalive=60,
        )
        self._client.loop_start()

    def stop(self):
        """Publish "offline" and shut the client down. Safe to call repeatedly.

        Shutdown can be requested twice for a single exit: once explicitly
        (for example from a signal handler) and once more from ``run()``'s
        ``finally`` clause. Only the first call does any work.
        """
        if self._stopped:
            return
        self._stopped = True
        log.info("Shutting down — publishing offline status.")
        self._running = False
        try:
            self._client.publish(self._avail_topic, "offline", qos=1, retain=True)
            time.sleep(0.5)  # give broker time to process
        finally:
            self._client.loop_stop()
            self._client.disconnect()

    def run(self):
        """Connect, then publish state every interval until stopped."""
        self.start()
        try:
            while self._running:
                loop_start = time.monotonic()
                self._publish_state()
                # Subtract collection time so cycles stay on the configured period.
                elapsed = time.monotonic() - loop_start
                sleep_time = max(0.0, self._interval - elapsed)
                time.sleep(sleep_time)
        finally:
            self.stop()
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main():
    """Load the config, install signal handlers and run the monitor until stopped.

    The first command-line argument, when given, overrides the default config
    path. Exits with status 1 if the config file does not exist.
    """
    cli_args = sys.argv[1:]
    config_path = Path(cli_args[0]) if cli_args else CONFIG_PATH
    if not config_path.exists():
        log.error("Config file not found: %s", config_path)
        sys.exit(1)

    app = MonitorApp(load_config(config_path))

    def _shutdown(signum, frame):
        """Stop the app and exit cleanly on SIGTERM / SIGINT."""
        log.info("Signal %d received.", signum)
        app.stop()
        sys.exit(0)

    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, _shutdown)

    app.run()


if __name__ == "__main__":
    main()