Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backends/advanced/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,4 @@ COPY diarization_config.json* ./


# Run the application
CMD ["uv", "run", "python3", "src/advanced_omi_backend/main.py"]
CMD ["uv", "run", "--extra", "deepgram", "python3", "src/advanced_omi_backend/main.py"]
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,95 @@
import asyncio
import logging
import os
import time

# Type import to avoid circular imports
from typing import TYPE_CHECKING, Optional

from wyoming.audio import AudioChunk

if TYPE_CHECKING:
from advanced_omi_backend.client import ClientState
from advanced_omi_backend.database import AudioChunksRepository

logger = logging.getLogger(__name__)

# Import constants from main.py (these are defined there)
MIN_SPEECH_SEGMENT_DURATION = float(os.getenv("MIN_SPEECH_SEGMENT_DURATION", "1.0")) # seconds
CROPPING_CONTEXT_PADDING = float(os.getenv("CROPPING_CONTEXT_PADDING", "0.1")) # seconds


async def process_audio_chunk(
    audio_data: bytes,
    client_id: str,
    user_id: str,
    user_email: str,
    audio_format: dict,
    client_state: Optional["ClientState"] = None
) -> None:
    """Run one audio chunk through the shared ingestion pipeline.

    Every audio input source follows the same three steps, centralized here:
    1. Wrap the raw bytes in a Wyoming AudioChunk with format details.
    2. Queue an AudioProcessingItem on the processor manager.
    3. Mirror the chunk into the client's state, when one is tracked.

    Args:
        audio_data: Raw audio bytes.
        client_id: Client identifier.
        user_id: User identifier.
        user_email: User email.
        audio_format: Dict containing {rate, width, channels, timestamp}.
        client_state: Optional ClientState for state updates.
    """

    # Deferred import: processors imports back into this package at module load.
    from advanced_omi_backend.processors import (
        AudioProcessingItem,
        get_processor_manager,
    )

    # Fall back to "now" in epoch milliseconds when the caller gave no timestamp.
    chunk_timestamp = audio_format.get("timestamp")
    if chunk_timestamp is None:
        chunk_timestamp = int(time.time() * 1000)

    # Wrap the raw bytes with the caller's format, defaulting to 16 kHz mono s16.
    wyoming_chunk = AudioChunk(
        audio=audio_data,
        rate=audio_format.get("rate", 16000),
        width=audio_format.get("width", 2),
        channels=audio_format.get("channels", 1),
        timestamp=chunk_timestamp,
    )

    # Hand the chunk to the central processing queue.
    manager = get_processor_manager()
    await manager.queue_audio(
        AudioProcessingItem(
            client_id=client_id,
            user_id=user_id,
            user_email=user_email,
            audio_chunk=wyoming_chunk,
            timestamp=chunk_timestamp,
        )
    )

    # Keep per-client bookkeeping in sync when a state object is tracked.
    if client_state is not None:
        client_state.update_audio_received(wyoming_chunk)


async def _process_audio_cropping_with_relative_timestamps(
original_path: str,
speech_segments: list[tuple[float, float]],
output_path: str,
audio_uuid: str,
chunk_repo: Optional['AudioChunksRepository'] = None,
) -> bool:
"""
Process audio cropping with automatic relative timestamp conversion.
Expand Down Expand Up @@ -79,7 +159,8 @@ async def _process_audio_cropping_with_relative_timestamps(
if success:
# Update database with cropped file info (keep original absolute timestamps for reference)
cropped_filename = output_path.split("/")[-1]
await chunk_repo.update_cropped_audio(audio_uuid, cropped_filename, speech_segments)
if chunk_repo is not None:
await chunk_repo.update_cropped_audio(audio_uuid, cropped_filename, speech_segments)
logger.info(f"Successfully processed cropped audio: {cropped_filename}")
return True
else:
Expand Down
5 changes: 2 additions & 3 deletions backends/advanced/src/advanced_omi_backend/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@
from pathlib import Path
from typing import Dict, List, Optional, Tuple

from wyoming.audio import AudioChunk

from advanced_omi_backend.conversation_manager import get_conversation_manager
from advanced_omi_backend.database import AudioChunksRepository
from advanced_omi_backend.task_manager import get_task_manager
from wyoming.audio import AudioChunk

# Get loggers
audio_logger = logging.getLogger("audio_processing")
Expand Down Expand Up @@ -66,7 +65,7 @@ def __init__(
# Debug tracking
self.transaction_id: Optional[str] = None

audio_logger.info(f"Created simplified client state for {client_id}")
audio_logger.info(f"Created client state for {client_id}")

def update_audio_received(self, chunk: AudioChunk):
"""Update state when audio is received."""
Expand Down
154 changes: 123 additions & 31 deletions backends/advanced/src/advanced_omi_backend/client_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,25 @@ class ClientManager:
"""
Centralized manager for active client connections and client-user relationships.

