Real-time speech-to-text using OpenAI Whisper (faster-whisper). Features browser audio capture, WebSocket streaming, and customizable display settings. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
209 lines
5.5 KiB
Python
209 lines
5.5 KiB
Python
"""
|
|
Recording session management and file saving.
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Directory where recordings are stored. The RECORDINGS_PATH environment
# variable overrides the default location.
RECORDINGS_DIR = os.getenv('RECORDINGS_PATH', '/app/recordings')
|
|
|
|
|
|
def ensure_recordings_dir():
    """Create the recordings directory if it is missing and return its path.

    Returns:
        The module-level RECORDINGS_DIR path.
    """
    target_dir = RECORDINGS_DIR
    # exist_ok=True makes repeated calls harmless once the directory exists.
    os.makedirs(target_dir, exist_ok=True)
    return target_dir
|
|
|
|
|
|
def generate_filename(start_time: datetime) -> str:
    """
    Build the recording filename for a session from its start time.

    The result is the formatted timestamp followed by a fixed suffix:
    YYYY-MM-DD_HH-MM-SS_captions.md
    """
    timestamp = start_time.strftime('%Y-%m-%d_%H-%M-%S')
    return timestamp + '_captions.md'
|
|
|
|
|
|
def calculate_duration(start_time: datetime, end_time: datetime) -> str:
    """Calculate and format the elapsed time between two datetimes as HH:MM:SS.

    Args:
        start_time: Session start datetime
        end_time: Session end datetime

    Returns:
        Zero-padded "HH:MM:SS" string. Fractional seconds are truncated and
        hours are not wrapped at 24 (a 25-hour span yields "25:00:00").
        If end_time is earlier than start_time the result is "00:00:00".
    """
    elapsed = end_time - start_time
    # Clamp at zero: divmod on a negative total would otherwise produce a
    # malformed string such as "-1:59:55" for a span of -5 seconds.
    total_seconds = max(int(elapsed.total_seconds()), 0)
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
|
|
|
|
|
|
def get_whisper_model_name() -> str:
    """Return the Whisper model name from WHISPER_MODEL, or 'base' if unset."""
    return os.getenv('WHISPER_MODEL', 'base')
|
|
|
|
|
|
def save_recording(
    start_time: datetime,
    end_time: datetime,
    transcript: str,
    word_count: int,
    client_id: str
) -> Optional[str]:
    """
    Save a recording session to a markdown file.

    The file is named by generate_filename(start_time) and placed in
    RECORDINGS_DIR. It holds a frontmatter block with the session metadata,
    a human-readable header repeating that metadata, and the transcript.

    The content is written to a temporary sibling file first and then moved
    into place with os.replace, so a failure part-way through the write
    cannot leave a truncated "*_captions.md" file behind. An existing file
    with the same name is still replaced, as before.

    Args:
        start_time: Session start datetime
        end_time: Session end datetime
        transcript: Full transcript text
        word_count: Number of words in transcript
        client_id: WebSocket client session ID (only used in the failure
            log message; it is not written to the file)

    Returns:
        Filename if successful, None if failed
    """
    tmp_path = None
    try:
        ensure_recordings_dir()

        filename = generate_filename(start_time)
        filepath = os.path.join(RECORDINGS_DIR, filename)

        duration = calculate_duration(start_time, end_time)
        model_name = get_whisper_model_name()

        # Build markdown content with frontmatter
        content = f"""---
session_start: {start_time.isoformat()}
session_end: {end_time.isoformat()}
duration: {duration}
whisper_model: {model_name}
word_count: {word_count}
---

# Live Captions Recording

**Session Start:** {start_time.strftime('%Y-%m-%d %H:%M:%S')}
**Session End:** {end_time.strftime('%Y-%m-%d %H:%M:%S')}
**Duration:** {duration}
**Model:** {model_name}
**Words:** {word_count}

---

## Transcript

