Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 1 addition & 14 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ npm run web
```bash
# ASR Services
cd extras/asr-services
docker compose up parakeet # Offline ASR with Parakeet
docker compose up parakeet-asr # Offline ASR with Parakeet

# Speaker Recognition (with tests)
cd extras/speaker-recognition
Expand All @@ -136,13 +136,6 @@ docker compose up --build

## Architecture Overview

### Core Structure
- **backends/advanced-backend/**: Primary FastAPI backend with real-time audio processing
- `src/main.py`: Central FastAPI application with WebSocket audio streaming
- `src/auth.py`: Email-based authentication with JWT tokens
- `src/memory/`: LLM-powered conversation memory system using mem0
- `webui/`: React-based web dashboard for conversation and user management

### Key Components
- **Audio Pipeline**: Real-time Opus/PCM → Application-level processing → Deepgram/Mistral transcription → memory extraction
- **Wyoming Protocol**: WebSocket communication uses Wyoming protocol (JSONL + binary) for structured audio sessions
Expand Down Expand Up @@ -1214,12 +1207,6 @@ curl http://[gpu-machine-ip]:8085/health # Speaker recognition

### Troubleshooting Distributed Setup

**Common Issues:**
- **CORS errors**: Tailscale IPs are automatically supported, but verify CORS_ORIGINS if using custom IPs
- **Service discovery**: Use `tailscale ip` to find machine IPs
- **Port conflicts**: Ensure services use different ports on shared machines
- **Authentication**: Services must be accessible without authentication for inter-service communication

**Debugging Commands:**
```bash
# Check Tailscale connectivity
Expand Down
67 changes: 67 additions & 0 deletions backends/advanced/src/advanced_omi_backend/audio_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
import logging
import os
import time
import wave
import io
import numpy as np
from pathlib import Path

