Source code for pumaguard.device_heartbeat

"""Base class for device heartbeat monitoring in PumaGuard.

This module provides a common abstract base class for monitoring device
availability in the background. It handles thread management, status updates,
persistence, and stale device removal.
"""

from __future__ import (
    annotations,
)

import logging
import threading
from abc import (
    ABC,
    abstractmethod,
)
from collections.abc import (
    Callable,
)
from datetime import (
    datetime,
    timedelta,
    timezone,
)
from typing import (
    TYPE_CHECKING,
)

if TYPE_CHECKING:
    from pumaguard.web_ui import (
        WebUI,
    )

logger = logging.getLogger(__name__)


[docs] class DeviceHeartbeat(ABC): """ Abstract base class for device heartbeat monitoring. Provides common functionality for monitoring device availability via background threads. Subclasses must implement device-specific check methods and persistence logic. """ # pylint: disable=too-many-instance-attributes # pylint: disable=too-many-arguments,too-many-positional-arguments def __init__( self, webui: "WebUI", device_type: str, interval: int = 60, enabled: bool = True, status_change_callback: Callable[[str, dict], None] | None = None, auto_remove_enabled: bool = False, auto_remove_hours: int = 24, ): """ Initialize the device heartbeat monitor. Args: webui: WebUI instance containing device information device_type: Type of device being monitored (e.g., "plug", "camera") interval: Check interval in seconds (default: 60) enabled: Enable heartbeat monitoring (default: True) status_change_callback: Optional callback function to be called when device status changes. Signature: callback(event_type: str, device_data: dict) auto_remove_enabled: Enable automatic removal of stale devices (default: False) auto_remove_hours: Hours of inactivity before auto-removal (default: 24) """ self.webui = webui self.device_type = device_type self.interval = interval self.enabled = enabled self.status_change_callback = status_change_callback self.auto_remove_enabled = auto_remove_enabled self.auto_remove_hours = auto_remove_hours self._running = False self._thread: threading.Thread | None = None self._stop_event = threading.Event()
[docs] @abstractmethod def check_device(self, ip_address: str) -> bool: """ Check if a device is reachable. Subclasses must implement this method with their specific connectivity check logic (HTTP, ICMP, TCP, etc.). Args: ip_address: IP address of the device Returns: True if device is reachable, False otherwise """
@abstractmethod def _get_devices_dict(self) -> dict: """ Get the devices dictionary from webui. Returns: Dictionary mapping MAC addresses to device info (e.g., webui.plugs or webui.cameras) """ @abstractmethod def _save_device_list(self) -> None: """ Save the device list to settings file. Subclasses must implement this to persist their specific device attributes to the settings file. """ @abstractmethod def _get_log_context(self) -> str: """ Get logging context string for this device type. Returns: String describing monitoring configuration for logging (e.g., "interval=60s, timeout=5s" or "method=tcp, port=80") """ def _update_device_status( self, mac_address: str, is_reachable: bool ) -> None: """ Update device status and last_seen timestamp. Args: mac_address: MAC address of the device is_reachable: Whether the device is currently reachable """ devices = self._get_devices_dict() if mac_address not in devices: return device = devices[mac_address] timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") status_changed = False if is_reachable: # Device is reachable - update status to connected if device["status"] != "connected": logger.info( "%s '%s' is now reachable at %s", self.device_type.capitalize(), device["hostname"], device["ip_address"], ) status_changed = True device["status"] = "connected" device["last_seen"] = timestamp else: # Device is not reachable - update status to disconnected if device["status"] == "connected": logger.warning( "%s '%s' is no longer reachable at %s", self.device_type.capitalize(), device["hostname"], device["ip_address"], ) status_changed = True device["status"] = "disconnected" # Don't update last_seen on failure - keep the last successful time # Persist changes to settings self._save_device_list() # Notify callback if status changed if status_changed and self.status_change_callback: try: event_type = ( f"{self.device_type}_status_changed_online" if is_reachable else f"{self.device_type}_status_changed_offline" ) self.status_change_callback(event_type, dict(device)) except Exception as e: # pylint: disable=broad-except logger.error( "Error calling status change callback: %s", str(e) ) def _check_and_remove_stale_devices(self) -> None: """ Check for devices not seen within configured timeout. Remove devices whose last_seen timestamp exceeds the configured hours threshold. Called during heartbeat monitoring loop if auto-removal is enabled. """ now = datetime.now(timezone.utc) removal_threshold = timedelta(hours=self.auto_remove_hours) devices_to_remove = [] devices = self._get_devices_dict() for mac_address, device in list(devices.items()): last_seen_str = device.get("last_seen") if not last_seen_str: continue try: # Parse ISO8601 timestamp last_seen = datetime.fromisoformat( last_seen_str.replace("Z", "+00:00") ) # Calculate time since last seen time_since_seen = now - last_seen hours_offline = time_since_seen.total_seconds() / 3600 # Check if device exceeds removal threshold if time_since_seen > removal_threshold: devices_to_remove.append((mac_address, device)) logger.info( "%s '%s' (%s) not seen for %.1f hours, " + "scheduling for auto-removal", self.device_type.capitalize(), device["hostname"], mac_address, hours_offline, ) # Log status for offline devices (debugging) elif device["status"] == "disconnected": if self.auto_remove_enabled: # Calculate time until removal time_until_removal = ( removal_threshold - time_since_seen ) hours_until_removal = ( time_until_removal.total_seconds() / 3600 ) logger.debug( "%s '%s' (%s) at %s has been offline " + "for %.1f hours, will be auto-removed " + "in %.1f hours", self.device_type.capitalize(), device["hostname"], mac_address, device["ip_address"], hours_offline, hours_until_removal, ) else: # Auto-removal disabled, log offline duration logger.debug( "%s '%s' (%s) at %s has been offline " + "for %.1f hours (auto-removal disabled)", self.device_type.capitalize(), device["hostname"], mac_address, device["ip_address"], hours_offline, ) except (ValueError, AttributeError) as e: logger.warning( "Could not parse last_seen timestamp for %s %s: %s", self.device_type, mac_address, str(e), ) # Remove devices outside iteration loop # (only if auto-removal is enabled) if self.auto_remove_enabled: for mac_address, device in devices_to_remove: try: # Remove from in-memory dictionary del devices[mac_address] # Persist changes self._save_device_list() logger.info( "Auto-removed %s '%s' (%s) at %s", self.device_type, device["hostname"], mac_address, device["ip_address"], ) # Notify via SSE if callback is available if self.status_change_callback: try: self.status_change_callback( f"{self.device_type}_removed", dict(device) ) except Exception as e: # pylint: disable=broad-except logger.error( "Error calling status change callback " + "for removal: %s", str(e), ) except Exception as e: # pylint: disable=broad-except logger.error( "Failed to auto-remove %s %s: %s", self.device_type, mac_address, str(e), ) elif devices_to_remove: # Auto-removal disabled but devices would have been removed logger.debug( "%d %s(s) would be auto-removed but feature is disabled", len(devices_to_remove), self.device_type, ) def _monitor_loop(self) -> None: """Main monitoring loop that runs in a background thread.""" auto_remove_msg = "" if self.auto_remove_enabled: auto_remove_msg = f", auto-remove after {self.auto_remove_hours}h" logger.info( "%s heartbeat monitor started (%s%s)", self.device_type.capitalize(), self._get_log_context(), auto_remove_msg, ) devices = self._get_devices_dict() while not self._stop_event.is_set(): try: # Check each device for mac_address, device in list(devices.items()): if self._stop_event.is_set(): break ip_address = device["ip_address"] if not ip_address: continue logger.debug( "Checking %s '%s' at %s", self.device_type, device["hostname"], ip_address, ) is_reachable = self.check_device(ip_address) self._update_device_status(mac_address, is_reachable) # Check for stale devices after status checks self._check_and_remove_stale_devices() except Exception as e: # pylint: disable=broad-except logger.error( "Error in %s heartbeat monitor loop: %s", self.device_type, str(e), ) # Wait for the next check interval or stop event self._stop_event.wait(self.interval) logger.info( "%s heartbeat monitor stopped", self.device_type.capitalize() )
[docs] def start(self) -> None: """Start the heartbeat monitoring thread.""" if not self.enabled: logger.info( "%s heartbeat monitoring is disabled", self.device_type.capitalize(), ) return if self._running: logger.warning( "%s heartbeat monitor is already running", self.device_type.capitalize(), ) return self._running = True self._stop_event.clear() thread_name = f"{self.device_type.capitalize()}Heartbeat" self._thread = threading.Thread( target=self._monitor_loop, daemon=True, name=thread_name ) self._thread.start() logger.info( "%s heartbeat monitoring started", self.device_type.capitalize() )
[docs] def stop(self) -> None: """Stop the heartbeat monitoring thread.""" if not self._running: logger.warning( "%s heartbeat monitor is not running", self.device_type.capitalize(), ) return self._running = False self._stop_event.set() if self._thread and self._thread.is_alive(): self._thread.join(timeout=5) if self._thread.is_alive(): logger.warning( "%s heartbeat monitor thread did not stop cleanly", self.device_type.capitalize(), ) else: logger.info( "%s heartbeat monitoring stopped", self.device_type.capitalize(), )
[docs] def check_now(self) -> dict[str, bool]: """ Immediately check all devices and return results. This can be called manually to force a check outside the regular interval. Returns: Dictionary mapping MAC addresses to reachability status """ results = {} devices = self._get_devices_dict() for mac_address, device in devices.items(): ip_address = device["ip_address"] if not ip_address: results[mac_address] = False continue is_reachable = self.check_device(ip_address) self._update_device_status(mac_address, is_reachable) results[mac_address] = is_reachable return results