{transcript}
"""

        # The ".tmp" suffix keeps the partial file out of any listing that
        # filters on the "_captions.md" suffix.
        tmp_path = filepath + '.tmp'
        with open(tmp_path, 'w', encoding='utf-8') as f:
            f.write(content)
        os.replace(tmp_path, filepath)

        logger.info(f"Recording saved: {filename} ({word_count} words)")
        return filename

    except Exception as e:
        # logger.exception records the traceback as well as the message.
        logger.exception(
            "Failed to save recording (client %s): %s", client_id, e
        )
        if tmp_path is not None:
            # Best-effort cleanup; the temp file may never have been created
            # or may already have been moved into place.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
        return None
|
|
|
|
|
|
def list_recordings() -> list:
    """
    List all recording files, sorted by filename descending (newest first).

    Only entries in RECORDINGS_DIR whose name ends with '_captions.md' are
    included. A file whose metadata cannot be read (for example because it
    was removed while the listing was in progress) is skipped with a warning
    instead of aborting the whole listing.

    Returns:
        List of recording metadata dicts with the keys:
            'filename': the file name
            'date': "YYYY-MM-DD HH:MM:SS" parsed from the file name, or the
                file name itself when it does not follow that pattern
            'size': file size in bytes
            'created': ISO-formatted timestamp taken from the file's
                modification time (st_mtime)
        An empty list is returned if the directory cannot be listed.
    """
    ensure_recordings_dir()
    recordings = []

    try:
        filenames = os.listdir(RECORDINGS_DIR)
    except Exception as e:
        logger.error(f"Failed to list recordings: {e}")
        return recordings

    for filename in filenames:
        if not filename.endswith('_captions.md'):
            continue

        filepath = os.path.join(RECORDINGS_DIR, filename)
        try:
            stat = os.stat(filepath)
            modified = datetime.fromtimestamp(stat.st_mtime).isoformat()
        except (OSError, OverflowError, ValueError) as e:
            # One unreadable entry must not hide every other recording.
            logger.warning("Skipping recording %s: %s", filename, e)
            continue

        # Parse date from filename (YYYY-MM-DD_HH-MM-SS_captions.md)
        try:
            date_str = filename.replace('_captions.md', '')
            date_parts = date_str.split('_')
            display_date = f"{date_parts[0]} {date_parts[1].replace('-', ':')}"
        except IndexError:
            # No time component in the name; fall back to the raw filename.
            display_date = filename

        recordings.append({
            'filename': filename,
            'date': display_date,
            'size': stat.st_size,
            'created': modified,
        })

    # Sort by filename descending (newest first); the timestamp prefix makes
    # lexicographic order match chronological order.
    recordings.sort(key=lambda x: x['filename'], reverse=True)

    return recordings
|
|
|
|
|
|
def get_recording(filename: str) -> Optional[dict]:
    """
    Read one recording file and return its text.

    Args:
        filename: The recording filename

    Returns:
        Dict with 'filename' and 'content' keys, or None when the name is
        not a recording name, the file does not exist, or reading fails
    """
    ensure_recordings_dir()

    # Keep only the final path component so the lookup cannot leave
    # the recordings directory.
    safe_filename = os.path.basename(filename)
    if safe_filename.endswith('_captions.md'):
        filepath = os.path.join(RECORDINGS_DIR, safe_filename)
        try:
            if not os.path.exists(filepath):
                return None
            with open(filepath, 'r', encoding='utf-8') as handle:
                text = handle.read()
            return {'filename': safe_filename, 'content': text}
        except Exception as e:
            logger.error(f"Failed to read recording {safe_filename}: {e}")

    return None
|
|
|
|
|
|
def delete_recording(filename: str) -> bool:
    """
    Remove one recording file from the recordings directory.

    Args:
        filename: The recording filename

    Returns:
        True if the file was deleted; False when the name is not a
        recording name, the file does not exist, or deletion fails
    """
    ensure_recordings_dir()

    # Keep only the final path component so the delete cannot reach
    # outside the recordings directory.
    safe_filename = os.path.basename(filename)
    if not safe_filename.endswith('_captions.md'):
        return False

    filepath = os.path.join(RECORDINGS_DIR, safe_filename)
    deleted = False
    try:
        if os.path.exists(filepath):
            os.remove(filepath)
            logger.info(f"Recording deleted: {safe_filename}")
            deleted = True
    except Exception as e:
        logger.error(f"Failed to delete recording {safe_filename}: {e}")

    return deleted
|