Add files

This commit is contained in:
2026-05-04 02:29:40 -04:00
parent 4e03ee75ab
commit 7be6e78586
5 changed files with 1299 additions and 0 deletions
+29
View File
@@ -0,0 +1,29 @@
# Basic .gitattributes for a python repo.
# Source files
# ============
*.pxd text diff=python
*.py text diff=python
*.py3 text diff=python
*.pyw text diff=python
*.pyx text diff=python
*.pyz text diff=python
*.pyi text diff=python
# Binary files
# ============
*.db binary
*.p binary
*.pkl binary
*.pickle binary
*.pyc binary export-ignore
*.pyo binary export-ignore
*.pyd binary
# Jupyter notebook
*.ipynb text eol=lf
# Note: .db, .p, and .pkl files are associated
# with the python modules ``pickle``, ``dbm.*``,
# ``shelve``, ``marshal``, ``anydbm``, & ``bsddb``
# (among others).
+224
View File
@@ -0,0 +1,224 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[codz]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py.cover
*.lcov
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
# Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
# poetry.lock
# poetry.toml
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python.
# https://pdm-project.org/en/latest/usage/project/#working-with-version-control
# pdm.lock
# pdm.toml
.pdm-python
.pdm-build/
# pixi
# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control.
# pixi.lock
# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one
# in the .venv directory. It is recommended not to include this directory in version control.
.pixi/*
!.pixi/config.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule*
celerybeat.pid
# Redis
*.rdb
*.aof
*.pid
# RabbitMQ
mnesia/
rabbitmq/
rabbitmq-data/
# ActiveMQ
activemq-data/
# SageMath parsed files
*.sage.py
# Environments
.env
.envrc
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
# .idea/
# Abstra
# Abstra is an AI-powered process automation framework.
# Ignore directories containing user credentials, local state, and settings.
# Learn more at https://abstra.io/docs
.abstra/
# Visual Studio Code
# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
# and can be added to the global gitignore or merged into this file. However, if you prefer,
# you could uncomment the following to ignore the entire vscode folder
# .vscode/
# Temporary file for partial code execution
tempCodeRunnerFile.py
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
# Marimo
marimo/_static/
marimo/_lsp/
__marimo__/
# Streamlit
.streamlit/secrets.toml
# Ignore monitor.service and config.yaml since they contain local configuration that shouldn't be shared.
monitor.service
config.yaml
+25
View File
@@ -0,0 +1,25 @@
mqtt:
host: "192.168.1.251" # MQTT broker address
port: 1883
username: "mosquitto" # leave empty if broker has no auth
password: "stratton94"
topic_prefix: "test" # base topic: {topic_prefix}/{hostname}/...
client_id: "" # leave empty to auto-generate from hostname
monitor:
interval: 15 # seconds between stat publishes
device_name: "" # display name in HA; defaults to hostname
sensors:
cpu: true
gpu: true # NVIDIA via nvidia-smi; skipped gracefully if unavailable
ram: true
disk:
enabled: true
partitions: ["/","/boot","/mnt/c","/mnt/e","/mnt/s","/mnt/v"] # empty = auto-detect all physical, non-loop mounts
io_stats: true # enable read/write speed sensors
network:
enabled: true
interfaces: ["enp6s0","wlan0"] # empty = all non-loopback interfaces
system: true # uptime, logged-in users
idle_time: true # session idle via xprintidle (requires X11 + xprintidle in PATH)
+997
View File
@@ -0,0 +1,997 @@
#!/usr/bin/env python3
"""
monitor.py Linux system stats MQTT with Home Assistant auto-discovery.
Mirrors the functionality of IOTLink / HASS.Agent on Windows.
Dependencies: paho-mqtt, psutil, pyyaml
Optional: nvidia-smi in PATH for GPU stats
"""
import json
import logging
import os
import signal
import socket
import subprocess
import sys
import time
from pathlib import Path
from typing import Any
import psutil
import paho.mqtt.client as mqtt
import yaml
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
CONFIG_PATH = Path(__file__).parent / "config.yaml"
SCRIPT_VERSION = "1.0.0"
def load_config(path: Path = CONFIG_PATH) -> dict:
with open(path) as fh:
raw = yaml.safe_load(fh)
# Normalise / apply defaults so the rest of the script can access keys safely
cfg = raw or {}
cfg.setdefault("mqtt", {})
cfg.setdefault("monitor", {})
cfg.setdefault("sensors", {})
m = cfg["mqtt"]
m.setdefault("host", "localhost")
m.setdefault("port", 1883)
m.setdefault("username", "")
m.setdefault("password", "")
m.setdefault("topic_prefix", "pc_monitor")
m.setdefault("client_id", "")
mn = cfg["monitor"]
mn.setdefault("interval", 30)
mn.setdefault("device_name", "")
s = cfg["sensors"]
s.setdefault("cpu", True)
s.setdefault("gpu", True)
s.setdefault("ram", True)
s.setdefault("disk", {})
if not isinstance(s["disk"], dict):
s["disk"] = {"enabled": bool(s["disk"])}
s["disk"].setdefault("enabled", True)
s["disk"].setdefault("partitions", [])
s["disk"].setdefault("io_stats", True)
s.setdefault("network", {})
if not isinstance(s["network"], dict):
s["network"] = {"enabled": bool(s["network"])}
s["network"].setdefault("enabled", True)
s["network"].setdefault("interfaces", [])
s.setdefault("system", True)
s.setdefault("idle_time", True)
return cfg
# ---------------------------------------------------------------------------
# Collectors
# ---------------------------------------------------------------------------
class CpuCollector:
"""CPU usage, frequency, temperature, load averages."""
# Ordered list of sensor keys to try for CPU package / aggregate temp
_TEMP_KEYS = ("coretemp", "k10temp", "zenpower", "acpitz", "cpu_thermal")
def collect(self) -> dict[str, Any]:
data: dict[str, Any] = {}
# Usage & frequency
data["cpu_usage"] = round(psutil.cpu_percent(interval=None), 1)
freq = psutil.cpu_freq()
data["cpu_freq"] = round(freq.current, 0) if freq else None
# Load averages
load = psutil.getloadavg()
data["load_1m"] = round(load[0], 2)
data["load_5m"] = round(load[1], 2)
data["load_15m"] = round(load[2], 2)
# Temperature
data["cpu_temp"] = self._get_temp()
return data
def _get_temp(self) -> float | None:
try:
temps = psutil.sensors_temperatures()
except AttributeError:
return None # platform doesn't support it
if not temps:
return None
for key in self._TEMP_KEYS:
if key in temps:
entries = temps[key]
if not entries:
continue
# Prefer "Package id 0" or "Tctl" (AMD); fall back to first entry
for entry in entries:
lbl = (entry.label or "").lower()
if "package" in lbl or "tctl" in lbl or "tdie" in lbl:
return round(entry.current, 1)
return round(entries[0].current, 1)
# Last resort: any sensor with "cpu" in the key name
for key, entries in temps.items():
if "cpu" in key.lower() and entries:
return round(entries[0].current, 1)
return None
@staticmethod
def sensor_definitions(hostname: str) -> list[dict]:
prefix = hostname
return [
{
"key": "cpu_usage",
"name": "CPU Usage",
"unit": "%",
"device_class": None,
"state_class": "measurement",
"icon": "mdi:cpu-64-bit",
},
{
"key": "cpu_freq",
"name": "CPU Frequency",
"unit": "MHz",
"device_class": None,
"state_class": "measurement",
"icon": "mdi:gauge",
},
{
"key": "cpu_temp",
"name": "CPU Temperature",
"unit": "°C",
"device_class": "temperature",
"state_class": "measurement",
"icon": None,
},
{
"key": "load_1m",
"name": "Load Average 1m",
"unit": None,
"device_class": None,
"state_class": "measurement",
"icon": "mdi:chart-line",
},
{
"key": "load_5m",
"name": "Load Average 5m",
"unit": None,
"device_class": None,
"state_class": "measurement",
"icon": "mdi:chart-line",
},
{
"key": "load_15m",
"name": "Load Average 15m",
"unit": None,
"device_class": None,
"state_class": "measurement",
"icon": "mdi:chart-line",
},
]
class GpuCollector:
"""NVIDIA GPU stats via nvidia-smi subprocess. Fails gracefully."""
_QUERY = (
"utilization.gpu,temperature.gpu,memory.used,memory.total,power.draw"
)
_available: bool | None = None # cached after first attempt
def _nvidia_available(self) -> bool:
if self._available is None:
try:
subprocess.run(
["nvidia-smi", "-L"],
capture_output=True,
timeout=5,
check=True,
)
self._available = True
except Exception:
self._available = False
log.warning("nvidia-smi not available; GPU monitoring disabled.")
return self._available
def collect(self) -> dict[str, Any]:
empty: dict[str, Any] = {
"gpu_usage": None,
"gpu_temp": None,
"gpu_mem_used": None,
"gpu_mem_total": None,
"gpu_mem_percent": None,
"gpu_power": None,
}
if not self._nvidia_available():
return empty
try:
result = subprocess.run(
[
"nvidia-smi",
f"--query-gpu={self._QUERY}",
"--format=csv,noheader,nounits",
],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode != 0:
return empty
parts = [p.strip() for p in result.stdout.strip().split(",")]
if len(parts) < 5:
return empty
gpu_usage = self._parse_float(parts[0])
gpu_temp = self._parse_float(parts[1])
mem_used = self._parse_float(parts[2])
mem_total = self._parse_float(parts[3])
power = self._parse_float(parts[4])
mem_pct = (
round(mem_used / mem_total * 100, 1)
if mem_used is not None and mem_total and mem_total > 0
else None
)
return {
"gpu_usage": gpu_usage,
"gpu_temp": gpu_temp,
"gpu_mem_used": mem_used,
"gpu_mem_total": mem_total,
"gpu_mem_percent": mem_pct,
"gpu_power": power,
}
except Exception as exc:
log.warning("GPU collection error: %s", exc)
return empty
@staticmethod
def _parse_float(val: str) -> float | None:
try:
return round(float(val), 1)
except (ValueError, TypeError):
return None
@staticmethod
def sensor_definitions(hostname: str) -> list[dict]:
return [
{
"key": "gpu_usage",
"name": "GPU Usage",
"unit": "%",
"device_class": None,
"state_class": "measurement",
"icon": "mdi:expansion-card",
},
{
"key": "gpu_temp",
"name": "GPU Temperature",
"unit": "°C",
"device_class": "temperature",
"state_class": "measurement",
"icon": None,
},
{
"key": "gpu_mem_used",
"name": "GPU Memory Used",
"unit": "MiB",
"device_class": "data_size",
"state_class": "measurement",
"icon": None,
},
{
"key": "gpu_mem_total",
"name": "GPU Memory Total",
"unit": "MiB",
"device_class": "data_size",
"state_class": "measurement",
"icon": None,
},
{
"key": "gpu_mem_percent",
"name": "GPU Memory Usage",
"unit": "%",
"device_class": None,
"state_class": "measurement",
"icon": "mdi:expansion-card",
},
{
"key": "gpu_power",
"name": "GPU Power Draw",
"unit": "W",
"device_class": "power",
"state_class": "measurement",
"icon": None,
},
]
class RamCollector:
"""RAM and swap usage."""
def collect(self) -> dict[str, Any]:
vm = psutil.virtual_memory()
sw = psutil.swap_memory()
return {
"ram_percent": round(vm.percent, 1),
"ram_used": round(vm.used / 2**30, 2),
"ram_available": round(vm.available / 2**30, 2),
"ram_total": round(vm.total / 2**30, 2),
"swap_percent": round(sw.percent, 1),
"swap_used": round(sw.used / 2**30, 2),
}
@staticmethod
def sensor_definitions(hostname: str) -> list[dict]:
return [
{
"key": "ram_percent",
"name": "RAM Usage",
"unit": "%",
"device_class": None,
"state_class": "measurement",
"icon": "mdi:memory",
},
{
"key": "ram_used",
"name": "RAM Used",
"unit": "GiB",
"device_class": "data_size",
"state_class": "measurement",
"icon": None,
},
{
"key": "ram_available",
"name": "RAM Available",
"unit": "GiB",
"device_class": "data_size",
"state_class": "measurement",
"icon": None,
},
{
"key": "ram_total",
"name": "RAM Total",
"unit": "GiB",
"device_class": "data_size",
"state_class": "measurement",
"icon": None,
},
{
"key": "swap_percent",
"name": "Swap Usage",
"unit": "%",
"device_class": None,
"state_class": "measurement",
"icon": "mdi:harddisk",
},
{
"key": "swap_used",
"name": "Swap Used",
"unit": "GiB",
"device_class": "data_size",
"state_class": "measurement",
"icon": None,
},
]
def _mount_to_id(mountpoint: str) -> str:
"""Convert a mountpoint path to a safe sensor key suffix."""
if mountpoint == "/":
return "root"
safe = mountpoint.strip("/").replace("/", "_").replace("-", "_").replace(" ", "_")
return safe or "root"
class DiskCollector:
"""Disk usage per partition + aggregate I/O speed."""
def __init__(self, cfg: dict):
self._cfg = cfg
self._prev_io: psutil._common.sdiskio | None = None
self._prev_time: float = time.monotonic()
self._mounts: list[str] = [] # populated on first collect
def _get_mounts(self) -> list[str]:
whitelist = [m.strip() for m in (self._cfg.get("partitions") or []) if m.strip()]
partitions = []
for part in psutil.disk_partitions(all=False):
# Skip loop devices, tmpfs, and other virtual filesystems
if part.fstype in ("squashfs", "tmpfs", "devtmpfs", "overlay", ""):
continue
if "loop" in part.device:
continue
if whitelist and part.mountpoint not in whitelist:
continue
partitions.append(part.mountpoint)
return partitions
def collect(self) -> dict[str, Any]:
data: dict[str, Any] = {}
if not self._mounts:
self._mounts = self._get_mounts()
for mount in self._mounts:
mid = _mount_to_id(mount)
try:
usage = psutil.disk_usage(mount)
data[f"disk_{mid}_percent"] = round(usage.percent, 1)
data[f"disk_{mid}_used"] = round(usage.used / 2**30, 2)
data[f"disk_{mid}_free"] = round(usage.free / 2**30, 2)
data[f"disk_{mid}_total"] = round(usage.total / 2**30, 2)
except PermissionError:
pass
if self._cfg.get("io_stats", True):
now = time.monotonic()
elapsed = now - self._prev_time
try:
cur_io = psutil.disk_io_counters()
if self._prev_io is not None and elapsed > 0:
rb = (cur_io.read_bytes - self._prev_io.read_bytes) / elapsed
wb = (cur_io.write_bytes - self._prev_io.write_bytes) / elapsed
data["disk_read_speed"] = round(rb / 2**20, 2)
data["disk_write_speed"] = round(wb / 2**20, 2)
else:
data["disk_read_speed"] = 0.0
data["disk_write_speed"] = 0.0
self._prev_io = cur_io
except Exception:
data["disk_read_speed"] = None
data["disk_write_speed"] = None
self._prev_time = now
return data
def sensor_definitions(self) -> list[dict]:
"""Build sensor defs based on discovered mounts."""
if not self._mounts:
self._mounts = self._get_mounts()
defs = []
for mount in self._mounts:
mid = _mount_to_id(mount)
label = mount # human-readable label
defs += [
{
"key": f"disk_{mid}_percent",
"name": f"Disk {label} Usage",
"unit": "%",
"device_class": None,
"state_class": "measurement",
"icon": "mdi:harddisk",
},
{
"key": f"disk_{mid}_used",
"name": f"Disk {label} Used",
"unit": "GiB",
"device_class": "data_size",
"state_class": "measurement",
"icon": None,
},
{
"key": f"disk_{mid}_free",
"name": f"Disk {label} Free",
"unit": "GiB",
"device_class": "data_size",
"state_class": "measurement",
"icon": None,
},
{
"key": f"disk_{mid}_total",
"name": f"Disk {label} Total",
"unit": "GiB",
"device_class": "data_size",
"state_class": "measurement",
"icon": None,
},
]
if self._cfg.get("io_stats", True):
defs += [
{
"key": "disk_read_speed",
"name": "Disk Read Speed",
"unit": "MiB/s",
"device_class": "data_rate",
"state_class": "measurement",
"icon": None,
},
{
"key": "disk_write_speed",
"name": "Disk Write Speed",
"unit": "MiB/s",
"device_class": "data_rate",
"state_class": "measurement",
"icon": None,
},
]
return defs
class NetworkCollector:
"""Per-interface download / upload speed."""
def __init__(self, cfg: dict):
self._cfg = cfg
self._prev: dict[str, Any] = {}
self._prev_time: float = time.monotonic()
self._ifaces: list[str] = []
def _get_ifaces(self) -> list[str]:
whitelist = [i.strip() for i in (self._cfg.get("interfaces") or []) if i.strip()]
counters = psutil.net_io_counters(pernic=True)
ifaces = []
for iface in counters:
if iface == "lo":
continue
if whitelist and iface not in whitelist:
continue
ifaces.append(iface)
return ifaces
def collect(self) -> dict[str, Any]:
if not self._ifaces:
self._ifaces = self._get_ifaces()
data: dict[str, Any] = {}
now = time.monotonic()
elapsed = now - self._prev_time
counters = psutil.net_io_counters(pernic=True)
for iface in self._ifaces:
if iface not in counters:
continue
cur = counters[iface]
safe = iface.replace("-", "_").replace(".", "_")
if iface in self._prev and elapsed > 0:
rx = (cur.bytes_recv - self._prev[iface].bytes_recv) / elapsed
tx = (cur.bytes_sent - self._prev[iface].bytes_sent) / elapsed
data[f"net_{safe}_in"] = round(max(rx, 0) / 2**20, 3)
data[f"net_{safe}_out"] = round(max(tx, 0) / 2**20, 3)
else:
data[f"net_{safe}_in"] = 0.0
data[f"net_{safe}_out"] = 0.0
self._prev[iface] = cur
self._prev_time = now
return data
def sensor_definitions(self) -> list[dict]:
if not self._ifaces:
self._ifaces = self._get_ifaces()
defs = []
for iface in self._ifaces:
safe = iface.replace("-", "_").replace(".", "_")
defs += [
{
"key": f"net_{safe}_in",
"name": f"{iface} Download",
"unit": "MiB/s",
"device_class": "data_rate",
"state_class": "measurement",
"icon": "mdi:download-network",
},
{
"key": f"net_{safe}_out",
"name": f"{iface} Upload",
"unit": "MiB/s",
"device_class": "data_rate",
"state_class": "measurement",
"icon": "mdi:upload-network",
},
]
return defs
class SystemCollector:
"""Uptime, logged-in users."""
def collect(self) -> dict[str, Any]:
uptime_s = int(time.time() - psutil.boot_time())
users = psutil.users()
usernames = sorted({u.name for u in users}) if users else []
return {
"uptime": uptime_s,
"logged_in_users": ", ".join(usernames) if usernames else "none",
}
@staticmethod
def sensor_definitions(hostname: str) -> list[dict]:
return [
{
"key": "uptime",
"name": "Uptime",
"unit": "s",
"device_class": "duration",
"state_class": "measurement",
"icon": None,
},
{
"key": "logged_in_users",
"name": "Logged In Users",
"unit": None,
"device_class": None,
"state_class": None,
"icon": "mdi:account-multiple",
},
]
class IdleTimeCollector:
"""Session idle time via xprintidle. Requires X11 and xprintidle in PATH."""
_available: bool | None = None # cached after first probe
def _check_available(self) -> bool:
if self._available is None:
try:
subprocess.run(
["xprintidle"],
capture_output=True,
timeout=3,
)
self._available = True
except FileNotFoundError:
self._available = False
log.warning(
"xprintidle not found; idle_time sensor disabled. "
"Install it (e.g. 'pacman -S xprintidle' or via AUR)."
)
except Exception as exc:
self._available = False
log.warning("xprintidle probe failed: %s", exc)
return self._available
def collect(self) -> dict[str, Any]:
if not os.environ.get("DISPLAY"):
log.debug("DISPLAY not set; skipping idle time collection.")
return {"session_idle": None}
if not self._check_available():
return {"session_idle": None}
try:
result = subprocess.run(
["xprintidle"],
capture_output=True,
text=True,
timeout=3,
)
if result.returncode == 0:
ms = int(result.stdout.strip())
return {"session_idle": round(ms / 1000, 1)}
except Exception as exc:
log.debug("xprintidle runtime error: %s", exc)
return {"session_idle": None}
@staticmethod
def sensor_definitions(hostname: str) -> list[dict]:
return [
{
"key": "session_idle",
"name": "Session Idle Time",
"unit": "s",
"device_class": "duration",
"state_class": "measurement",
"icon": "mdi:timer-outline",
}
]
# ---------------------------------------------------------------------------
# Discovery publisher
# ---------------------------------------------------------------------------
class DiscoveryPublisher:
"""Builds and publishes HA MQTT auto-discovery config payloads."""
def __init__(
self,
client: mqtt.Client,
hostname: str,
device_name: str,
state_topic: str,
availability_topic: str,
discovery_prefix: str = "homeassistant",
):
self._client = client
self._hostname = hostname
self._device_name = device_name
self._state_topic = state_topic
self._availability_topic = availability_topic
self._discovery_prefix = discovery_prefix
def _device_block(self) -> dict:
return {
"identifiers": [f"{self._hostname}_monitor"],
"name": self._device_name,
"manufacturer": "monitor.py",
"model": "Linux System Monitor",
"sw_version": SCRIPT_VERSION,
}
def publish_all(self, sensor_defs: list[dict]) -> None:
for defn in sensor_defs:
self._publish_one(defn)
log.info("Published %d discovery configs.", len(sensor_defs))
def _publish_one(self, defn: dict) -> None:
key = defn["key"]
unique_id = f"{self._hostname}_{key}"
object_id = unique_id
payload: dict[str, Any] = {
"name": defn["name"],
"unique_id": unique_id,
"state_topic": self._state_topic,
"value_template": f"{{{{ value_json.{key} }}}}",
"availability_topic": self._availability_topic,
"device": self._device_block(),
}
if defn.get("unit"):
payload["unit_of_measurement"] = defn["unit"]
if defn.get("device_class"):
payload["device_class"] = defn["device_class"]
if defn.get("state_class"):
payload["state_class"] = defn["state_class"]
if defn.get("icon"):
payload["icon"] = defn["icon"]
discovery_topic = (
f"{self._discovery_prefix}/sensor/{object_id}/config"
)
self._client.publish(
discovery_topic,
json.dumps(payload),
qos=1,
retain=True,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def collect_all(collectors: list) -> dict[str, Any]:
data: dict[str, Any] = {}
for col in collectors:
try:
data.update(col.collect())
except Exception as exc:
log.warning("Collector %s failed: %s", type(col).__name__, exc)
return data
def all_sensor_defs(
collectors: list,
hostname: str,
) -> list[dict]:
defs = []
for col in collectors:
if hasattr(col, "sensor_definitions"):
sd = col.sensor_definitions
# Static methods accept hostname; instance methods don't need it
try:
result = sd(hostname)
except TypeError:
result = sd()
defs.extend(result)
return defs
# ---------------------------------------------------------------------------
# MQTT setup
# ---------------------------------------------------------------------------
def make_topics(prefix: str, hostname: str) -> tuple[str, str]:
"""Return (state_topic, availability_topic)."""
base = f"{prefix}/{hostname}"
return f"{base}/state", f"{base}/availability"
def build_client(cfg: dict, hostname: str) -> mqtt.Client:
mqtt_cfg = cfg["mqtt"]
client_id = mqtt_cfg["client_id"] or f"pc_monitor_{hostname}"
client = mqtt.Client(client_id=client_id)
if mqtt_cfg["username"]:
client.username_pw_set(mqtt_cfg["username"], mqtt_cfg["password"] or None)
_, availability_topic = make_topics(mqtt_cfg["topic_prefix"], hostname)
client.will_set(availability_topic, payload="offline", qos=1, retain=True)
return client
# ---------------------------------------------------------------------------
# Main application
# ---------------------------------------------------------------------------
class MonitorApp:
def __init__(self, cfg: dict):
self._cfg = cfg
self._hostname = socket.gethostname()
self._device_name = cfg["monitor"].get("device_name") or self._hostname
self._interval = max(5, int(cfg["monitor"].get("interval", 30)))
prefix = cfg["mqtt"]["topic_prefix"]
self._state_topic, self._avail_topic = make_topics(prefix, self._hostname)
self._collectors = self._build_collectors()
self._sensor_defs = all_sensor_defs(self._collectors, self._hostname)
self._client = build_client(cfg, self._hostname)
self._client.on_connect = self._on_connect
self._client.on_disconnect = self._on_disconnect
self._client.on_message = self._on_message
self._discovery = DiscoveryPublisher(
self._client,
self._hostname,
self._device_name,
self._state_topic,
self._avail_topic,
)
self._running = True
# Warm up cpu_percent (first call always returns 0.0)
psutil.cpu_percent(interval=None)
def _build_collectors(self) -> list:
s = self._cfg["sensors"]
collectors = []
if s.get("cpu", True):
collectors.append(CpuCollector())
if s.get("gpu", True):
collectors.append(GpuCollector())
if s.get("ram", True):
collectors.append(RamCollector())
if s.get("disk", {}).get("enabled", True) if isinstance(s.get("disk"), dict) else s.get("disk", True):
collectors.append(DiskCollector(s["disk"] if isinstance(s.get("disk"), dict) else {}))
if s.get("network", {}).get("enabled", True) if isinstance(s.get("network"), dict) else s.get("network", True):
collectors.append(NetworkCollector(s["network"] if isinstance(s.get("network"), dict) else {}))
if s.get("system", True):
collectors.append(SystemCollector())
if s.get("idle_time", True):
collectors.append(IdleTimeCollector())
return collectors
# ------------------------------------------------------------------
# MQTT callbacks
# ------------------------------------------------------------------
def _on_connect(self, client, userdata, flags, rc):
if rc != 0:
log.error("MQTT connection failed with code %d", rc)
return
log.info("Connected to MQTT broker.")
client.publish(self._avail_topic, "online", qos=1, retain=True)
self._discovery.publish_all(self._sensor_defs)
self._publish_state()
# Subscribe to HA birth messages so we can resend discovery after HA restart
client.subscribe("homeassistant/status", qos=1)
def _on_disconnect(self, client, userdata, rc):
if rc != 0:
log.warning("Unexpected MQTT disconnect (rc=%d). Will auto-reconnect.", rc)
def _on_message(self, client, userdata, msg):
try:
payload = msg.payload.decode()
except Exception:
return
if msg.topic == "homeassistant/status" and payload == "online":
log.info("Home Assistant came online; resending discovery configs.")
self._discovery.publish_all(self._sensor_defs)
self._publish_state()
# ------------------------------------------------------------------
# State publishing
# ------------------------------------------------------------------
def _publish_state(self):
data = collect_all(self._collectors)
self._client.publish(
self._state_topic,
json.dumps(data),
qos=1,
retain=False,
)
log.debug("State published: %d keys", len(data))
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def start(self):
mqtt_cfg = self._cfg["mqtt"]
log.info(
"Connecting to %s:%d as device '%s' (interval=%ds)...",
mqtt_cfg["host"],
mqtt_cfg["port"],
self._device_name,
self._interval,
)
self._client.connect(
mqtt_cfg["host"],
port=mqtt_cfg["port"],
keepalive=60,
)
self._client.loop_start()
def stop(self):
log.info("Shutting down — publishing offline status.")
self._running = False
try:
self._client.publish(self._avail_topic, "offline", qos=1, retain=True)
time.sleep(0.5) # give broker time to process
finally:
self._client.loop_stop()
self._client.disconnect()
def run(self):
self.start()
try:
while self._running:
loop_start = time.monotonic()
self._publish_state()
elapsed = time.monotonic() - loop_start
sleep_time = max(0.0, self._interval - elapsed)
time.sleep(sleep_time)
finally:
self.stop()
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main():
# Allow an optional path argument for the config file
config_path = Path(sys.argv[1]) if len(sys.argv) > 1 else CONFIG_PATH
if not config_path.exists():
log.error("Config file not found: %s", config_path)
sys.exit(1)
cfg = load_config(config_path)
app = MonitorApp(cfg)
def _shutdown(signum, frame):
log.info("Signal %d received.", signum)
app.stop()
sys.exit(0)
signal.signal(signal.SIGTERM, _shutdown)
signal.signal(signal.SIGINT, _shutdown)
app.run()
if __name__ == "__main__":
main()
+24
View File
@@ -0,0 +1,24 @@
[Unit]
Description=PC Monitor — MQTT system stats for Home Assistant
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
User=USERNAME
WorkingDirectory=/home/USERNAME/monitor
ExecStart=/usr/bin/python3 /home/USERNAME/monitor/monitor.py
# Required for xprintidle to query the X server for session idle time.
# Adjust :0 if your display is on a different number (check 'echo $DISPLAY').
Environment=DISPLAY=:0
Environment=XAUTHORITY=/home/USERNAME/.Xauthority
Restart=on-failure
RestartSec=10
# Give the process up to 10 s to publish its offline LWT before force-kill
TimeoutStopSec=10
# Logging goes to the journal; view with: journalctl -u pc-monitor -f
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target