# Type import to avoid circular imports
from typing import TYPE_CHECKING, Optional
Expand Down Expand Up @@ -88,6 +92,69 @@ async def process_audio_chunk(
client_state.update_audio_received(chunk)


async def load_audio_file_as_chunk(audio_path: Path) -> AudioChunk:
    """Load an existing WAV file into a Wyoming AudioChunk for reprocessing.

    The whole file is read into a single chunk. Stereo 16-bit input is
    downmixed to mono by averaging the two channels; the result must be
    16 kHz, mono, 16-bit PCM.

    Args:
        audio_path: Path to the audio file on disk.

    Returns:
        AudioChunk holding the file's PCM data, stamped with the current
        wall-clock time in milliseconds.

    Raises:
        FileNotFoundError: If the audio file doesn't exist.
        ValueError: If the file cannot be parsed as WAV or does not match
            the expected format. The original error is attached as
            ``__cause__``.
    """
    try:
        # Read the audio file
        with open(audio_path, "rb") as f:
            file_content = f.read()

        # Parse the WAV header and pull out every frame
        with wave.open(io.BytesIO(file_content), "rb") as wav_file:
            sample_rate = wav_file.getframerate()
            sample_width = wav_file.getsampwidth()
            channels = wav_file.getnchannels()
            audio_data = wav_file.readframes(wav_file.getnframes())

        # Convert to mono if stereo
        if channels == 2:
            if sample_width != 2:
                raise ValueError(f"Unsupported sample width for stereo: {sample_width}")
            # WAV PCM is little-endian on disk; rows are (left, right) pairs.
            stereo = np.frombuffer(audio_data, dtype="<i2").reshape(-1, 2)
            # Average in int32: summing two int16 samples in an int16
            # accumulator wraps around for loud same-sign samples
            # (e.g. 20000 + 20000) and corrupts the downmix.
            mono = stereo.astype(np.int32).mean(axis=1).astype("<i2")
            audio_data = mono.tobytes()
            channels = 1

        # Validate format matches expected (16kHz, mono, 16-bit)
        if sample_rate != 16000:
            raise ValueError(f"Audio file has sample rate {sample_rate}Hz, expected 16kHz")
        if channels != 1:
            raise ValueError(f"Audio file has {channels} channels, expected mono")
        if sample_width != 2:
            raise ValueError(f"Audio file has {sample_width}-byte samples, expected 2 bytes")

        # Create AudioChunk with current timestamp (milliseconds)
        chunk = AudioChunk(
            audio=audio_data,
            rate=sample_rate,
            width=sample_width,
            channels=channels,
            timestamp=int(time.time() * 1000),
        )

        logger.info(f"Loaded audio file {audio_path} as AudioChunk ({len(audio_data)} bytes)")
        return chunk

    except FileNotFoundError:
        logger.error(f"Audio file not found: {audio_path}")
        raise
    except Exception as e:
        # Any parse or validation failure is reported to callers as ValueError;
        # chain the cause so the original traceback is not lost.
        logger.error(f"Error loading audio file {audio_path}: {e}")
        raise ValueError(f"Invalid audio file format: {e}") from e


async def _process_audio_cropping_with_relative_timestamps(
original_path: str,
speech_segments: list[tuple[float, float]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,23 @@
import asyncio
import hashlib
import logging
import os
import time
from pathlib import Path
from typing import Optional

from advanced_omi_backend.audio_utils import (
_process_audio_cropping_with_relative_timestamps,
load_audio_file_as_chunk,
)
from advanced_omi_backend.client_manager import (
ClientManager,
client_belongs_to_user,
get_user_clients_all,
)
from advanced_omi_backend.database import AudioChunksRepository, ProcessingRunsRepository, chunks_col, processing_runs_col, conversations_col, ConversationsRepository
from advanced_omi_backend.users import User
from advanced_omi_backend.processors import get_processor_manager, TranscriptionItem, MemoryProcessingItem
from advanced_omi_backend.users import User, get_user_by_id
from fastapi.responses import JSONResponse

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -585,9 +588,10 @@ async def reprocess_transcript(conversation_id: str, user: User):
)

# Generate configuration hash for duplicate detection
transcription_provider = os.getenv("TRANSCRIPTION_PROVIDER", "deepgram")
config_data = {
"audio_path": str(full_audio_path),
"transcription_provider": "deepgram", # This would come from settings
"transcription_provider": transcription_provider,
"trigger": "manual_reprocess"
}
config_hash = hashlib.sha256(str(config_data).encode()).hexdigest()[:16]
Expand All @@ -613,18 +617,37 @@ async def reprocess_transcript(conversation_id: str, user: User):
status_code=500, content={"error": "Failed to create transcript version"}
)

# TODO: Queue audio for reprocessing with ProcessorManager
# This is where we would integrate with the existing processor
# For now, we'll return the version ID for the caller to handle
# NEW: Load audio file and queue for transcription processing
try:
# Load audio file as AudioChunk
audio_chunk = await load_audio_file_as_chunk(full_audio_path)

# Create TranscriptionItem for reprocessing
transcription_item = TranscriptionItem(
client_id=f"reprocess-{conversation_id}",
user_id=str(user.user_id),
audio_uuid=audio_uuid,
audio_chunk=audio_chunk
)

# Queue for transcription processing
processor_manager = get_processor_manager()
await processor_manager.queue_transcription(transcription_item)

logger.info(f"Queued transcript reprocessing job {run_id} (version {version_id}) for conversation {conversation_id}")

logger.info(f"Created transcript reprocessing job {run_id} (version {version_id}) for conversation {conversation_id}")
except Exception as e:
logger.error(f"Error queuing transcript reprocessing: {e}")
return JSONResponse(
status_code=500, content={"error": f"Failed to queue reprocessing: {str(e)}"}
)

return JSONResponse(content={
"message": f"Transcript reprocessing started for conversation {conversation_id}",
"run_id": run_id,
"version_id": version_id,
"config_hash": config_hash,
"status": "PENDING"
"status": "QUEUED"
})

except Exception as e:
Expand Down Expand Up @@ -673,9 +696,10 @@ async def reprocess_memory(conversation_id: str, transcript_version_id: str, use
)

# Generate configuration hash for duplicate detection
memory_provider = os.getenv("MEMORY_PROVIDER", "friend_lite")
config_data = {
"transcript_version_id": transcript_version_id,
"memory_provider": "friend_lite", # This would come from settings
"memory_provider": memory_provider,
"trigger": "manual_reprocess"
}
config_hash = hashlib.sha256(str(config_data).encode()).hexdigest()[:16]
Expand All @@ -702,18 +726,42 @@ async def reprocess_memory(conversation_id: str, transcript_version_id: str, use
status_code=500, content={"error": "Failed to create memory version"}
)

# TODO: Queue memory extraction for processing
# This is where we would integrate with the existing memory processor
# NEW: Queue memory processing
try:
# Get user email for memory processing
user_obj = await get_user_by_id(str(user.user_id))
if not user_obj:
return JSONResponse(
status_code=500, content={"error": "User not found for memory processing"}
)

# Create MemoryProcessingItem for reprocessing
memory_item = MemoryProcessingItem(
client_id=f"reprocess-{conversation_id}",
user_id=str(user.user_id),
user_email=user_obj.email,
conversation_id=conversation_id
)

# Queue for memory processing
processor_manager = get_processor_manager()
await processor_manager.queue_memory(memory_item)

logger.info(f"Created memory reprocessing job {run_id} (version {version_id}) for conversation {conversation_id}")
logger.info(f"Queued memory reprocessing job {run_id} (version {version_id}) for conversation {conversation_id}")

except Exception as e:
logger.error(f"Error queuing memory reprocessing: {e}")
return JSONResponse(
status_code=500, content={"error": f"Failed to queue memory reprocessing: {str(e)}"}
)

return JSONResponse(content={
"message": f"Memory reprocessing started for conversation {conversation_id}",
"run_id": run_id,
"version_id": version_id,
"transcript_version_id": transcript_version_id,
"config_hash": config_hash,
"status": "PENDING"
"status": "QUEUED"
})

except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,9 +523,14 @@ async def list_processing_jobs():
async def process_files_with_content(
job_id: str, file_data: list[tuple[str, bytes]], user: User, device_name: str
):
"""Background task to process uploaded files using pre-read content."""
"""Background task to process uploaded files using pre-read content.

Creates persistent clients that remain active in an upload session,
following the same code path as WebSocket clients.
"""
# Import here to avoid circular imports
from advanced_omi_backend.main import cleanup_client_state, create_client_state
from advanced_omi_backend.main import create_client_state, cleanup_client_state
import uuid

audio_logger.info(
f"🚀 process_files_with_content called for job {job_id} with {len(file_data)} files"
Expand All @@ -536,8 +541,13 @@ async def process_files_with_content(
# Update job status to processing
await job_tracker.update_job_status(job_id, JobStatus.PROCESSING)

# Process files one by one
processed_files = []

for file_index, (filename, content) in enumerate(file_data):
client_id = None
# Generate client ID for this file
file_device_name = f"{device_name}-{file_index + 1:03d}"
client_id = generate_client_id(user, file_device_name)
client_state = None

try:
Expand Down Expand Up @@ -577,18 +587,22 @@ async def process_files_with_content(
)
continue

# Generate unique client ID for each file
# Use pre-generated client ID from upload session
file_device_name = f"{device_name}-{file_index + 1:03d}"
client_id = generate_client_id(user, file_device_name)

# Update job tracker with client ID
await job_tracker.update_file_status(
job_id, filename, FileStatus.PROCESSING, client_id=client_id
)

# Create client state
# Create persistent client state (will be tracked by ProcessorManager)
client_state = await create_client_state(client_id, user, file_device_name)


audio_logger.info(
f"👤 [Job {job_id}] Created persistent client {client_id} for file {filename}"
)

# Process WAV file
with wave.open(io.BytesIO(content), "rb") as wav_file:
sample_rate = wav_file.getframerate()
Expand Down Expand Up @@ -732,28 +746,31 @@ async def process_files_with_content(
job_id, filename, FileStatus.FAILED, error_message=error_msg
)
finally:
# Always clean up client state to prevent accumulation
# Clean up client state immediately after upload completes (like WebSocket disconnect)
# ProcessorManager will continue tracking processing independently
if client_id and client_state:
try:
await cleanup_client_state(client_id)
audio_logger.info(
f"🧹 [Job {job_id}] Cleaned up client state for {client_id}"
)
audio_logger.info(f"🧹 Cleaned up client state for {client_id}")
except Exception as cleanup_error:
audio_logger.error(
f"❌ [Job {job_id}] Error cleaning up client state for {client_id}: {cleanup_error}"
f"❌ Error cleaning up client state for {client_id}: {cleanup_error}"
)

# Mark job as completed
await job_tracker.update_job_status(job_id, JobStatus.COMPLETED)
audio_logger.info(f"🎉 [Job {job_id}] All files processed")

audio_logger.info(
f"🎉 [Job {job_id}] All files processed successfully."
)

except Exception as e:
error_msg = f"Job processing failed: {str(e)}"
audio_logger.error(f"💥 [Job {job_id}] {error_msg}")
await job_tracker.update_job_status(job_id, JobStatus.FAILED, error_msg)



# Configuration functions moved to config.py to avoid circular imports


Expand Down Expand Up @@ -1282,3 +1299,6 @@ async def get_client_processing_detail(client_id: str):
return JSONResponse(
status_code=500, content={"error": f"Failed to get client detail: {str(e)}"}
)



10 changes: 10 additions & 0 deletions backends/advanced/src/advanced_omi_backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,14 @@ async def cleanup_client_state(client_id: str):
removed = await client_manager.remove_client_with_cleanup(client_id)

if removed:
# Clean up processor manager task tracking
try:
processor_manager = get_processor_manager()
processor_manager.cleanup_processing_tasks(client_id)
logger.debug(f"Cleaned up processor tasks for client {client_id}")
except Exception as processor_cleanup_error:
logger.error(f"Error cleaning up processor tasks for {client_id}: {processor_cleanup_error}")

# Clean up any orphaned transcript events for this client
coordinator = get_transcript_coordinator()
coordinator.cleanup_transcript_events_for_client(client_id)
Expand Down Expand Up @@ -320,6 +328,7 @@ async def lifespan(app: FastAPI):
processor_manager = init_processor_manager(CHUNK_DIR, ac_repository)
await processor_manager.start()


logger.info("App ready")
try:
yield
Expand All @@ -331,6 +340,7 @@ async def lifespan(app: FastAPI):
for client_id in client_manager.get_all_client_ids():
await cleanup_client_state(client_id)


# Shutdown processor manager
processor_manager = get_processor_manager()
await processor_manager.shutdown()
Expand Down
Loading