This service provides thread-safe access to active client information
and client-user relationship management for use in API endpoints and other services.
This service provides atomic operations for client lifecycle management
and serves as the single source of truth for active client state.
"""

def __init__(self):
self._active_clients: Dict[str, "ClientState"] = {}
self._initialized = False
self._initialized = True # Self-initializing, no external dict needed
logger.info("ClientManager initialized as single source of truth")

def initialize(self, active_clients_dict: Dict[str, "ClientState"]):
def initialize(self, active_clients_dict: Optional[Dict[str, "ClientState"]] = None):
"""
Initialize the client manager with a reference to the active_clients dict.
Legacy initialization method for backward compatibility.

This should be called from main.py during startup to provide access
to the global active_clients dictionary.
New design: ClientManager is self-initializing and doesn't need external dict.
This method is kept for compatibility but does nothing.
"""
self._active_clients = active_clients_dict
self._initialized = True
logger.info("ClientManager initialized with active_clients reference")
if active_clients_dict is not None:
logger.warning("ClientManager no longer uses external dictionaries - ignoring active_clients_dict")
logger.info("ClientManager initialization (legacy compatibility mode)")

def is_initialized(self) -> bool:
"""Check if the client manager has been initialized."""
Expand All @@ -58,15 +59,7 @@ def get_client(self, client_id: str) -> Optional["ClientState"]:

Returns:
ClientState object if found, None if client not found

Raises:
RuntimeError: If ClientManager is not initialized
"""
if not self._initialized:
logger.error("ClientManager not initialized, cannot get client")
raise RuntimeError(
"ClientManager not initialized - call initialize() first before accessing clients"
)
return self._active_clients.get(client_id)

def has_client(self, client_id: str) -> bool:
Expand All @@ -79,9 +72,6 @@ def has_client(self, client_id: str) -> bool:
Returns:
True if client is active, False otherwise
"""
if not self._initialized:
logger.warning("ClientManager not initialized, cannot check client")
return False
return client_id in self._active_clients

def is_client_active(self, client_id: str) -> bool:
Expand All @@ -103,9 +93,6 @@ def get_all_clients(self) -> Dict[str, "ClientState"]:
Returns:
Dictionary of client_id -> ClientState mappings
"""
if not self._initialized:
logger.warning("ClientManager not initialized, returning empty dict")
return {}
return self._active_clients.copy()

