"""Camera heartbeat monitoring for PumaGuard.
This module provides background monitoring of camera availability using
ICMP ping and TCP connection checks.
"""
from __future__ import (
annotations,
)
import logging
import socket
import subprocess
from collections.abc import (
Callable,
)
from typing import (
TYPE_CHECKING,
)
from pumaguard.device_heartbeat import (
DeviceHeartbeat,
)
if TYPE_CHECKING:
from pumaguard.web_ui import (
WebUI,
)
logger = logging.getLogger(__name__)
[docs]
class CameraHeartbeat(DeviceHeartbeat):
"""
Background service to monitor camera availability via ICMP ping
and TCP checks.
The heartbeat monitor runs in a separate thread and periodically
checks if cameras are reachable. It updates the camera status and
last_seen timestamp based on the results.
"""
# pylint: disable=too-many-arguments,too-many-positional-arguments
def __init__(
self,
webui: "WebUI",
interval: int = 60,
enabled: bool = True,
check_method: str = "tcp",
tcp_port: int = 80,
tcp_timeout: int = 3,
icmp_timeout: int = 2,
status_change_callback: Callable[[str, dict], None] | None = None,
auto_remove_enabled: bool = False,
auto_remove_hours: int = 24,
):
"""
Initialize the camera heartbeat monitor.
Args:
webui: WebUI instance containing camera information
interval: Check interval in seconds (default: 60)
enabled: Enable heartbeat monitoring (default: True)
check_method: Health check method - "icmp", "tcp", or
"both" (default: "tcp")
tcp_port: TCP port to check (default: 80 for HTTP)
tcp_timeout: TCP connection timeout in seconds (default: 3)
icmp_timeout: ICMP ping timeout in seconds (default: 2)
status_change_callback: Optional callback function to be called
when camera status changes. Signature:
callback(event_type: str, camera_data: dict)
auto_remove_enabled: Enable automatic removal of stale cameras
(default: False)
auto_remove_hours: Hours of inactivity before auto-removal
(default: 24)
"""
super().__init__(
webui=webui,
device_type="camera",
interval=interval,
enabled=enabled,
status_change_callback=status_change_callback,
auto_remove_enabled=auto_remove_enabled,
auto_remove_hours=auto_remove_hours,
)
self.check_method = check_method.lower()
self.tcp_port = tcp_port
self.tcp_timeout = tcp_timeout
self.icmp_timeout = icmp_timeout
# Validate check method
if self.check_method not in ["icmp", "tcp", "both"]:
logger.warning(
"Invalid check_method '%s', defaulting to 'tcp'",
self.check_method,
)
self.check_method = "tcp"
def _check_icmp(self, ip_address: str) -> bool:
"""
Check camera availability using ICMP ping.
Args:
ip_address: IP address to ping
Returns:
True if ping successful, False otherwise
"""
try:
# Use ping command with 1 packet and timeout
# -c 1: send 1 packet
# -W timeout: wait timeout seconds for response
# -q: quiet output (only summary)
result = subprocess.run(
["ping", "-c", "1", "-W", str(self.icmp_timeout), ip_address],
capture_output=True,
timeout=self.icmp_timeout + 1,
check=False,
)
return result.returncode == 0
except (subprocess.TimeoutExpired, FileNotFoundError) as e:
logger.debug("ICMP ping failed for %s: %s", ip_address, str(e))
return False
def _check_tcp(self, ip_address: str, port: int) -> bool:
"""
Check camera availability using TCP connection test.
Args:
ip_address: IP address to connect to
port: TCP port to connect to
Returns:
True if connection successful, False otherwise
"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.tcp_timeout)
result = sock.connect_ex((ip_address, port))
sock.close()
return result == 0
except (socket.error, OSError) as e:
logger.debug(
"TCP connection failed for %s:%d: %s",
ip_address,
port,
str(e),
)
return False
[docs]
def check_camera(self, ip_address: str) -> bool:
"""
Check if a camera is reachable using the configured method.
Args:
ip_address: IP address of the camera
Returns:
True if camera is reachable, False otherwise
"""
if self.check_method == "icmp":
return self._check_icmp(ip_address)
if self.check_method == "tcp":
return self._check_tcp(ip_address, self.tcp_port)
if self.check_method == "both":
# Try ICMP first (faster), fall back to TCP
if self._check_icmp(ip_address):
return True
return self._check_tcp(ip_address, self.tcp_port)
return False
[docs]
def check_device(self, ip_address: str) -> bool:
"""
Check if a camera is reachable.
This is the abstract method implementation that delegates to
check_camera.
Args:
ip_address: IP address of the camera
Returns:
True if camera is reachable, False otherwise
"""
return self.check_camera(ip_address)
def _get_devices_dict(self) -> dict:
"""
Get the cameras dictionary from webui.
Returns:
Dictionary mapping MAC addresses to camera info
"""
return self.webui.cameras
def _save_device_list(self) -> None:
"""Save the camera list to settings file."""
try:
camera_list = []
for _, cam_info in self.webui.cameras.items():
camera_list.append(
{
"hostname": cam_info["hostname"],
"ip_address": cam_info["ip_address"],
"mac_address": cam_info["mac_address"],
"last_seen": cam_info["last_seen"],
"status": cam_info["status"],
}
)
self.webui.presets.cameras = camera_list
self.webui.presets.save()
except Exception as e: # pylint: disable=broad-except
logger.error("Failed to save camera list: %s", str(e))
def _get_log_context(self) -> str:
"""
Get logging context string for camera monitoring.
Returns:
String describing monitoring configuration
"""
return (
f"method={self.check_method}, interval={self.interval}s, "
f"port={self.tcp_port}"
)
# Alias for backwards compatibility with tests
_save_camera_list = _save_device_list
# Backwards compatibility methods for tests
def _update_camera_status(
self, mac_address: str, is_reachable: bool
) -> None:
"""
Update camera status and last_seen timestamp.
This method is provided for backwards compatibility.
New code should use _update_device_status() instead.
Args:
mac_address: MAC address of the camera
is_reachable: Whether the camera is currently reachable
"""
return self._update_device_status(mac_address, is_reachable)
def _check_and_remove_stale_cameras(self) -> None:
"""
Check for cameras not seen within configured timeout.
This method is provided for backwards compatibility.
New code should use _check_and_remove_stale_devices() instead.
"""
return self._check_and_remove_stale_devices()