From 91d202b8bd9cb1b2c5b396831e08b447eca1a148 Mon Sep 17 00:00:00 2001 From: Ankush Malaker <43288948+AnkushMalaker@users.noreply.github.com> Date: Fri, 19 Sep 2025 01:32:57 +0000 Subject: [PATCH 1/4] Refactor audio processing and client management - Removed compute mode handling from service setup in `init.py`. - Introduced `audio_utils.py` for audio processing functions, including `process_audio_chunk` and audio cropping utilities. - Updated `ClientManager` to be self-initializing and added atomic client creation and removal methods. - Refactored audio chunk processing in various controllers to utilize the new unified audio processing pipeline. - Enhanced transcription status update functionality in `database.py` to manage active transcript versions. - Cleaned up imports and improved logging across multiple files for better clarity and maintainability. --- ...audio_cropping_utils.py => audio_utils.py} | 83 +++++++- .../src/advanced_omi_backend/client.py | 5 +- .../advanced_omi_backend/client_manager.py | 154 ++++++++++++--- .../controllers/conversation_controller.py | 25 ++- .../controllers/system_controller.py | 79 ++++---- .../src/advanced_omi_backend/database.py | 67 +++++++ .../advanced/src/advanced_omi_backend/main.py | 186 ++++++++---------- .../advanced_omi_backend/memory/__init__.py | 1 - .../memory/service_factory.py | 16 +- .../src/advanced_omi_backend/processors.py | 31 +-- .../src/advanced_omi_backend/transcription.py | 32 +-- init.py | 5 - 12 files changed, 445 insertions(+), 239 deletions(-) rename backends/advanced/src/advanced_omi_backend/{audio_cropping_utils.py => audio_utils.py} (73%) diff --git a/backends/advanced/src/advanced_omi_backend/audio_cropping_utils.py b/backends/advanced/src/advanced_omi_backend/audio_utils.py similarity index 73% rename from backends/advanced/src/advanced_omi_backend/audio_cropping_utils.py rename to backends/advanced/src/advanced_omi_backend/audio_utils.py index d1274903..2821d126 100644 --- 
a/backends/advanced/src/advanced_omi_backend/audio_cropping_utils.py +++ b/backends/advanced/src/advanced_omi_backend/audio_utils.py @@ -5,15 +5,95 @@ import asyncio import logging import os +import time + +# Type import to avoid circular imports +from typing import TYPE_CHECKING, Optional + +from wyoming.audio import AudioChunk + +if TYPE_CHECKING: + from advanced_omi_backend.client import ClientState + from advanced_omi_backend.database import AudioChunksRepository logger = logging.getLogger(__name__) +# Import constants from main.py (these are defined there) +MIN_SPEECH_SEGMENT_DURATION = float(os.getenv("MIN_SPEECH_SEGMENT_DURATION", "1.0")) # seconds +CROPPING_CONTEXT_PADDING = float(os.getenv("CROPPING_CONTEXT_PADDING", "0.1")) # seconds + + +async def process_audio_chunk( + audio_data: bytes, + client_id: str, + user_id: str, + user_email: str, + audio_format: dict, + client_state: Optional["ClientState"] = None +) -> None: + """Process a single audio chunk through the standard pipeline. + + This function encapsulates the common pattern used across all audio input sources: + 1. Create AudioChunk with format details + 2. Queue AudioProcessingItem to processor + 3. 
Update client state if provided + + Args: + audio_data: Raw audio bytes + client_id: Client identifier + user_id: User identifier + user_email: User email + audio_format: Dict containing {rate, width, channels, timestamp} + client_state: Optional ClientState for state updates + """ + + from advanced_omi_backend.processors import ( + AudioProcessingItem, + get_processor_manager, + ) + + # Extract format details + rate = audio_format.get("rate", 16000) + width = audio_format.get("width", 2) + channels = audio_format.get("channels", 1) + timestamp = audio_format.get("timestamp") + + # Use current time if no timestamp provided + if timestamp is None: + timestamp = int(time.time() * 1000) + + # Create AudioChunk with format details + chunk = AudioChunk( + audio=audio_data, + rate=rate, + width=width, + channels=channels, + timestamp=timestamp + ) + + # Create AudioProcessingItem and queue for processing + processor_manager = get_processor_manager() + processing_item = AudioProcessingItem( + client_id=client_id, + user_id=user_id, + user_email=user_email, + audio_chunk=chunk, + timestamp=timestamp + ) + + await processor_manager.queue_audio(processing_item) + + # Update client state if provided + if client_state is not None: + client_state.update_audio_received(chunk) + async def _process_audio_cropping_with_relative_timestamps( original_path: str, speech_segments: list[tuple[float, float]], output_path: str, audio_uuid: str, + chunk_repo: Optional['AudioChunksRepository'] = None, ) -> bool: """ Process audio cropping with automatic relative timestamp conversion. 
@@ -79,7 +159,8 @@ async def _process_audio_cropping_with_relative_timestamps( if success: # Update database with cropped file info (keep original absolute timestamps for reference) cropped_filename = output_path.split("/")[-1] - await chunk_repo.update_cropped_audio(audio_uuid, cropped_filename, speech_segments) + if chunk_repo is not None: + await chunk_repo.update_cropped_audio(audio_uuid, cropped_filename, speech_segments) logger.info(f"Successfully processed cropped audio: {cropped_filename}") return True else: diff --git a/backends/advanced/src/advanced_omi_backend/client.py b/backends/advanced/src/advanced_omi_backend/client.py index 660308dc..3c43a43a 100644 --- a/backends/advanced/src/advanced_omi_backend/client.py +++ b/backends/advanced/src/advanced_omi_backend/client.py @@ -12,11 +12,10 @@ from pathlib import Path from typing import Dict, List, Optional, Tuple -from wyoming.audio import AudioChunk - from advanced_omi_backend.conversation_manager import get_conversation_manager from advanced_omi_backend.database import AudioChunksRepository from advanced_omi_backend.task_manager import get_task_manager +from wyoming.audio import AudioChunk # Get loggers audio_logger = logging.getLogger("audio_processing") @@ -66,7 +65,7 @@ def __init__( # Debug tracking self.transaction_id: Optional[str] = None - audio_logger.info(f"Created simplified client state for {client_id}") + audio_logger.info(f"Created client state for {client_id}") def update_audio_received(self, chunk: AudioChunk): """Update state when audio is received.""" diff --git a/backends/advanced/src/advanced_omi_backend/client_manager.py b/backends/advanced/src/advanced_omi_backend/client_manager.py index 3b06e9c7..b48cd51c 100644 --- a/backends/advanced/src/advanced_omi_backend/client_manager.py +++ b/backends/advanced/src/advanced_omi_backend/client_manager.py @@ -26,24 +26,25 @@ class ClientManager: """ Centralized manager for active client connections and client-user relationships. 
- This service provides thread-safe access to active client information - and client-user relationship management for use in API endpoints and other services. + This service provides atomic operations for client lifecycle management + and serves as the single source of truth for active client state. """ def __init__(self): self._active_clients: Dict[str, "ClientState"] = {} - self._initialized = False + self._initialized = True # Self-initializing, no external dict needed + logger.info("ClientManager initialized as single source of truth") - def initialize(self, active_clients_dict: Dict[str, "ClientState"]): + def initialize(self, active_clients_dict: Optional[Dict[str, "ClientState"]] = None): """ - Initialize the client manager with a reference to the active_clients dict. + Legacy initialization method for backward compatibility. - This should be called from main.py during startup to provide access - to the global active_clients dictionary. + New design: ClientManager is self-initializing and doesn't need external dict. + This method is kept for compatibility but does nothing. 
""" - self._active_clients = active_clients_dict - self._initialized = True - logger.info("ClientManager initialized with active_clients reference") + if active_clients_dict is not None: + logger.warning("ClientManager no longer uses external dictionaries - ignoring active_clients_dict") + logger.info("ClientManager initialization (legacy compatibility mode)") def is_initialized(self) -> bool: """Check if the client manager has been initialized.""" @@ -58,15 +59,7 @@ def get_client(self, client_id: str) -> Optional["ClientState"]: Returns: ClientState object if found, None if client not found - - Raises: - RuntimeError: If ClientManager is not initialized """ - if not self._initialized: - logger.error("ClientManager not initialized, cannot get client") - raise RuntimeError( - "ClientManager not initialized - call initialize() first before accessing clients" - ) return self._active_clients.get(client_id) def has_client(self, client_id: str) -> bool: @@ -79,9 +72,6 @@ def has_client(self, client_id: str) -> bool: Returns: True if client is active, False otherwise """ - if not self._initialized: - logger.warning("ClientManager not initialized, cannot check client") - return False return client_id in self._active_clients def is_client_active(self, client_id: str) -> bool: @@ -103,9 +93,6 @@ def get_all_clients(self) -> Dict[str, "ClientState"]: Returns: Dictionary of client_id -> ClientState mappings """ - if not self._initialized: - logger.warning("ClientManager not initialized, returning empty dict") - return {} return self._active_clients.copy() def get_client_count(self) -> int: @@ -115,11 +102,120 @@ def get_client_count(self) -> int: Returns: Number of active clients """ - if not self._initialized: - logger.warning("ClientManager not initialized, returning 0") - return 0 return len(self._active_clients) + def create_client(self, client_id: str, ac_repository, chunk_dir, user_id: str, user_email: Optional[str] = None) -> "ClientState": + """ + Atomically create 
and register a new client. + + This method ensures that client creation and registration happen atomically, + eliminating race conditions. + + Args: + client_id: Unique client identifier + ac_repository: Audio chunks repository + chunk_dir: Directory for audio chunks + user_id: User ID who owns this client + user_email: Optional user email + + Returns: + Created ClientState object + + Raises: + ValueError: If client_id already exists + """ + if client_id in self._active_clients: + raise ValueError(f"Client {client_id} already exists") + + # Import here to avoid circular imports + from advanced_omi_backend.client import ClientState + + # Create client state + client_state = ClientState(client_id, ac_repository, chunk_dir, user_id, user_email) + + # Atomically add to internal storage and register mapping + self._active_clients[client_id] = client_state + register_client_user_mapping(client_id, user_id) + + logger.info(f"✅ Created and registered client {client_id} for user {user_id}") + return client_state + + def remove_client(self, client_id: str) -> bool: + """ + Atomically remove and deregister a client. + + This method ensures that client removal and deregistration happen atomically. + + Args: + client_id: Client identifier to remove + + Returns: + True if client was removed, False if client didn't exist + """ + if client_id not in self._active_clients: + logger.warning(f"Attempted to remove non-existent client {client_id}") + return False + + # Get client state for cleanup + client_state = self._active_clients[client_id] + + # Atomically remove from storage and deregister mapping + del self._active_clients[client_id] + unregister_client_user_mapping(client_id) + + logger.info(f"✅ Removed and deregistered client {client_id}") + return True + + async def remove_client_with_cleanup(self, client_id: str) -> bool: + """ + Atomically remove client with full cleanup. 
+ + Args: + client_id: Client identifier to remove + + Returns: + True if client was removed, False if client didn't exist + """ + if client_id not in self._active_clients: + logger.warning(f"Attempted to remove non-existent client {client_id}") + return False + + # Get client state for cleanup + client_state = self._active_clients[client_id] + + # Call client's disconnect method for proper cleanup + await client_state.disconnect() + + # Atomically remove from storage and deregister mapping + del self._active_clients[client_id] + unregister_client_user_mapping(client_id) + + logger.info(f"✅ Removed and cleaned up client {client_id}") + return True + + def get_all_client_ids(self) -> list[str]: + """ + Get list of all active client IDs. + + Returns: + List of active client IDs + """ + return list(self._active_clients.keys()) + + def add_existing_client(self, client_id: str, client_state: "ClientState"): + """ + Add an existing client state (for migration purposes). + + Args: + client_id: Client identifier + client_state: Existing ClientState object + """ + if client_id in self._active_clients: + logger.warning(f"Overwriting existing client {client_id}") + + self._active_clients[client_id] = client_state + logger.info(f"Added existing client {client_id} to ClientManager") + def get_client_info_summary(self) -> list: """ Get summary information about all active clients. 
@@ -127,10 +223,6 @@ def get_client_info_summary(self) -> list: Returns: List of client info dictionaries suitable for API responses """ - if not self._initialized: - logger.warning("ClientManager not initialized, returning empty list") - return [] - client_info = [] for client_id, client_state in self._active_clients.items(): current_audio_uuid = client_state.current_audio_uuid diff --git a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py index 0db39a98..e53eef88 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py @@ -9,7 +9,7 @@ from pathlib import Path from typing import Optional -from advanced_omi_backend.audio_cropping_utils import ( +from advanced_omi_backend.audio_utils import ( _process_audio_cropping_with_relative_timestamps, ) from advanced_omi_backend.client_manager import ( @@ -270,10 +270,29 @@ async def reprocess_audio_cropping(audio_uuid: str, user: User): } ) + # Get speech segments from the chunk + speech_segments = chunk.get("speech_segments", []) + if not speech_segments: + return JSONResponse( + status_code=400, + content={"error": "No speech segments found for this conversation"} + ) + + # Generate output path for cropped audio + cropped_filename = f"cropped_{audio_uuid}.wav" + output_path = Path("/app/data/audio_chunks") / cropped_filename + + # Get repository for database updates + chunk_repo = AudioChunksRepository(chunks_col) + # Reprocess the audio cropping try: - result = await asyncio.get_running_loop().run_in_executor( - None, _process_audio_cropping_with_relative_timestamps, str(full_audio_path), audio_uuid + result = await _process_audio_cropping_with_relative_timestamps( + str(full_audio_path), + speech_segments, + str(output_path), + audio_uuid, + chunk_repo ) if result: diff --git 
a/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py index eb1582d1..9fc7efe6 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py @@ -14,17 +14,20 @@ from pathlib import Path import numpy as np -from fastapi import BackgroundTasks, File, Query, UploadFile -from fastapi.responses import JSONResponse -from wyoming.audio import AudioChunk - from advanced_omi_backend.client_manager import generate_client_id -from advanced_omi_backend.config import load_diarization_settings_from_file, save_diarization_settings_to_file +from advanced_omi_backend.config import ( + load_diarization_settings_from_file, + save_diarization_settings_to_file, +) from advanced_omi_backend.database import chunks_col from advanced_omi_backend.job_tracker import FileStatus, JobStatus, get_job_tracker from advanced_omi_backend.processors import AudioProcessingItem, get_processor_manager +from advanced_omi_backend.audio_utils import process_audio_chunk from advanced_omi_backend.task_manager import get_task_manager from advanced_omi_backend.users import User +from fastapi import BackgroundTasks, File, Query, UploadFile +from fastapi.responses import JSONResponse +from wyoming.audio import AudioChunk logger = logging.getLogger(__name__) audio_logger = logging.getLogger("audio_processing") @@ -232,8 +235,9 @@ async def process_audio_files( # Ensure sample rate is 16kHz (resample if needed) if sample_rate != 16000: audio_logger.warning( - f"File {file.filename} has sample rate {sample_rate}Hz, expected 16kHz. Processing anyway." + f"File {file.filename} has sample rate {sample_rate}Hz, expected 16kHz." ) + return JSONResponse(status_code=400, content={"error": f"File {file.filename} has sample rate {sample_rate}Hz, expected 16kHz. 
Resampling is not yet supported."}) # Process audio in larger chunks for faster file processing # Use larger chunks (32KB) for optimal performance @@ -250,25 +254,20 @@ ) chunk_timestamp = base_timestamp + int(chunk_offset_seconds) - # Create AudioChunk - chunk = AudioChunk( - audio=chunk_data, - rate=sample_rate, - width=sample_width, - channels=channels, - timestamp=chunk_timestamp, - ) - - # Add to application-level processing queue - - audio_item = AudioProcessingItem( + # Process audio chunk through unified pipeline + await process_audio_chunk( + audio_data=chunk_data, client_id=client_id, user_id=user.user_id, user_email=user.email, - audio_chunk=chunk, - timestamp=chunk.timestamp, + audio_format={ + "rate": sample_rate, + "width": sample_width, + "channels": channels, + "timestamp": chunk_timestamp, + }, + client_state=None, # No client state needed for file upload ) - await processor_manager.queue_audio(audio_item) # Yield control occasionally to prevent blocking the event loop if i % (chunk_size * 10) == 0: # Every 10 chunks (~320KB) @@ -622,22 +621,20 @@ async def process_files_with_content( ) chunk_timestamp = base_timestamp + int(chunk_offset_seconds) - chunk = AudioChunk( - audio=chunk_data, - rate=sample_rate, - width=sample_width, - channels=channels, - timestamp=chunk_timestamp, - ) - - audio_item = AudioProcessingItem( + # Process audio chunk through unified pipeline + await process_audio_chunk( + audio_data=chunk_data, client_id=client_id, user_id=user.user_id, user_email=user.email, - audio_chunk=chunk, - timestamp=chunk.timestamp, + audio_format={ + "rate": sample_rate, + "width": sample_width, + "channels": channels, + "timestamp": chunk_timestamp, + }, + client_state=None, # No client state needed for file upload ) - await processor_manager.queue_audio(audio_item) if i % (chunk_size * 10) == 0: # Yield control occasionally await asyncio.sleep(0) @@ -897,8 +894,10 @@ async def 
update_speaker_configuration(user: User, primary_speakers: list[dict]) async def get_enrolled_speakers(user: User): """Get enrolled speakers from speaker recognition service.""" try: - from advanced_omi_backend.speaker_recognition_client import SpeakerRecognitionClient - + from advanced_omi_backend.speaker_recognition_client import ( + SpeakerRecognitionClient, + ) + # Initialize speaker recognition client speaker_client = SpeakerRecognitionClient() @@ -933,8 +932,10 @@ async def get_enrolled_speakers(user: User): async def get_speaker_service_status(): """Check speaker recognition service health status.""" try: - from advanced_omi_backend.speaker_recognition_client import SpeakerRecognitionClient - + from advanced_omi_backend.speaker_recognition_client import ( + SpeakerRecognitionClient, + ) + # Initialize speaker recognition client speaker_client = SpeakerRecognitionClient() @@ -1012,7 +1013,7 @@ async def update_memory_config_raw(config_yaml: str): try: import yaml from advanced_omi_backend.memory_config_loader import get_config_loader - + # First validate YAML syntax try: yaml.safe_load(config_yaml) @@ -1062,7 +1063,7 @@ async def validate_memory_config(config_yaml: str): try: import yaml from advanced_omi_backend.memory_config_loader import MemoryConfigLoader - + # Parse YAML try: parsed_config = yaml.safe_load(config_yaml) diff --git a/backends/advanced/src/advanced_omi_backend/database.py b/backends/advanced/src/advanced_omi_backend/database.py index 39f3b8a1..f6cd691b 100644 --- a/backends/advanced/src/advanced_omi_backend/database.py +++ b/backends/advanced/src/advanced_omi_backend/database.py @@ -386,6 +386,73 @@ async def update_memory_processing_status( logger.info(f"Updated memory processing status to {status} for {audio_uuid}") return result.modified_count > 0 + async def update_transcription_status( + self, audio_uuid: str, status: str, error_message: str = None, provider: str = None + ): + """Update transcription processing status and completion 
timestamp. + + Interface compatibility method - updates active transcript version. + """ + chunk = await self.get_chunk(audio_uuid) + if not chunk: + return False + + active_version = chunk.get("active_transcript_version") + if not active_version: + # Create initial transcript version if none exists + version_id = str(uuid.uuid4()) + version_data = { + "version_id": version_id, + "transcript": "", + "segments": [], + "status": status, + "provider": provider, + "created_at": datetime.now(UTC).isoformat(), + "processing_run_id": None, + "raw_data": {}, + "speakers_identified": [] + } + if error_message: + version_data["error_message"] = error_message + + result = await self.col.update_one( + {"audio_uuid": audio_uuid}, + { + "$push": {"transcript_versions": version_data}, + "$set": { + "active_transcript_version": version_id, + "transcription_status": status, + "transcription_updated_at": datetime.now(UTC).isoformat(), + } + } + ) + else: + # Update existing active version + update_doc = { + f"transcript_versions.$[version].status": status, + f"transcript_versions.$[version].updated_at": datetime.now(UTC).isoformat(), + "transcription_status": status, + "transcription_updated_at": datetime.now(UTC).isoformat(), + } + if status == "COMPLETED": + update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat() + if error_message: + update_doc[f"transcript_versions.$[version].error_message"] = error_message + update_doc["transcription_error"] = error_message + if provider: + update_doc[f"transcript_versions.$[version].provider"] = provider + update_doc["transcript_provider"] = provider + + result = await self.col.update_one( + {"audio_uuid": audio_uuid}, + {"$set": update_doc}, + array_filters=[{"version.version_id": active_version}] + ) + + if result.modified_count > 0: + logger.info(f"Updated transcription status to {status} for {audio_uuid}") + return result.modified_count > 0 + # ======================================== + # SPEECH-DRIVEN CONVERSATIONS METHODS + # 
======================================== diff --git a/backends/advanced/src/advanced_omi_backend/main.py b/backends/advanced/src/advanced_omi_backend/main.py index ac597f83..de180d53 100644 --- a/backends/advanced/src/advanced_omi_backend/main.py +++ b/backends/advanced/src/advanced_omi_backend/main.py @@ -25,19 +25,6 @@ import aiohttp -# Import Beanie for user management -from beanie import init_beanie -from dotenv import load_dotenv -from fastapi import FastAPI, HTTPException, Query, Request, WebSocket, WebSocketDisconnect -from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import JSONResponse -from fastapi.staticfiles import StaticFiles -from friend_lite.decoder import OmiOpusDecoder -from motor.motor_asyncio import AsyncIOMotorClient -from pymongo.errors import ConnectionFailure, PyMongoError -from wyoming.audio import AudioChunk -from wyoming.client import AsyncTcpClient - # Import authentication components from advanced_omi_backend.auth import ( bearer_backend, @@ -55,16 +42,14 @@ ) from advanced_omi_backend.database import AudioChunksRepository from advanced_omi_backend.llm_client import async_health_check -from advanced_omi_backend.memory import ( - get_memory_service, - shutdown_memory_service, -) +from advanced_omi_backend.memory import get_memory_service, shutdown_memory_service from advanced_omi_backend.processors import ( AudioProcessingItem, get_processor_manager, init_processor_manager, ) -from advanced_omi_backend.task_manager import init_task_manager +from advanced_omi_backend.audio_utils import process_audio_chunk +from advanced_omi_backend.task_manager import init_task_manager, get_task_manager from advanced_omi_backend.transcript_coordinator import get_transcript_coordinator from advanced_omi_backend.transcription_providers import get_transcription_provider from advanced_omi_backend.users import ( @@ -74,6 +59,26 @@ register_client_to_user, ) +# Import Beanie for user management +from beanie import init_beanie +from 
dotenv import load_dotenv +from fastapi import ( + FastAPI, + HTTPException, + Query, + Request, + WebSocket, + WebSocketDisconnect, +) +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse +from fastapi.staticfiles import StaticFiles +from friend_lite.decoder import OmiOpusDecoder +from motor.motor_asyncio import AsyncIOMotorClient +from pymongo.errors import ConnectionFailure, PyMongoError +from wyoming.audio import AudioChunk +from wyoming.client import AsyncTcpClient + ############################################################################### # SETUP ############################################################################### @@ -214,15 +219,14 @@ async def parse_wyoming_protocol(ws: WebSocket) -> tuple[dict, Optional[bytes]]: # Initialize repository and global state ac_repository = AudioChunksRepository(chunks_col) -active_clients: dict[str, ClientState] = {} # Client-to-user mapping for reliable permission checking client_to_user_mapping: dict[str, str] = {} # client_id -> user_id -# Initialize client manager with active_clients reference -from advanced_omi_backend.client_manager import init_client_manager +# Initialize client manager (self-initializing, no external dict needed) +from advanced_omi_backend.client_manager import get_client_manager -init_client_manager(active_clients) +client_manager = get_client_manager() # Initialize client utilities with the mapping dictionaries from advanced_omi_backend.client_manager import ( @@ -247,11 +251,10 @@ async def create_client_state( client_id: str, user: User, device_name: Optional[str] = None ) -> ClientState: """Create and register a new client state.""" - client_state = ClientState(client_id, ac_repository, CHUNK_DIR, user.user_id, user.email) - active_clients[client_id] = client_state - - # Register client-user mapping (for active clients) - register_client_user_mapping(client_id, user.user_id) + # Use ClientManager for atomic client creation and registration 
+ client_state = client_manager.create_client( + client_id, ac_repository, CHUNK_DIR, user.user_id, user.email + ) # Also track in persistent mapping (for database queries) track_client_user_relationship(client_id, user.user_id) @@ -266,17 +269,17 @@ async def create_client_state( async def cleanup_client_state(client_id: str): """Clean up and remove client state.""" - if client_id in active_clients: - client_state = active_clients[client_id] - await client_state.disconnect() - del active_clients[client_id] + # Use ClientManager for atomic client removal with cleanup + removed = await client_manager.remove_client_with_cleanup(client_id) - # Clean up any orphaned transcript events for this client - coordinator = get_transcript_coordinator() - coordinator.cleanup_transcript_events_for_client(client_id) + if removed: + # Clean up any orphaned transcript events for this client + coordinator = get_transcript_coordinator() + coordinator.cleanup_transcript_events_for_client(client_id) - # Unregister client-user mapping - unregister_client_user_mapping(client_id) + logger.info(f"Client {client_id} cleaned up successfully") + else: + logger.warning(f"Client {client_id} was not found for cleanup") ############################################################################### @@ -316,17 +319,8 @@ async def lifespan(app: FastAPI): # Initialize processor manager processor_manager = init_processor_manager(CHUNK_DIR, ac_repository) await processor_manager.start() - application_logger.info("Application-level processors started") - - # Skip memory service pre-initialization to avoid blocking FastAPI startup - # Memory service will be lazily initialized when first used - application_logger.info("Memory service will be initialized on first use (lazy loading)") - - # SystemTracker is used for monitoring and debugging - application_logger.info("Using SystemTracker for monitoring and debugging") - - application_logger.info("Application ready - using application-level processing 
architecture.") + logger.info("App ready") try: yield finally: @@ -334,7 +328,7 @@ async def lifespan(app: FastAPI): application_logger.info("Shutting down application...") # Clean up all active clients - for client_id in list(active_clients.keys()): + for client_id in client_manager.get_all_client_ids(): await cleanup_client_state(client_id) # Shutdown processor manager @@ -343,9 +337,9 @@ async def lifespan(app: FastAPI): application_logger.info("Processor manager shut down") # Shutdown task manager - # task_manager = get_task_manager() - # await task_manager.shutdown() - # application_logger.info("Task manager shut down") + task_manager = get_task_manager() + await task_manager.shutdown() + application_logger.info("Task manager shut down") # Stop metrics collection and save final report application_logger.info("Metrics collection stopped") @@ -585,31 +579,26 @@ async def ws_endpoint_omi( audio_data = header.get("data", {}) chunk_timestamp = audio_data.get("timestamp", int(time.time())) - chunk = AudioChunk( - audio=pcm_data, - rate=OMI_SAMPLE_RATE, - width=OMI_SAMPLE_WIDTH, - channels=OMI_CHANNELS, - timestamp=chunk_timestamp, - ) - # Queue to application-level processor if packet_count <= 5 or packet_count % 100 == 0: # Log first 5 and every 100th application_logger.info( f"🚀 About to queue audio chunk #{packet_count} for client {client_id}" ) - await processor_manager.queue_audio( - AudioProcessingItem( - client_id=client_id, - user_id=user.user_id, - user_email=user.email, - audio_chunk=chunk, - timestamp=chunk.timestamp, - ) - ) - # Update client state for tracking purposes - client_state.update_audio_received(chunk) + # Process audio chunk through unified pipeline + await process_audio_chunk( + audio_data=pcm_data, + client_id=client_id, + user_id=user.user_id, + user_email=user.email, + audio_format={ + "rate": OMI_SAMPLE_RATE, + "width": OMI_SAMPLE_WIDTH, + "channels": OMI_CHANNELS, + "timestamp": chunk_timestamp, + }, + client_state=client_state, + ) # 
Log every 1000th packet to avoid spam if packet_count % 1000 == 0: @@ -829,25 +818,15 @@ async def ws_endpoint_pcm( application_logger.debug(f"🎵 Received audio chunk #{packet_count}: {len(audio_data)} bytes") - # Process audio chunk + # Process audio chunk through unified pipeline audio_format = control_header.get("data", {}) - chunk = AudioChunk( - audio=audio_data, - rate=audio_format.get("rate", 16000), - width=audio_format.get("width", 2), - channels=audio_format.get("channels", 1), - timestamp=audio_format.get("timestamp", int(time.time())), - ) - - # Send to audio processing pipeline - await processor_manager.queue_audio( - AudioProcessingItem( - client_id=client_id, - user_id=user.user_id, - user_email=user.email, - audio_chunk=chunk, - timestamp=chunk.timestamp, - ) + await process_audio_chunk( + audio_data=audio_data, + client_id=client_id, + user_id=user.user_id, + user_email=user.email, + audio_format=audio_format, + client_state=None, # No client state update needed for Wyoming protocol ) else: application_logger.warning(f"Expected binary payload for audio-chunk, got: {payload_msg.keys()}") @@ -870,24 +849,19 @@ async def ws_endpoint_pcm( application_logger.debug(f"🎵 Received raw audio chunk #{packet_count}: {len(audio_data)} bytes") - # Process raw audio chunk (assume PCM 16kHz mono) - chunk = AudioChunk( - audio=audio_data, - rate=16000, - width=2, - channels=1, - timestamp=int(time.time()), - ) - - # Send to audio processing pipeline - await processor_manager.queue_audio( - AudioProcessingItem( - client_id=client_id, - user_id=user.user_id, - user_email=user.email, - audio_chunk=chunk, - timestamp=chunk.timestamp, - ) + # Process raw audio chunk through unified pipeline (assume PCM 16kHz mono) + await process_audio_chunk( + audio_data=audio_data, + client_id=client_id, + user_id=user.user_id, + user_email=user.email, + audio_format={ + "rate": 16000, + "width": 2, + "channels": 1, + "timestamp": int(time.time()), + }, + client_state=None, # No client 
state update needed for raw streaming ) else: @@ -996,7 +970,7 @@ async def health_check(): transcription_provider.mode if transcription_provider else "none" ), "chunk_dir": str(CHUNK_DIR), - "active_clients": len(active_clients), + "active_clients": client_manager.get_client_count(), "new_conversation_timeout_minutes": NEW_CONVERSATION_TIMEOUT_MINUTES, "audio_cropping_enabled": AUDIO_CROPPING_ENABLED, "llm_provider": os.getenv("LLM_PROVIDER"), diff --git a/backends/advanced/src/advanced_omi_backend/memory/__init__.py b/backends/advanced/src/advanced_omi_backend/memory/__init__.py index 8fc4b103..1fcc786a 100644 --- a/backends/advanced/src/advanced_omi_backend/memory/__init__.py +++ b/backends/advanced/src/advanced_omi_backend/memory/__init__.py @@ -37,7 +37,6 @@ # Also import core implementation for direct access from .memory_service import MemoryService as CoreMemoryService test_new_memory_service = None # Will be implemented if needed - memory_logger.info("✅ Successfully imported new memory service") except ImportError as e: memory_logger.error(f"Failed to import new memory service: {e}") raise diff --git a/backends/advanced/src/advanced_omi_backend/memory/service_factory.py b/backends/advanced/src/advanced_omi_backend/memory/service_factory.py index 48e2f6e0..c18bdccc 100644 --- a/backends/advanced/src/advanced_omi_backend/memory/service_factory.py +++ b/backends/advanced/src/advanced_omi_backend/memory/service_factory.py @@ -76,14 +76,9 @@ def get_memory_service() -> MemoryServiceBase: # Create appropriate service implementation _memory_service = create_memory_service(config) - # Initialize in background if possible - try: - loop = asyncio.get_event_loop() - if hasattr(_memory_service, '_initialized') and not _memory_service._initialized: - loop.create_task(_memory_service.initialize()) - except RuntimeError: - # No event loop running, will initialize on first use - pass + # Don't initialize here - let it happen lazily on first use + # This prevents orphaned 
tasks that cause "Task was destroyed but it is pending" errors + memory_logger.debug(f"Memory service created but not initialized: {type(_memory_service).__name__}") memory_logger.info(f"✅ Global memory service created: {type(_memory_service).__name__}") @@ -134,8 +129,9 @@ def get_service_info() -> dict: if _memory_service is not None: info["service_type"] = type(_memory_service).__name__ - info["service_initialized"] = getattr(_memory_service, "_initialized", False) - + # All memory services should have _initialized attribute per the base class + info["service_initialized"] = _memory_service._initialized + # Try to determine provider from service type if "OpenMemoryMCP" in info["service_type"]: info["memory_provider"] = "openmemory_mcp" diff --git a/backends/advanced/src/advanced_omi_backend/processors.py b/backends/advanced/src/advanced_omi_backend/processors.py index 61e1712c..386a671c 100644 --- a/backends/advanced/src/advanced_omi_backend/processors.py +++ b/backends/advanced/src/advanced_omi_backend/processors.py @@ -16,7 +16,7 @@ # Import TranscriptionManager for type hints from typing import TYPE_CHECKING, Any, Optional -from advanced_omi_backend.audio_cropping_utils import ( +from advanced_omi_backend.audio_utils import ( _process_audio_cropping_with_relative_timestamps, ) from advanced_omi_backend.client_manager import get_client_manager @@ -150,8 +150,6 @@ async def _update_memory_status(self, conversation_id: str, status: str): async def start(self): """Start all processors.""" - logger.info("Starting application-level processors...") - # Create processor tasks self.audio_processor_task = asyncio.create_task( self._audio_processor(), name="audio_processor" @@ -180,8 +178,6 @@ async def start(self): self.cropping_processor_task, "cropping_processor", {"type": "processor"} ) - logger.info("All processors started successfully") - async def _should_process_memory(self, user_id: str, conversation_id: str) -> tuple[bool, str]: """ Determine if memory 
processing should proceed based on primary speakers configuration. @@ -508,27 +504,12 @@ async def close_client_audio(self, client_id: str): f"📊 Transcription manager state - has manager: {manager is not None}, type: {type(manager).__name__}" ) - # Get audio duration for flush timeout calculation - audio_duration = None - if client_id in self.active_audio_uuids: - audio_uuid = self.active_audio_uuids[client_id] - audio_logger.info(f"📌 Active audio UUID for flush: {audio_uuid}") - # Try to estimate duration from file sink if available - if client_id in self.active_file_sinks: - try: - sink = self.active_file_sinks[client_id] - # Estimate duration based on samples written (if accessible) - # For now, use None and let flush_final_transcript handle timeout - audio_logger.info(f"📁 File sink exists for client {client_id}") - except Exception as e: - audio_logger.warning(f"⚠️ Error accessing file sink: {e}") - flush_start_time = time.time() audio_logger.info( f"📤 Calling flush_final_transcript for client {client_id} (manager: {manager})" ) try: - await manager.process_collected_audio(audio_duration) + await manager.process_collected_audio() flush_duration = time.time() - flush_start_time audio_logger.info( f"✅ ASR flush completed for client {client_id} in {flush_duration:.2f}s" @@ -907,7 +888,7 @@ async def _memory_processor(self): task_name, { "client_id": item.client_id, - "audio_uuid": item.audio_uuid, + "conversation_id": item.conversation_id, "type": "memory", "timeout": 3600, # 60 minutes }, @@ -918,12 +899,12 @@ async def _memory_processor(self): item.client_id, "memory", actual_task_id, - {"audio_uuid": item.audio_uuid}, + {"conversation_id": item.conversation_id}, ) except Exception as e: audio_logger.error( - f"Error queuing memory processing for {item.audio_uuid}: {e}", + f"Error queuing memory processing for {item.conversation_id}: {e}", exc_info=True, ) finally: @@ -1017,7 +998,6 @@ async def _process_memory_item(self, item: MemoryProcessingItem): 
audio_logger.info(f"✅ Memory service initialized for conversation {item.conversation_id}") # Process memory with timeout - audio_logger.info(f"🔥 About to call add_memory() for conversation {item.conversation_id}...") memory_result = await asyncio.wait_for( self.memory_service.add_memory( full_conversation, @@ -1192,6 +1172,7 @@ async def _cropping_processor(self): item.speech_segments, item.output_path, item.audio_uuid, + self.repository, ) ) diff --git a/backends/advanced/src/advanced_omi_backend/transcription.py b/backends/advanced/src/advanced_omi_backend/transcription.py index 2a3f876a..1cd5baa6 100644 --- a/backends/advanced/src/advanced_omi_backend/transcription.py +++ b/backends/advanced/src/advanced_omi_backend/transcription.py @@ -6,19 +6,26 @@ from datetime import UTC, datetime from typing import Optional -from wyoming.audio import AudioChunk - from advanced_omi_backend.client_manager import get_client_manager -from advanced_omi_backend.config import get_speech_detection_settings, get_conversation_stop_settings, load_diarization_settings_from_file -from advanced_omi_backend.database import conversations_col, ConversationsRepository +from advanced_omi_backend.config import ( + get_conversation_stop_settings, + get_speech_detection_settings, + load_diarization_settings_from_file, +) +from advanced_omi_backend.database import ConversationsRepository, conversations_col from advanced_omi_backend.llm_client import async_generate -from advanced_omi_backend.processors import AudioCroppingItem, MemoryProcessingItem, get_processor_manager +from advanced_omi_backend.processors import ( + AudioCroppingItem, + MemoryProcessingItem, + get_processor_manager, +) from advanced_omi_backend.speaker_recognition_client import SpeakerRecognitionClient from advanced_omi_backend.transcript_coordinator import get_transcript_coordinator from advanced_omi_backend.transcription_providers import ( BaseTranscriptionProvider, get_transcription_provider, ) +from wyoming.audio import 
AudioChunk # ASR Configuration TRANSCRIPTION_PROVIDER = os.getenv("TRANSCRIPTION_PROVIDER") # Optional: 'deepgram' or 'parakeet' @@ -151,10 +158,6 @@ def _get_current_client(self): return None return self.client_manager.get_client(self._client_id) - # REMOVED: Memory processing is now handled exclusively by conversation closure - # to prevent duplicate processing. The _queue_memory_processing method has been - # removed as part of the fix for double memory generation issue. - async def connect(self, client_id: str | None = None): """Initialize transcription service for the client.""" self._client_id = client_id @@ -171,7 +174,7 @@ async def connect(self, client_id: str | None = None): logger.error(f"Failed to connect to {self.provider.name} transcription service: {e}") raise - async def process_collected_audio(self, audio_duration_seconds: Optional[float] = None): + async def process_collected_audio(self): """Unified processing for all transcription providers.""" logger.info(f"🚀 process_collected_audio called for client {self._client_id}") logger.info( @@ -195,7 +198,7 @@ async def process_collected_audio(self, audio_duration_seconds: Optional[float] # Get transcript from provider try: - transcript_result = await self._get_transcript(audio_duration_seconds) + transcript_result = await self._get_transcript() # Process the result uniformly await self._process_transcript_result(transcript_result) except asyncio.CancelledError: @@ -213,7 +216,7 @@ async def process_collected_audio(self, audio_duration_seconds: Optional[float] coordinator = get_transcript_coordinator() coordinator.signal_transcript_failed(self._current_audio_uuid, str(e)) - async def _get_transcript(self, audio_duration_seconds: Optional[float] = None): + async def _get_transcript(self): """Get transcript from any provider using unified interface.""" if not self.provider: logger.error("No transcription provider available") @@ -281,13 +284,12 @@ async def _process_transcript_result(self, 
transcript_result): try: # Store raw transcript data provider_name = self.provider.name if self.provider else "unknown" - logger.info(f"🔍 DEBUG: transcript_result type={type(transcript_result)}, content preview: {str(transcript_result)[:200]}") + logger.info(f"transcript_result type={type(transcript_result)}, content preview: {str(transcript_result)[:200]}") if self.chunk_repo: - logger.info(f"🔍 DEBUG: About to store raw transcript data for {self._current_audio_uuid}") await self.chunk_repo.store_raw_transcript_data( self._current_audio_uuid, transcript_result, provider_name ) - logger.info(f"🔍 DEBUG: Successfully stored raw transcript data for {self._current_audio_uuid}") + logger.info(f"Successfully stored raw transcript data for {self._current_audio_uuid}") # Normalize transcript result normalized_result = self._normalize_transcript_result(transcript_result) diff --git a/init.py b/init.py index d5c056a6..ee28cb5e 100755 --- a/init.py +++ b/init.py @@ -174,11 +174,6 @@ def run_service_setup(service_name, selected_services, https_enabled=False, serv hf_token = read_env_value(speaker_env_path, 'HF_TOKEN') if hf_token and hf_token != 'your_huggingface_token_here': cmd.extend(['--hf-token', hf_token]) - - # Pass compute mode from existing config if available - compute_mode = read_env_value(speaker_env_path, 'COMPUTE_MODE') - if compute_mode and compute_mode in ['cpu', 'gpu']: - cmd.extend(['--compute-mode', compute_mode]) # For openmemory-mcp, try to pass OpenAI API key from backend if available if service_name == 'openmemory-mcp': From 334d489bc08407a0e4902e422e391738a7754609 Mon Sep 17 00:00:00 2001 From: Ankush Malaker <43288948+AnkushMalaker@users.noreply.github.com> Date: Fri, 19 Sep 2025 01:34:01 +0000 Subject: [PATCH 2/4] Update Dockerfile command and enhance integration tests; add memory provider state in Memories component - Modified the CMD in the Dockerfile to include an extra argument for deepgram. 
- Simplified subprocess calls in integration tests by removing stdout and stderr captures. - Added memory provider state management in the Memories component, displaying the provider name conditionally in the UI. --- backends/advanced/Dockerfile | 2 +- backends/advanced/tests/test_integration.py | 17 ++----------- .../advanced/webui/src/pages/Memories.tsx | 25 ++++++++++++++----- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/backends/advanced/Dockerfile b/backends/advanced/Dockerfile index 947c8abe..be3e1019 100644 --- a/backends/advanced/Dockerfile +++ b/backends/advanced/Dockerfile @@ -40,4 +40,4 @@ COPY diarization_config.json* ./ # Run the application -CMD ["uv", "run", "python3", "src/advanced_omi_backend/main.py"] +CMD ["uv", "run", "--extra", "deepgram", "python3", "src/advanced_omi_backend/main.py"] diff --git a/backends/advanced/tests/test_integration.py b/backends/advanced/tests/test_integration.py index db3cfac7..e0b55798 100644 --- a/backends/advanced/tests/test_integration.py +++ b/backends/advanced/tests/test_integration.py @@ -501,14 +501,9 @@ def start_services(self): env = os.environ.copy() env['DOCKER_BUILDKIT'] = '0' logger.info("🔨 Running Docker build command...") - build_result = subprocess.run(["docker", "compose", "-f", "docker-compose-test.yml", "build"], capture_output=True, text=True, env=env) - logger.info(f"📋 Build command stdout: {build_result.stdout}") - if build_result.stderr: - logger.warning(f"📋 Build command stderr: {build_result.stderr}") + build_result = subprocess.run(["docker", "compose", "-f", "docker-compose-test.yml", "build"], env=env) if build_result.returncode != 0: logger.error(f"❌ Build failed with exit code {build_result.returncode}") - logger.error(f"📋 Build stderr: {build_result.stderr}") - logger.error(f"📋 Build stdout: {build_result.stdout}") raise RuntimeError("Docker compose build failed") cmd = ["docker", "compose", "-f", "docker-compose-test.yml", "up", "-d", "--no-build"] else: @@ -524,18 
+519,10 @@ def start_services(self): env = os.environ.copy() env['DOCKER_BUILDKIT'] = '0' logger.info(f"🚀 Running Docker compose command: {' '.join(cmd)}") - result = subprocess.run(cmd, capture_output=True, text=True, env=env, timeout=300) - - # Always log the command outputs for debugging - logger.info(f"📋 Docker compose command stdout:\n{result.stdout}") - if result.stderr: - logger.warning(f"📋 Docker compose command stderr:\n{result.stderr}") - logger.info(f"📋 Docker compose exit code: {result.returncode}") + result = subprocess.run(cmd, env=env, timeout=300) if result.returncode != 0: logger.error(f"❌ Failed to start services with exit code {result.returncode}") - logger.error(f"❌ Command stderr: {result.stderr}") - logger.error(f"❌ Command stdout: {result.stdout}") # Check individual container logs for better error details logger.error("🔍 Checking individual container logs for details...") diff --git a/backends/advanced/webui/src/pages/Memories.tsx b/backends/advanced/webui/src/pages/Memories.tsx index c99089cc..7ad3bf59 100644 --- a/backends/advanced/webui/src/pages/Memories.tsx +++ b/backends/advanced/webui/src/pages/Memories.tsx @@ -34,6 +34,7 @@ export default function Memories() { // System configuration state const [memoryProviderSupportsThreshold, setMemoryProviderSupportsThreshold] = useState(false) + const [memoryProvider, setMemoryProvider] = useState('') const { user } = useAuth() @@ -41,12 +42,15 @@ export default function Memories() { try { const response = await systemApi.getMetrics() const supports = response.data.memory_provider_supports_threshold || false + const provider = response.data.memory_provider || 'unknown' setMemoryProviderSupportsThreshold(supports) - console.log('🔧 Memory provider supports threshold:', supports) + setMemoryProvider(provider) + console.log('🔧 Memory provider:', provider, 'supports threshold:', supports) } catch (err: any) { console.error('❌ Failed to load system config:', err) // Default to false if we can't 
determine setMemoryProviderSupportsThreshold(false) + setMemoryProvider('unknown') } } @@ -243,11 +247,20 @@ export default function Memories() { return (
{/* Header */} -
- -

- Memory Management -

+
+
+ +
+

+ Memory Management +

+ {memoryProvider && ( +

+ Provider: {memoryProvider === 'friend_lite' ? 'Friend-Lite' : memoryProvider === 'openmemory_mcp' ? 'OpenMemory MCP' : memoryProvider} +

+ )} +
+
{/* Controls */} From 207c799604e39ea2de8c978ce8a62a053dd385f3 Mon Sep 17 00:00:00 2001 From: Ankush Malaker <43288948+AnkushMalaker@users.noreply.github.com> Date: Fri, 19 Sep 2025 15:19:27 +0000 Subject: [PATCH 3/4] fixes 29 --- .../src/advanced_omi_backend/transcription.py | 107 ++++++++++++++++-- 1 file changed, 98 insertions(+), 9 deletions(-) diff --git a/backends/advanced/src/advanced_omi_backend/transcription.py b/backends/advanced/src/advanced_omi_backend/transcription.py index 1cd5baa6..f6ac919f 100644 --- a/backends/advanced/src/advanced_omi_backend/transcription.py +++ b/backends/advanced/src/advanced_omi_backend/transcription.py @@ -492,14 +492,20 @@ async def _process_transcript_result(self, transcript_result): await conversations_repo.activate_transcript_version(conversation_id, version_id) logger.info(f"✅ Created and activated initial transcript version {version_id} for conversation {conversation_id}") - # Update conversation with speaker info and metadata (NOT transcript data - that's in versions) + # Generate title and summary with speaker information + title = await self._generate_title_with_speakers(segments_to_store) + summary = await self._generate_summary_with_speakers(segments_to_store) + + # Update conversation with speaker info, title, summary and metadata update_data = { + "title": title, + "summary": summary, "speaker_names": speaker_names, "updated_at": datetime.now(UTC) } await conversations_repo.update_conversation(conversation_id, update_data) - logger.info(f"✅ Updated conversation {conversation_id} with {len(segments_to_store)} transcript segments and {len(speakers_found)} speakers") + logger.info(f"✅ Updated conversation {conversation_id} with {len(segments_to_store)} transcript segments, {len(speakers_found)} speakers, and speaker-aware title/summary") except Exception as e: logger.error(f"Failed to update conversation {conversation_id} with transcript data: {e}") @@ -627,19 +633,15 @@ async def _create_conversation(self, 
audio_uuid: str, transcript_data: dict, spe logger.error(f"No audio session found for {audio_uuid}") return None - # Generate title and summary from transcript - title = await self._generate_title(transcript_data.get("text", "")) - summary = await self._generate_summary(transcript_data.get("text", "")) - - # Create conversation data + # Create conversation data (title and summary will be generated after speaker recognition) conversation_id = str(uuid.uuid4()) conversation_data = { "conversation_id": conversation_id, "audio_uuid": audio_uuid, "user_id": audio_session["user_id"], "client_id": audio_session["client_id"], - "title": title, - "summary": summary, + "title": "Processing...", # Placeholder - will be updated after speaker recognition + "summary": "Processing...", # Placeholder - will be updated after speaker recognition # Versioned system (source of truth) "transcript_versions": [], @@ -730,6 +732,93 @@ async def _generate_summary(self, text: str) -> str: # Fallback to simple summary generation return text[:120] + "..." 
if len(text) > 120 else text or "No content" + async def _generate_title_with_speakers(self, segments: list) -> str: + """Generate an LLM-powered title from conversation segments with speaker information.""" + if not segments: + return "Conversation" + + # Format conversation with speaker names + conversation_text = "" + for segment in segments[:10]: # Use first 10 segments for title generation + speaker = segment.get("speaker", "") + text = segment.get("text", "").strip() + if text: + if speaker: + conversation_text += f"{speaker}: {text}\n" + else: + conversation_text += f"{text}\n" + + if not conversation_text.strip(): + return "Conversation" + + try: + prompt = f"""Generate a concise title (max 40 characters) for this conversation: + +"{conversation_text[:500]}" + +Rules: +- Maximum 40 characters +- Include speaker names if relevant +- Capture the main topic +- Be specific and informative + +Title:""" + + title = await async_generate(prompt, temperature=0.3) + title = title.strip().strip('"').strip("'") + return title[:40] + "..." if len(title) > 40 else title or "Conversation" + + except Exception as e: + logger.warning(f"Failed to generate LLM title with speakers: {e}") + # Fallback to simple title generation + words = conversation_text.split()[:6] + title = " ".join(words) + return title[:40] + "..." 
if len(title) > 40 else title or "Conversation" + + async def _generate_summary_with_speakers(self, segments: list) -> str: + """Generate an LLM-powered summary from conversation segments with speaker information.""" + if not segments: + return "No content" + + # Format conversation with speaker names + conversation_text = "" + speakers_in_conv = set() + for segment in segments: + speaker = segment.get("speaker", "") + text = segment.get("text", "").strip() + if text: + if speaker: + conversation_text += f"{speaker}: {text}\n" + speakers_in_conv.add(speaker) + else: + conversation_text += f"{text}\n" + + if not conversation_text.strip(): + return "No content" + + try: + prompt = f"""Generate a brief, informative summary (1-2 sentences, max 120 characters) for this conversation with speakers: + +"{conversation_text[:1000]}" + +Rules: +- Maximum 120 characters +- 1-2 complete sentences +- Include speaker names when relevant (e.g., "John discusses X with Sarah") +- Capture key topics and outcomes +- Use present tense +- Be specific and informative + +Summary:""" + + summary = await async_generate(prompt, temperature=0.3) + return summary.strip().strip('"').strip("'") or "No content" + + except Exception as e: + logger.warning(f"Failed to generate LLM summary with speakers: {e}") + # Fallback to simple summary generation + return conversation_text[:120] + "..." if len(conversation_text) > 120 else conversation_text or "No content" + async def _queue_memory_processing(self, conversation_id: str): """Queue memory processing for a speech-detected conversation. 
From fd5f4c3f63b45265d79bc5b69246a90357d3d839 Mon Sep 17 00:00:00 2001 From: Ankush Malaker <43288948+AnkushMalaker@users.noreply.github.com> Date: Fri, 19 Sep 2025 15:23:27 +0000 Subject: [PATCH 4/4] Enhance transcription status update and improve thread safety in memory service - Updated the `update_transcription_status` method in `database.py` to use `Optional` for error message and provider parameters, and refactored the update logic for better clarity. - Implemented double-checked locking in `get_memory_service` to ensure thread-safe singleton creation in `service_factory.py`, preventing potential race conditions. --- .../src/advanced_omi_backend/database.py | 24 +++++---- .../memory/service_factory.py | 52 +++++++++++-------- 2 files changed, 44 insertions(+), 32 deletions(-) diff --git a/backends/advanced/src/advanced_omi_backend/database.py b/backends/advanced/src/advanced_omi_backend/database.py index f6cd691b..e93c1d5c 100644 --- a/backends/advanced/src/advanced_omi_backend/database.py +++ b/backends/advanced/src/advanced_omi_backend/database.py @@ -387,7 +387,7 @@ async def update_memory_processing_status( return result.modified_count > 0 async def update_transcription_status( - self, audio_uuid: str, status: str, error_message: str = None, provider: str = None + self, audio_uuid: str, status: str, error_message: Optional[str] = None, provider: Optional[str] = None ): """Update transcription processing status and completion timestamp. 
@@ -415,32 +415,36 @@ async def update_transcription_status( if error_message: version_data["error_message"] = error_message + update_doc = { + "active_transcript_version": version_id, + "transcription_status": status, + "transcription_updated_at": datetime.now(UTC).isoformat(), + } + if status == "COMPLETED": + update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat() + result = await self.col.update_one( {"audio_uuid": audio_uuid}, { "$push": {"transcript_versions": version_data}, - "$set": { - "active_transcript_version": version_id, - "transcription_status": status, - "transcription_updated_at": datetime.now(UTC).isoformat(), - } + "$set": update_doc } ) else: # Update existing active version update_doc = { - f"transcript_versions.$[version].status": status, - f"transcript_versions.$[version].updated_at": datetime.now(UTC), + "transcript_versions.$[version].status": status, + "transcript_versions.$[version].updated_at": datetime.now(UTC).isoformat(), "transcription_status": status, "transcription_updated_at": datetime.now(UTC).isoformat(), } if status == "COMPLETED": update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat() if error_message: - update_doc[f"transcript_versions.$[version].error_message"] = error_message + update_doc["transcript_versions.$[version].error_message"] = error_message update_doc["transcription_error"] = error_message if provider: - update_doc[f"transcript_versions.$[version].provider"] = provider + update_doc["transcript_versions.$[version].provider"] = provider update_doc["transcript_provider"] = provider result = await self.col.update_one( diff --git a/backends/advanced/src/advanced_omi_backend/memory/service_factory.py b/backends/advanced/src/advanced_omi_backend/memory/service_factory.py index c18bdccc..1df6ac27 100644 --- a/backends/advanced/src/advanced_omi_backend/memory/service_factory.py +++ b/backends/advanced/src/advanced_omi_backend/memory/service_factory.py @@ -7,6 +7,7 @@ import asyncio 
import logging +import threading from typing import Optional from .base import MemoryServiceBase @@ -16,6 +17,8 @@ # Global memory service instance _memory_service: Optional[MemoryServiceBase] = None +# Lock for thread-safe singleton creation +_memory_service_lock = threading.Lock() def create_memory_service(config: MemoryConfig) -> MemoryServiceBase: @@ -56,36 +59,41 @@ def create_memory_service(config: MemoryConfig) -> MemoryServiceBase: def get_memory_service() -> MemoryServiceBase: """Get the global memory service instance. - + This function implements the singleton pattern and will create the memory service on first access based on environment configuration. - + Returns: - Initialized memory service instance - + Memory service instance that is created lazily on first use + and may return an already-initialized singleton + Raises: RuntimeError: If memory service creation or initialization fails """ global _memory_service - + + # Double-checked locking pattern for thread-safe singleton creation if _memory_service is None: - try: - # Build configuration from environment - config = build_memory_config_from_env() - - # Create appropriate service implementation - _memory_service = create_memory_service(config) - - # Don't initialize here - let it happen lazily on first use - # This prevents orphaned tasks that cause "Task was destroyed but it is pending" errors - memory_logger.debug(f"Memory service created but not initialized: {type(_memory_service).__name__}") - - memory_logger.info(f"✅ Global memory service created: {type(_memory_service).__name__}") - - except Exception as e: - memory_logger.error(f"❌ Failed to create memory service: {e}") - raise RuntimeError(f"Memory service creation failed: {e}") - + with _memory_service_lock: + # Re-check after acquiring lock in case another thread created it + if _memory_service is None: + try: + # Build configuration from environment + config = build_memory_config_from_env() + + # Create appropriate service implementation 
+ _memory_service = create_memory_service(config) + + # Don't initialize here - let it happen lazily on first use + # This prevents orphaned tasks that cause "Task was destroyed but it is pending" errors + memory_logger.debug(f"Memory service created but not initialized: {type(_memory_service).__name__}") + + memory_logger.info(f"✅ Global memory service created: {type(_memory_service).__name__}") + + except Exception as e: + memory_logger.error(f"❌ Failed to create memory service: {e}") + raise RuntimeError(f"Memory service creation failed: {e}") + return _memory_service