def get_client_count(self) -> int:
Expand All @@ -115,22 +102,127 @@ def get_client_count(self) -> int:
Returns:
Number of active clients
"""
if not self._initialized:
logger.warning("ClientManager not initialized, returning 0")
return 0
return len(self._active_clients)

def create_client(self, client_id: str, ac_repository, chunk_dir, user_id: str, user_email: Optional[str] = None) -> "ClientState":
    """
    Create a new ClientState and register it in a single step.

    Storage insertion and client-user mapping registration happen together,
    so callers never observe a client that exists in one but not the other.

    Args:
        client_id: Unique client identifier
        ac_repository: Audio chunks repository
        chunk_dir: Directory for audio chunks
        user_id: User ID who owns this client
        user_email: Optional user email

    Returns:
        The newly created ClientState object

    Raises:
        ValueError: If client_id already exists
    """
    if client_id in self._active_clients:
        raise ValueError(f"Client {client_id} already exists")

    # Deferred import: client.py imports from this module as well.
    from advanced_omi_backend.client import ClientState

    state = ClientState(client_id, ac_repository, chunk_dir, user_id, user_email)

    # Insert and map together so the two views never disagree.
    self._active_clients[client_id] = state
    register_client_user_mapping(client_id, user_id)

    logger.info(f"✅ Created and registered client {client_id} for user {user_id}")
    return state

def remove_client(self, client_id: str) -> bool:
    """
    Atomically remove and deregister a client.

    Removal from storage and user-mapping deregistration happen together.
    Note: this does NOT run the client's disconnect cleanup — use
    remove_client_with_cleanup() when async resource cleanup is required.

    Args:
        client_id: Client identifier to remove

    Returns:
        True if client was removed, False if client didn't exist
    """
    # dict.pop tests membership and removes in one step; the original code
    # also fetched the ClientState "for cleanup" but never used it, so that
    # dead local has been dropped.
    if self._active_clients.pop(client_id, None) is None:
        logger.warning(f"Attempted to remove non-existent client {client_id}")
        return False

    # Keep the client-user mapping in sync with storage.
    unregister_client_user_mapping(client_id)

    logger.info(f"✅ Removed and deregistered client {client_id}")
    return True

async def remove_client_with_cleanup(self, client_id: str) -> bool:
    """
    Disconnect a client, then remove and deregister it.

    Args:
        client_id: Client identifier to remove

    Returns:
        True if client was removed, False if client didn't exist
    """
    state = self._active_clients.get(client_id)
    if state is None:
        logger.warning(f"Attempted to remove non-existent client {client_id}")
        return False

    # Let the client release its own resources before it disappears.
    await state.disconnect()

    # Remove from storage and drop the user mapping together.
    del self._active_clients[client_id]
    unregister_client_user_mapping(client_id)

    logger.info(f"✅ Removed and cleaned up client {client_id}")
    return True

def get_all_client_ids(self) -> list[str]:
    """Return the ids of every currently active client as a new list."""
    return [*self._active_clients]

def add_existing_client(self, client_id: str, client_state: "ClientState"):
    """
    Register a pre-built ClientState under the given id (migration helper).

    Any client already stored under the same id is replaced, with a warning.

    Args:
        client_id: Client identifier
        client_state: Existing ClientState object
    """
    already_present = client_id in self._active_clients
    if already_present:
        logger.warning(f"Overwriting existing client {client_id}")

    self._active_clients[client_id] = client_state
    logger.info(f"Added existing client {client_id} to ClientManager")

def get_client_info_summary(self) -> list:
"""
Get summary information about all active clients.

Returns:
List of client info dictionaries suitable for API responses
"""
if not self._initialized:
logger.warning("ClientManager not initialized, returning empty list")
return []

client_info = []
for client_id, client_state in self._active_clients.items():
current_audio_uuid = client_state.current_audio_uuid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from pathlib import Path
from typing import Optional

from advanced_omi_backend.audio_cropping_utils import (
from advanced_omi_backend.audio_utils import (
_process_audio_cropping_with_relative_timestamps,
)
from advanced_omi_backend.client_manager import (
Expand Down Expand Up @@ -270,10 +270,29 @@ async def reprocess_audio_cropping(audio_uuid: str, user: User):
}
)

# Get speech segments from the chunk
speech_segments = chunk.get("speech_segments", [])
if not speech_segments:
return JSONResponse(
status_code=400,
content={"error": "No speech segments found for this conversation"}
)

# Generate output path for cropped audio
cropped_filename = f"cropped_{audio_uuid}.wav"
output_path = Path("/app/data/audio_chunks") / cropped_filename

# Get repository for database updates
chunk_repo = AudioChunksRepository(chunks_col)

# Reprocess the audio cropping
try:
result = await asyncio.get_running_loop().run_in_executor(
None, _process_audio_cropping_with_relative_timestamps, str(full_audio_path), audio_uuid
result = await _process_audio_cropping_with_relative_timestamps(
str(full_audio_path),
speech_segments,
str(output_path),
audio_uuid,
chunk_repo
)

if result:
Expand Down
Loading