Source code for pumaguard.sound

"""
Sounds
"""

import logging
import re
import subprocess
import sys
import threading
from typing import (
    Optional,
    Union,
)

# Controls to try in order of preference when auto-detecting the ALSA playback
# control.  The first entry that exists on the device and supports playback
# volume will be used.
_VOLUME_CONTROL_PRIORITY = [
    "PCM",
    "Master",
    "Speaker",
    "Headphone",
    "Digital",
    "Lineout",
]

# Maximum software gain value accepted by mpg123's "-f" flag.
_MPG123_MAX_GAIN = 32768

# Cached result of detect_volume_control() so we only probe amixer once per
# process lifetime.
_DETECTED_VOLUME_CONTROL: Union[str, None] = None

logger = logging.getLogger(__name__)


[docs] def detect_volume_control() -> Optional[str]: """Auto-detect the ALSA simple-mixer control to use for volume. Runs ``amixer scontents`` once and finds all controls that advertise ``pvolume`` (playback volume) capability. From those, the first name that appears in :data:`_VOLUME_CONTROL_PRIORITY` is returned. If none of the preferred names match, the first playback-capable control found is returned instead. The result is cached so that ``amixer`` is only invoked once for the lifetime of the process. Returns: The name of the best available playback volume control (e.g. ``"PCM"``, ``"Master"``), or ``None`` if amixer is unavailable or no suitable control could be found. """ global _DETECTED_VOLUME_CONTROL # pylint: disable=global-statement if _DETECTED_VOLUME_CONTROL is not None: return _DETECTED_VOLUME_CONTROL logger.debug("Auto-detecting ALSA playback volume control") try: result = subprocess.run( ["amixer", "scontents"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False, ) if result.returncode != 0: logger.warning( "amixer scontents failed (rc=%d): %s", result.returncode, result.stderr.decode().strip(), ) return None # Parse output: collect control names that have 'pvolume' capability. # amixer scontents output looks like: # Simple mixer control 'Master',0 # Capabilities: pvolume pswitch pswitch-joined # ... # Simple mixer control 'Capture',0 # Capabilities: cvolume cswitch cswitch-joined # ... playback_controls: list[str] = [] current_control: Optional[str] = None for line in result.stdout.decode().splitlines(): ctrl_match = re.match(r"Simple mixer control '([^']+)'", line) if ctrl_match: current_control = ctrl_match.group(1) elif current_control is not None and "Capabilities:" in line: if "pvolume" in line: playback_controls.append(current_control) # Either way, reset so we don't re-check on subsequent lines. current_control = None if not playback_controls: logger.warning( "No ALSA controls with playback volume capability found" ) return None # Pick the highest-priority preferred control that is available. for preferred in _VOLUME_CONTROL_PRIORITY: if preferred in playback_controls: _DETECTED_VOLUME_CONTROL = preferred logger.debug( "Selected ALSA volume control: %s (preferred)", _DETECTED_VOLUME_CONTROL, ) return _DETECTED_VOLUME_CONTROL # None of the preferred names matched — use the first available one. _DETECTED_VOLUME_CONTROL = playback_controls[0] logger.debug( "Selected ALSA volume control: %s (first available)", _DETECTED_VOLUME_CONTROL, ) return _DETECTED_VOLUME_CONTROL except (subprocess.SubprocessError, FileNotFoundError) as e: logger.warning("Could not detect ALSA volume control: %s", e) return None
[docs] def reset_volume_control_cache(): """Reset the cached ALSA volume control detection result. This forces :func:`detect_volume_control` to re-probe ``amixer`` on its next call. Intended for use in tests and for situations where the audio device has changed at runtime. """ global _DETECTED_VOLUME_CONTROL # pylint: disable=global-statement _DETECTED_VOLUME_CONTROL = None
# Global variable to track the current playing process _CURRENT_PROCESS: Optional[subprocess.Popen] = None _process_lock = threading.Lock()
[docs] def get_volume(control: Optional[str] = None) -> Optional[int]: """ Get the current ALSA mixer volume using amixer. Args: control: ALSA mixer control name. When ``None`` (the default) the control is auto-detected via :func:`detect_volume_control`. Returns: Current volume level as an integer from 0-100, or None if it could not be determined. """ if control is None: control = detect_volume_control() if control is None: logger.warning("Could not determine ALSA volume control") return None logger.debug("Reading ALSA volume: control=%s", control) try: cmd = ["amixer", "get", control] result = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False, ) if result.returncode != 0: logger.warning( "amixer get %s failed (rc=%d): %s", control, result.returncode, result.stderr.decode().strip(), ) return None output = result.stdout.decode() # amixer output contains lines like: # Mono: Playback 192 [75%] [on] # Front Left: Playback 49152 [75%] [on] # Extract the first percentage value found. match = re.search(r"\[(\d+)%\]", output) if match: volume = int(match.group(1)) logger.debug( "ALSA volume read: %d%% on control %s", volume, control ) return volume logger.warning( "Could not parse volume from amixer output: %s", output.strip() ) return None except (subprocess.SubprocessError, FileNotFoundError) as e: logger.warning("Could not read ALSA volume via amixer: %s", e) return None
[docs] def set_volume(volume: int, control: Optional[str] = None) -> bool: """Set ALSA volume and return success status. Args: volume: Volume level from 0-100 control: ALSA mixer control name. When ``None`` (the default) the control is auto-detected via :func:`detect_volume_control`. Returns: True when volume was successfully applied, False otherwise. """ if control is None: control = detect_volume_control() if control is None: logger.warning( "Could not determine ALSA volume control; skipping set_volume" ) return False logger.info( "Setting ALSA volume: control=%s, volume=%d%%", control, volume ) try: cmd = ["amixer", "set", control, f"{volume}%"] result = subprocess.run( cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, check=False, ) if result.returncode != 0: logger.warning( "amixer set %s %d%% failed (rc=%d): %s", control, volume, result.returncode, result.stderr.decode().strip(), ) return False logger.debug("ALSA volume set to %d%% on control %s", volume, control) return True except (subprocess.SubprocessError, FileNotFoundError) as e: logger.warning("Could not set ALSA volume via amixer: %s", e) return False
[docs] def playsound(soundfile: str, volume: int = 80, blocking: bool = True): """ Play a sound file with specified volume. The volume is applied via the ALSA mixer (amixer) before playback, so the system output level is adjusted rather than using mpg123's software scaling. Args: soundfile: Path to the sound file to play volume: Volume level from 0-100 (default: 80) blocking: If True, wait for sound to finish. If False, return immediately (default: True) """ global _CURRENT_PROCESS # pylint: disable=global-statement logger.info( "playsound called: file=%s, volume=%d, blocking=%s", soundfile, volume, blocking, ) # Set the ALSA mixer volume before playback. If ALSA controls are not # available, fall back to mpg123 software gain. volume_applied = set_volume(volume) try: with _process_lock: # Stop any currently playing sound if _CURRENT_PROCESS is not None: try: _CURRENT_PROCESS.terminate() _CURRENT_PROCESS.wait(timeout=1) except (subprocess.TimeoutExpired, ProcessLookupError): pass _CURRENT_PROCESS = None cmd = ["mpg123", "-o", "alsa,pulse"] if not volume_applied: clamped_volume = max(0, min(volume, 100)) software_gain = round(clamped_volume * _MPG123_MAX_GAIN / 100) logger.info( "Using mpg123 software volume fallback: " "volume=%d%% scale=%d", clamped_volume, software_gain, ) cmd.extend(["-f", str(software_gain)]) cmd.append(soundfile) logger.info("Executing command: %s", " ".join(cmd)) # pylint: disable=consider-using-with _CURRENT_PROCESS = subprocess.Popen( cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) logger.info( "Sound playback started, PID: %d", _CURRENT_PROCESS.pid ) if blocking: # Wait for completion _CURRENT_PROCESS.wait() _CURRENT_PROCESS = None except subprocess.SubprocessError as e: logger.error("Error playing soundfile %s: %s", soundfile, e) print(f"Error playing soundfile {soundfile}: {e}") with _process_lock: _CURRENT_PROCESS = None
[docs] def stop_sound(): """ Stop any currently playing sound. Returns: bool: True if a sound was stopped, False if nothing was playing """ global _CURRENT_PROCESS # pylint: disable=global-statement with _process_lock: if _CURRENT_PROCESS is not None: try: logger.info( "Stopping sound playback, PID: %d", _CURRENT_PROCESS.pid ) _CURRENT_PROCESS.terminate() _CURRENT_PROCESS.wait(timeout=1) logger.info("Sound playback stopped successfully") return True except (subprocess.TimeoutExpired, ProcessLookupError): try: _CURRENT_PROCESS.kill() _CURRENT_PROCESS.wait(timeout=1) except (subprocess.TimeoutExpired, ProcessLookupError): pass return True finally: _CURRENT_PROCESS = None return False
[docs] def is_playing(): """ Check if a sound is currently playing. Returns: bool: True if a sound is currently playing, False otherwise """ global _CURRENT_PROCESS # pylint: disable=global-statement with _process_lock: if _CURRENT_PROCESS is not None: # Check if process is still running if _CURRENT_PROCESS.poll() is None: return True # Process finished, clean up _CURRENT_PROCESS = None return False
[docs] def main(): """ Main entry point. """ if len(sys.argv) < 2: print("Usage: pumaguard-sound <soundfile> [volume]") sys.exit(1) volume = 80 if len(sys.argv) >= 3: try: volume = int(sys.argv[2]) if volume < 0 or volume > 100: print("Volume must be between 0 and 100") sys.exit(1) except ValueError: print("Volume must be an integer") sys.exit(1) playsound(sys.argv[1], volume)