""" Recording session management and file saving. """ import os import logging from datetime import datetime from typing import Optional logger = logging.getLogger(__name__) # Default recordings directory RECORDINGS_DIR = os.environ.get('RECORDINGS_PATH', '/app/recordings') def ensure_recordings_dir(): """Ensure the recordings directory exists.""" os.makedirs(RECORDINGS_DIR, exist_ok=True) return RECORDINGS_DIR def generate_filename(start_time: datetime) -> str: """ Generate a filename from the session start time. Format: YYYY-MM-DD_HH-MM-SS_captions.md """ return start_time.strftime('%Y-%m-%d_%H-%M-%S_captions.md') def calculate_duration(start_time: datetime, end_time: datetime) -> str: """Calculate and format duration as HH:MM:SS.""" delta = end_time - start_time total_seconds = int(delta.total_seconds()) hours, remainder = divmod(total_seconds, 3600) minutes, seconds = divmod(remainder, 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d}" def get_whisper_model_name() -> str: """Get the configured Whisper model name.""" return os.environ.get('WHISPER_MODEL', 'base') def save_recording( start_time: datetime, end_time: datetime, transcript: str, word_count: int, client_id: str ) -> Optional[str]: """ Save a recording session to a markdown file. Args: start_time: Session start datetime end_time: Session end datetime transcript: Full transcript text word_count: Number of words in transcript client_id: WebSocket client session ID Returns: Filename if successful, None if failed """ try: ensure_recordings_dir() filename = generate_filename(start_time) filepath = os.path.join(RECORDINGS_DIR, filename) duration = calculate_duration(start_time, end_time) model_name = get_whisper_model_name() # Build markdown content with frontmatter content = f"""--- session_start: {start_time.isoformat()} session_end: {end_time.isoformat()} duration: {duration} whisper_model: {model_name} word_count: {word_count} --- # Live Captions Recording **Session Start:** {start_time.strftime('%Y-%m-%d %H:%M:%S')} **Session End:** {end_time.strftime('%Y-%m-%d %H:%M:%S')} **Duration:** {duration} **Model:** {model_name} **Words:** {word_count} --- ## Transcript {transcript} """ with open(filepath, 'w', encoding='utf-8') as f: f.write(content) logger.info(f"Recording saved: {filename} ({word_count} words)") return filename except Exception as e: logger.error(f"Failed to save recording: {e}") return None def list_recordings() -> list: """ List all recording files, sorted by date descending. Returns: List of recording metadata dicts """ ensure_recordings_dir() recordings = [] try: for filename in os.listdir(RECORDINGS_DIR): if filename.endswith('_captions.md'): filepath = os.path.join(RECORDINGS_DIR, filename) stat = os.stat(filepath) # Parse date from filename (YYYY-MM-DD_HH-MM-SS_captions.md) try: date_str = filename.replace('_captions.md', '') date_parts = date_str.split('_') display_date = f"{date_parts[0]} {date_parts[1].replace('-', ':')}" except (IndexError, ValueError): display_date = filename recordings.append({ 'filename': filename, 'date': display_date, 'size': stat.st_size, 'created': datetime.fromtimestamp(stat.st_mtime).isoformat() }) # Sort by filename descending (newest first) recordings.sort(key=lambda x: x['filename'], reverse=True) except Exception as e: logger.error(f"Failed to list recordings: {e}") return recordings def get_recording(filename: str) -> Optional[dict]: """ Get a specific recording's content. Args: filename: The recording filename Returns: Dict with filename and content, or None if not found """ ensure_recordings_dir() # Sanitize filename to prevent path traversal safe_filename = os.path.basename(filename) if not safe_filename.endswith('_captions.md'): return None filepath = os.path.join(RECORDINGS_DIR, safe_filename) try: if os.path.exists(filepath): with open(filepath, 'r', encoding='utf-8') as f: content = f.read() return { 'filename': safe_filename, 'content': content } except Exception as e: logger.error(f"Failed to read recording {safe_filename}: {e}") return None def delete_recording(filename: str) -> bool: """ Delete a specific recording. Args: filename: The recording filename Returns: True if deleted, False otherwise """ ensure_recordings_dir() # Sanitize filename to prevent path traversal safe_filename = os.path.basename(filename) if not safe_filename.endswith('_captions.md'): return False filepath = os.path.join(RECORDINGS_DIR, safe_filename) try: if os.path.exists(filepath): os.remove(filepath) logger.info(f"Recording deleted: {safe_filename}") return True except Exception as e: logger.error(f"Failed to delete recording {safe_filename}: {e}") return False