diff --git a/backends/advanced-backend/.env.template b/backends/advanced-backend/.env.template index 57903a1c..50de9abc 100644 --- a/backends/advanced-backend/.env.template +++ b/backends/advanced-backend/.env.template @@ -1,6 +1,33 @@ + +# Transcription OFFLINE_ASR_TCP_URI= -OLLAMA_BASE_URL= +DEEPGRAM_API_KEY= + +# LLM config +LLM_PROVIDER=openai +LLM_API_KEY=sk-proj- +LLM_CHOICE=gpt-4o-mini + +OLLAMA_BASE_URL=http://ollama:11434 + + + +NGROK_URL= NGROK_AUTHTOKEN= HF_TOKEN= + +METRICS_COLLECTION_ENABLE=false + + +MEM0_TELEMETRY= +NEXT_PUBLIC_API_URL=http://127.0.0.1:8050 +NEXT_PUBLIC_USER_ID=NEXT_PUBLIC_USER_ID + + SPEAKER_SERVICE_URL= -MONGODB_URI= \ No newline at end of file + +MONGODB_URI=mongo +QDRANT_BASE_URL=qdrant +NEO4J_HOST=neo4j-mem0 +NEO4J_USER=neo4j +NEO4J_PASSWORD= \ No newline at end of file diff --git a/backends/advanced-backend/docker-compose.yml b/backends/advanced-backend/docker-compose.yml index 1c5f1e7c..2f1a1ee5 100644 --- a/backends/advanced-backend/docker-compose.yml +++ b/backends/advanced-backend/docker-compose.yml @@ -8,6 +8,8 @@ services: volumes: - ./audio_chunks:/app/audio_chunks - ./debug_dir:/app/debug_dir + env_file: + - .env environment: - DEEPGRAM_API_KEY=${DEEPGRAM_API_KEY} - OFFLINE_ASR_TCP_URI=${OFFLINE_ASR_TCP_URI} @@ -108,7 +110,7 @@ services: - "4040:4040" # Ngrok web interface environment: - NGROK_AUTHTOKEN=${NGROK_AUTHTOKEN} - command: "http friend-backend:8000 --url=intelligent-hypervisor.ngrok.app" + command: "http friend-backend:8000 --url=${NGROK_URL}" depends_on: - friend-backend volumes: diff --git a/backends/advanced-backend/src/action_items_service.py b/backends/advanced-backend/src/action_items_service.py index 6f9ef342..e8d753b1 100644 --- a/backends/advanced-backend/src/action_items_service.py +++ b/backends/advanced-backend/src/action_items_service.py @@ -5,6 +5,8 @@ from motor.motor_asyncio import AsyncIOMotorCollection import logging import ollama +import openai +import os # Set up logging action_items_logger
= logging.getLogger("action_items") @@ -15,9 +17,9 @@ class ActionItemsService: Replaces the Mem0-based implementation for better update capabilities. """ - def __init__(self, collection: AsyncIOMotorCollection, ollama_client: ollama.Client): + def __init__(self, collection: AsyncIOMotorCollection, llm_client: Any): self.collection = collection - self.ollama_client = ollama_client + self.llm_client = llm_client self._initialized = False async def initialize(self): @@ -64,40 +66,34 @@ async def extract_and_store_action_items(self, transcript: str, client_id: str, except Exception as e: action_items_logger.error(f"Error extracting action items for {audio_uuid}: {e}") return 0 - + async def _extract_action_items_from_transcript(self, transcript: str, client_id: str, audio_uuid: str) -> List[Dict[str, Any]]: - """Extract action items from transcript using Ollama.""" + """Extract action items from transcript using llm.""" try: extraction_prompt = f""" -<|begin_of_text|><|start_header_id|>system<|end_header_id|> -You are an intelligent assistant that reads transcripts and extracts all potential action items, even informal or implied ones. - -Your output must be a **JSON array**, where action item includes: -- description: A short summary of the task -- assignee: Who should do it ("unassigned" if unclear) -- due_date: When it should be done ("not_specified" if not mentioned) -- priority: high / medium / low / not_specified -- context: Why or how the task came up -- tool: The name of the tool required, if any ("check_email", "check_calendar", "set_alarm"), or "none" if no tool is needed - -Rules: -- Identify both explicit tasks and implied ones. -- Suggest a tool only when the task obviously requires it or could be automated. -- If it's a general human task with no clear automation, use `"none"` for tool. - -Return **only** a JSON array. No explanation or extra text. 
- -<|eot_id|> -<|start_header_id|>user<|end_header_id|> -Transcript: - -{transcript} - -<|eot_id|> -<|start_header_id|>assistant<|end_header_id|> -""" - response = self.ollama_client.generate( - model="llama3.1:latest", + Analyze the following conversation transcript and extract action items. + + Look for: + - Tasks that someone commits to do ("I'll send the report", "I will call them") + - Requests made to others ("Can you review this", "Please schedule a meeting") + - Things that need to be done ("We need to fix the bug", "The document needs updating") + - Follow-up actions mentioned ("Let's schedule a follow-up", "We should contact them") + + For each action item found, provide: + - description: Clear description of what needs to be done + - assignee: Who should do it (use names from transcript, or "unassigned" if not clear) + - due_date: When it should be done (extract from transcript, or "not_specified") + - priority: Assess urgency from context (high/medium/low/not_specified) + - context: Brief context about when/why it was mentioned + + Return ONLY a JSON array of action items. If no action items found, return an empty array []. 
+ + Transcript: + {transcript} + """ + + response = self.llm_client.generate( + model=os.getenv('LLM_CHOICE', 'llama3.1:latest'), + prompt=extraction_prompt, + options={"temperature": 0.1} ) @@ -125,14 +121,13 @@ async def _extract_action_items_from_transcript(self, transcript: str, client_id "updated_at": int(time.time()), "source": "transcript_extraction" }) - # TODO: Handle all tools here, these can be imported from other files # Handle set_alarm tool, this can be another llm call to mcp with description as input # Also handle sending notification via app or TTS if item.get("tool") == "set_alarm": description = item.get("description", "") action_items_logger.info(f"Calling set alarm service with description: {description}") - + action_items_logger.info(f"Extracted {len(action_items)} action items from {audio_uuid}") return action_items @@ -456,4 +451,4 @@ async def get_action_item_stats(self, user_id: str) -> Dict[str, Any]: # <|eot_id|> # <|start_header_id|>assistant<|end_header_id|> # """ -# pyperclip.copy(extraction_prompt) \ No newline at end of file +# pyperclip.copy(extraction_prompt) diff --git a/backends/advanced-backend/src/conversation_manager.py b/backends/advanced-backend/src/conversation_manager.py new file mode 100644 index 00000000..afedf718 --- /dev/null +++ b/backends/advanced-backend/src/conversation_manager.py @@ -0,0 +1,481 @@ +import asyncio +import logging +import time +import uuid +import re +from functools import partial +from pathlib import Path +from typing import Optional, Tuple, Any + +from easy_audio_interfaces.filesystem.filesystem_interfaces import LocalFileSink +from wyoming.audio import AudioChunk + +# Import services and utilities that ConversationManager depends on +from memory import get_memory_service, shutdown_memory_service +from metrics import get_metrics_collector +from action_items_service import ActionItemsService +from routers import audio_chunks_router # For AudioChunkUtils + +# Import the ClientState model +from
models.client_state import ClientState + +# Global instances (will be managed by ConversationManager) +active_clients: dict[str, ClientState] = {} + +# Logging setup +audio_logger = logging.getLogger("audio_processing") +logger = logging.getLogger("conversation_manager") + +async def create_client_state(client_id: str, audio_chunk_utils, config: dict, transcription_manager_class: Any) -> ClientState: + """Create and register a new client state.""" + metrics_collector = get_metrics_collector() # Get metrics collector here + client_state = ClientState(client_id, audio_chunk_utils, metrics_collector, active_clients, config, transcription_manager_class) + active_clients[client_id] = client_state + await start_processing(client_state) # Call the new standalone function + + # Track client connection in metrics + metrics_collector.record_client_connection(client_id) + + return client_state + +async def cleanup_client_state(client_id: str): + """Clean up and remove client state.""" + if client_id in active_clients: + client_state = active_clients[client_id] + await disconnect(client_state) # Call the new standalone function + del active_clients[client_id] + + # Track client disconnection in metrics + get_metrics_collector().record_client_disconnection(client_id) + +# --- Conversation-related functions (moved from ClientState) --- + +def record_speech_start(client_state: ClientState, audio_uuid: str, timestamp: float): + """Record the start of a speech segment.""" + client_state.current_speech_start[audio_uuid] = timestamp + audio_logger.info(f"Recorded speech start for {audio_uuid}: {timestamp}") + +def record_speech_end(client_state: ClientState, audio_uuid: str, timestamp: float): + """Record the end of a speech segment.""" + if ( + audio_uuid in client_state.current_speech_start + and client_state.current_speech_start[audio_uuid] is not None + ): + start_time = client_state.current_speech_start[audio_uuid] + if start_time is not None: # Type guard + if audio_uuid not in 
client_state.speech_segments: + client_state.speech_segments[audio_uuid] = [] + client_state.speech_segments[audio_uuid].append((start_time, timestamp)) + client_state.current_speech_start[audio_uuid] = None + duration = timestamp - start_time + audio_logger.info( + f"Recorded speech segment for {audio_uuid}: {start_time:.3f} -> {timestamp:.3f} (duration: {duration:.3f}s)" + ) + else: + audio_logger.warning( + f"Speech end recorded for {audio_uuid} but no start time found" + ) + +async def start_processing(client_state: ClientState): + """Start the processing tasks for this client.""" + client_state.saver_task = asyncio.create_task(_audio_saver(client_state)) + client_state.transcription_task = asyncio.create_task(_transcription_processor(client_state)) + client_state.memory_task = asyncio.create_task(_memory_processor(client_state)) + client_state.action_item_task = asyncio.create_task(_action_item_processor(client_state)) + audio_logger.info(f"Started processing tasks for client {client_state.client_id}") + +async def disconnect(client_state: ClientState): + """Clean disconnect of client state.""" + if not client_state.connected: + return + + client_state.connected = False + audio_logger.info(f"Disconnecting client {client_state.client_id}") + + # Close current conversation with all processing before signaling shutdown + await _close_current_conversation(client_state) + + # Signal processors to stop + await client_state.chunk_queue.put(None) + await client_state.transcription_queue.put((None, None)) + await client_state.memory_queue.put((None, None, None)) + await client_state.action_item_queue.put((None, None, None)) + + # Wait for tasks to complete + if client_state.saver_task: + await client_state.saver_task + if client_state.transcription_task: + await client_state.transcription_task + if client_state.memory_task: + await client_state.memory_task + if client_state.action_item_task: + await client_state.action_item_task + + # Clean up transcription manager + 
if client_state.transcription_manager: + await client_state.transcription_manager.disconnect() + client_state.transcription_manager = None + + # Clean up any remaining speech segment tracking + client_state.speech_segments.clear() + client_state.current_speech_start.clear() + client_state.conversation_transcripts.clear() # Clear conversation transcripts + + audio_logger.info(f"Client {client_state.client_id} disconnected and cleaned up") + +def _should_start_new_conversation(client_state: ClientState) -> bool: + """Check if we should start a new conversation based on timeout.""" + if client_state.last_transcript_time is None: + return False # No transcript yet, keep current conversation + + current_time = time.time() + time_since_last_transcript = current_time - client_state.last_transcript_time + timeout_seconds = client_state.NEW_CONVERSATION_TIMEOUT_MINUTES * 60 + + return time_since_last_transcript > timeout_seconds + +async def _close_current_conversation(client_state: ClientState): + """Close the current conversation with proper cleanup including audio cropping and speaker processing.""" + if client_state.file_sink: + # Store current audio info before closing + current_uuid = client_state.current_audio_uuid + current_path = client_state.file_sink.file_path + + audio_logger.info( + f"πŸ”’ Closing conversation {current_uuid}, file: {current_path}" + ) + + # Process memory at end of conversation if we have transcripts + if client_state.conversation_transcripts and current_uuid: + full_conversation = " ".join(client_state.conversation_transcripts) + audio_logger.info( + f"πŸ’­ Processing memory for conversation {current_uuid} with {len(client_state.conversation_transcripts)} transcript segments" + ) + audio_logger.info( + f"πŸ’­ Individual transcripts: {client_state.conversation_transcripts}" + ) + audio_logger.info( + f"πŸ’­ Full conversation text: {full_conversation[:200]}..." 
+ ) # Log first 200 chars + + start_time = time.time() + memories_created = [] + action_items_created = [] + processing_success = True + error_message = None + + try: + # Track memory storage request + client_state.metrics_collector.record_memory_storage_request() + + # Add general memory + memory_result = client_state.memory_service.add_memory( + full_conversation, client_state.client_id, current_uuid + ) + if memory_result: + audio_logger.info( + f"βœ… Successfully added conversation memory for {current_uuid}" + ) + client_state.metrics_collector.record_memory_storage_result(True) + + # Use the actual memory objects returned from mem0's add() method + memory_results = memory_result.get("results", []) + memories_created = [] + + for mem in memory_results: + memory_text = mem.get("memory", "Memory text unavailable") + memory_id = mem.get("id", "unknown") + event = mem.get("event", "UNKNOWN") + memories_created.append( + {"id": memory_id, "text": memory_text, "event": event} + ) + + audio_logger.info( + f"Created {len(memories_created)} memory objects: {[m['event'] for m in memories_created]}" + ) + else: + audio_logger.error( + f"❌ Failed to add conversation memory for {current_uuid}" + ) + client_state.metrics_collector.record_memory_storage_result(False) + processing_success = False + error_message = "Failed to add general memory" + + except Exception as e: + audio_logger.error( + f"❌ Error processing memory and action items for {current_uuid}: {e}" + ) + processing_success = False + error_message = str(e) + + # Log debug information + processing_time_ms = (time.time() - start_time) * 1000 + # memory_debug.log_memory_processing( + # user_id=client_state.client_id, + # audio_uuid=current_uuid, + # transcript_text=full_conversation, + # memories_created=memories_created, + # action_items_created=action_items_created, + # processing_success=processing_success, + # error_message=error_message, + # processing_time_ms=processing_time_ms, + # ) + else: + 
audio_logger.info( + f"ℹ️ No transcripts to process for memory in conversation {current_uuid}" + ) + # Log empty processing for debug + if current_uuid: + pass + # memory_debug.log_memory_processing( + # user_id=client_state.client_id, + # audio_uuid=current_uuid, + # transcript_text="", + # memories_created=[], + # action_items_created=[], + # processing_success=True, + # error_message="No transcripts available for processing", + # processing_time_ms=0, + # ) + + await client_state.file_sink.close() + + # Track successful audio chunk save in metrics + try: + file_path = Path(current_path) + if file_path.exists(): + # Estimate duration (60 seconds per chunk is TARGET_SAMPLES) + duration_seconds = client_state.OMI_SAMPLE_RATE * client_state.SEGMENT_SECONDS / client_state.OMI_SAMPLE_RATE # Corrected + # Calculate voice activity if we have speech segments + voice_activity_seconds = 0.0 + # if current_uuid and current_uuid in client_state.speech_segments: + # for start, end in client_state.speech_segments[current_uuid]: + # voice_activity_seconds += end - start + client_state.metrics_collector.record_audio_chunk_saved( + duration_seconds, voice_activity_seconds + ) + audio_logger.debug( + f"πŸ“Š Recorded audio chunk metrics: {duration_seconds}s total, {voice_activity_seconds}s voice activity" + ) + else: + client_state.metrics_collector.record_audio_chunk_failed() + audio_logger.warning( + f"πŸ“Š Audio file not found after save: {current_path}" + ) + except Exception as e: + audio_logger.error(f"πŸ“Š Error recording audio metrics: {e}") + + client_state.file_sink = None + + # Process audio cropping if we have speech segments + if current_uuid and current_path: + if current_uuid in client_state.speech_segments: + speech_segments = client_state.speech_segments[current_uuid] + audio_logger.info( + f"🎯 Found {len(speech_segments)} speech segments for {current_uuid}: {speech_segments}" + ) + if speech_segments: # Only crop if we have speech segments + cropped_path = str(current_path).replace(".wav",
"_cropped.wav") + + # Process in background - won't block + asyncio.create_task( + client_state.audio_chunk_utils.reprocess_audio_cropping( + audio_uuid=current_uuid + ) + ) + audio_logger.info( + f"βœ‚οΈ Queued audio cropping for {current_path} with {len(speech_segments)} speech segments" + ) + else: + audio_logger.info( + f"⚠️ Empty speech segments list found for {current_path}, skipping cropping" + ) + + # Clean up segments for this conversation + del client_state.speech_segments[current_uuid] + if current_uuid in client_state.current_speech_start: + del client_state.current_speech_start[current_uuid] + else: + audio_logger.info( + f"⚠️ No speech segments found for {current_path} (uuid: {current_uuid}), skipping cropping" + ) + + else: + audio_logger.info( + f"πŸ”’ No active file sink to close for client {client_state.client_id}" + ) + +async def start_new_conversation(client_state: ClientState): + """Start a new conversation by closing current conversation and resetting state.""" + await _close_current_conversation(client_state) + + # Reset conversation state + client_state.current_audio_uuid = None + client_state.conversation_start_time = time.time() + client_state.last_transcript_time = None + client_state.conversation_transcripts.clear() # Clear collected transcripts for new conversation + + audio_logger.info( + f"Client {client_state.client_id}: Started new conversation due to {client_state.NEW_CONVERSATION_TIMEOUT_MINUTES}min timeout" + ) + + +async def _audio_saver(client_state: ClientState): + """Per-client audio saver consumer.""" + try: + while client_state.connected: + audio_chunk = await client_state.chunk_queue.get() + + if audio_chunk is None: # Disconnect signal + break + + # Check if we should start a new conversation due to timeout + if _should_start_new_conversation(client_state): + await start_new_conversation(client_state) + + if client_state.file_sink is None: + # Create new file sink for this client + client_state.current_audio_uuid = 
uuid.uuid4().hex + timestamp = audio_chunk.timestamp or int(time.time()) + wav_filename = ( + f"{timestamp}_{client_state.client_id}_{client_state.current_audio_uuid}.wav" + ) + audio_logger.info( + f"Creating file sink with: rate={int(client_state.OMI_SAMPLE_RATE)}, channels={int(client_state.OMI_CHANNELS)}, width={int(client_state.OMI_SAMPLE_WIDTH)}" + ) + client_state.file_sink = LocalFileSink(f"{client_state.CHUNK_DIR}/{wav_filename}", client_state.OMI_SAMPLE_RATE, client_state.OMI_CHANNELS, client_state.OMI_SAMPLE_WIDTH) + await client_state.file_sink.open() + + await client_state.audio_chunk_utils.chunk_repo.create_chunk( + audio_uuid=client_state.current_audio_uuid, + audio_path=wav_filename, + client_id=client_state.client_id, + timestamp=timestamp, + ) + + await client_state.file_sink.write(audio_chunk) + + # Queue for transcription + await client_state.transcription_queue.put( + (client_state.current_audio_uuid, audio_chunk) + ) + + except Exception as e: + audio_logger.error( + f"Error in audio saver for client {client_state.client_id}: {e}", exc_info=True + ) + finally: + # Close current conversation with all processing when audio saver ends + await _close_current_conversation(client_state) + +async def _transcription_processor(client_state: ClientState): + """Per-client transcription processor.""" + try: + while client_state.connected: + audio_uuid, chunk = await client_state.transcription_queue.get() + + if audio_uuid is None or chunk is None: # Disconnect signal + break + + # Get or create transcription manager + if client_state.transcription_manager is None: + # Create callback function to queue action items + async def action_item_callback(transcript_text, client_id, audio_uuid): + await client_state.action_item_queue.put((transcript_text, client_id, audio_uuid)) + + client_state.transcription_manager = client_state.transcription_manager_class( + action_item_callback=action_item_callback, + audio_chunk_utils=client_state.audio_chunk_utils, + 
metrics_collector=client_state.metrics_collector, + active_clients=client_state.active_clients + ) + try: + await client_state.transcription_manager.connect() + except Exception as e: + audio_logger.error( + f"Failed to create transcription manager for client {client_state.client_id}: {e}" + ) + continue + + # Process transcription + try: + await client_state.transcription_manager.transcribe_chunk( + audio_uuid, chunk, client_state.client_id + ) + except Exception as e: + audio_logger.error( + f"Error transcribing for client {client_state.client_id}: {e}" + ) + # Recreate transcription manager on error + if client_state.transcription_manager: + await client_state.transcription_manager.disconnect() + client_state.transcription_manager = None + + except Exception as e: + audio_logger.error( + f"Error in transcription processor for client {client_state.client_id}: {e}", + exc_info=True, + ) + +async def _memory_processor(client_state: ClientState): + """Per-client memory processor - currently unused as memory processing happens at conversation end.""" + try: + while client_state.connected: + transcript, client_id, audio_uuid = await client_state.memory_queue.get() + + if ( + transcript is None or client_id is None or audio_uuid is None + ): # Disconnect signal + break + + # Memory processing now happens at conversation end, so this is effectively a no-op + # Keeping the processor running to avoid breaking the queue system + audio_logger.debug( + f"Memory processor received item but processing is now done at conversation end" + ) + + except Exception as e: + audio_logger.error( + f"Error in memory processor for client {client_state.client_id}: {e}", + exc_info=True, + ) + +async def _action_item_processor(client_state: ClientState): + """ + Processes transcript segments from the per-client action item queue. + + For each transcript segment, this processor: + - Checks if the special keyphrase 'Simon says' (case-insensitive, as a phrase) appears in the text. 
+ - If found, it replaces all occurrences of the keyphrase with 'Simon says' (canonical form) and extracts action items from the modified text. + - Logs the detection and extraction process for this special case. + - If the keyphrase is not found, it extracts action items from the original transcript text. + - All extraction is performed using the action_items_service. + - Logs the number of action items extracted or any errors encountered. + """ + try: + while client_state.connected: + transcript_text, client_id, audio_uuid = await client_state.action_item_queue.get() + + if transcript_text is None or client_id is None or audio_uuid is None: # Disconnect signal + break + + # Check for the special keyphrase 'simon says' (case-insensitive, any spaces or dots) + keyphrase_pattern = re.compile(r'\bSimon says\b', re.IGNORECASE) + if keyphrase_pattern.search(transcript_text): + # Remove all occurrences of the keyphrase + modified_text = keyphrase_pattern.sub('Simon says', transcript_text) + audio_logger.info(f"πŸ”‘ 'simon says' keyphrase detected in transcript for {audio_uuid}. 
Extracting action items from: '{modified_text.strip()}'") + try: + action_item_count = await client_state.action_items_service.extract_and_store_action_items( + modified_text.strip(), client_id, audio_uuid + ) + if action_item_count > 0: + audio_logger.info(f"🎯 Extracted {action_item_count} action items from 'simon says' transcript segment for {audio_uuid}") + else: + audio_logger.debug(f"ℹ️ No action items found in 'simon says' transcript segment for {audio_uuid}") + except Exception as e: + audio_logger.error(f"❌ Error processing 'simon says' action items for transcript segment in {audio_uuid}: {e}") + continue # Skip the normal extraction for this case + + except Exception as e: + audio_logger.error(f"Error in action item processor for client {client_state.client_id}: {e}", exc_info=True) \ No newline at end of file diff --git a/backends/advanced-backend/src/database.py b/backends/advanced-backend/src/database.py new file mode 100644 index 00000000..d5eea7bc --- /dev/null +++ b/backends/advanced-backend/src/database.py @@ -0,0 +1,8 @@ +import os +from motor.motor_asyncio import AsyncIOMotorClient + +MONGODB_URI = os.getenv("MONGODB_URI", "mongodb://mongo:27017") +mongo_client = AsyncIOMotorClient(MONGODB_URI) + +def get_db_client(): + return mongo_client \ No newline at end of file diff --git a/backends/advanced-backend/src/main.py b/backends/advanced-backend/src/main.py index 87c3a016..c1e7f178 100644 --- a/backends/advanced-backend/src/main.py +++ b/backends/advanced-backend/src/main.py @@ -19,29 +19,38 @@ from functools import partial from pathlib import Path from typing import Optional, Tuple -import re + +from utils.logging import audio_logger + +from routers import audio_chunks_router +from routers import user_router +from routers import memory_router +from routers import action_items_router +from utils.transcribe import TranscriptionManager +from wyoming.audio import AudioChunk import ollama +import openai from dotenv import load_dotenv from 
easy_audio_interfaces.filesystem.filesystem_interfaces import LocalFileSink -from fastapi import FastAPI, Query, WebSocket, WebSocketDisconnect +from fastapi import FastAPI, Query, WebSocket, WebSocketDisconnect, Depends from fastapi.responses import JSONResponse from fastapi.staticfiles import StaticFiles from motor.motor_asyncio import AsyncIOMotorClient from omi.decoder import OmiOpusDecoder -from wyoming.asr import Transcribe, Transcript -from wyoming.audio import AudioChunk, AudioStart from wyoming.client import AsyncTcpClient -from wyoming.vad import VoiceStarted, VoiceStopped # from debug_utils import memory_debug -from memory import get_memory_service, init_memory_config, shutdown_memory_service +from memory import get_memory_service, shutdown_memory_service +# init_memory_config, from metrics import ( get_metrics_collector, start_metrics_collection, stop_metrics_collection, ) from action_items_service import ActionItemsService +from memory import get_memory_client +from conversation_manager import active_clients, create_client_state, cleanup_client_state ############################################################################### # SETUP @@ -52,22 +61,12 @@ # Mem0 telemetry configuration is now handled in the memory module -# Logging setup -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger("advanced-backend") -audio_logger = logging.getLogger("audio_processing") - -# Conditional Deepgram import -try: - from deepgram import DeepgramClient, FileSource, PrerecordedOptions # type: ignore - DEEPGRAM_AVAILABLE = True - logger.info("βœ… Deepgram SDK available") -except ImportError: - DEEPGRAM_AVAILABLE = False - logger.warning("Deepgram SDK not available. 
Install with: pip install deepgram-sdk") -audio_cropper_logger = logging.getLogger("audio_cropper") +RED = "\033[91m" +GREEN = "\033[92m" +YELLOW = "\033[93m" +RESET = "\033[0m" ############################################################################### # CONFIGURATION @@ -77,7 +76,6 @@ MONGODB_URI = os.getenv("MONGODB_URI", "mongodb://mongo:27017") mongo_client = AsyncIOMotorClient(MONGODB_URI) db = mongo_client.get_default_database("friend-lite") -chunks_col = db["audio_chunks"] users_col = db["users"] speakers_col = db["speakers"] # New collection for speaker management action_items_col = db["action_items"] # New collection for action items @@ -107,38 +105,17 @@ CHUNK_DIR = Path("./audio_chunks") CHUNK_DIR.mkdir(parents=True, exist_ok=True) -# ASR Configuration -OFFLINE_ASR_TCP_URI = os.getenv("OFFLINE_ASR_TCP_URI", "tcp://192.168.0.110:8765/") -DEEPGRAM_API_KEY = os.getenv("DEEPGRAM_API_KEY") - -# Determine transcription strategy based on environment variables -USE_DEEPGRAM = bool(DEEPGRAM_API_KEY and DEEPGRAM_AVAILABLE) -if DEEPGRAM_API_KEY and not DEEPGRAM_AVAILABLE: - audio_logger.error( - "DEEPGRAM_API_KEY provided but Deepgram SDK not available. Falling back to offline ASR." - ) -audio_logger.info( - f"Transcription strategy: {'Deepgram' if USE_DEEPGRAM else 'Offline ASR'}" -) - -# Deepgram client placeholder (not implemented) -deepgram_client = None -if USE_DEEPGRAM: - audio_logger.warning( - "Deepgram transcription requested but not yet implemented. Falling back to offline ASR." 
- ) - USE_DEEPGRAM = False - # Ollama & Qdrant Configuration OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://ollama:11434") +LLM_BASE_URL = os.getenv("LLM_BASE_URL", "https://api.openai.com/v1") QDRANT_BASE_URL = os.getenv("QDRANT_BASE_URL", "qdrant") # Memory configuration is now handled in the memory module # Initialize it with our Ollama and Qdrant URLs -init_memory_config( - ollama_base_url=OLLAMA_BASE_URL, - qdrant_base_url=QDRANT_BASE_URL, -) +# init_memory_config( +# ollama_base_url=OLLAMA_BASE_URL, +# qdrant_base_url=QDRANT_BASE_URL, +# ) # Speaker service configuration @@ -150,1030 +127,15 @@ # Initialize memory service, speaker service, and ollama client memory_service = get_memory_service() -ollama_client = ollama.Client(host=OLLAMA_BASE_URL) - -action_items_service = ActionItemsService(action_items_col, ollama_client) - -############################################################################### -# AUDIO PROCESSING FUNCTIONS -############################################################################### - - -async def _process_audio_cropping_with_relative_timestamps( - original_path: str, - speech_segments: list[tuple[float, float]], - output_path: str, - audio_uuid: str, -) -> bool: - """ - Process audio cropping with automatic relative timestamp conversion. - This function handles both live processing and reprocessing scenarios. 
- """ - try: - # Convert absolute timestamps to relative timestamps - # Extract file start time from filename: timestamp_client_uuid.wav - filename = original_path.split("/")[-1] - file_start_timestamp = float(filename.split("_")[0]) - - # Convert speech segments to relative timestamps - relative_segments = [] - for start_abs, end_abs in speech_segments: - start_rel = start_abs - file_start_timestamp - end_rel = end_abs - file_start_timestamp - - # Ensure relative timestamps are positive (sanity check) - if start_rel < 0: - audio_logger.warning( - f"⚠️ Negative start timestamp: {start_rel}, clamping to 0.0" - ) - start_rel = 0.0 - if end_rel < 0: - audio_logger.warning( - f"⚠️ Negative end timestamp: {end_rel}, skipping segment" - ) - continue +if os.getenv("LLM_PROVIDER") == "ollama": + llm_client = ollama.Client(host=OLLAMA_BASE_URL) +elif os.getenv("LLM_PROVIDER") == "openai": + llm_client = openai.Client(api_key=os.getenv("LLM_API_KEY")) - relative_segments.append((start_rel, end_rel)) +action_items_service = ActionItemsService(action_items_col, llm_client) - audio_logger.info( - f"πŸ• Converting timestamps for {audio_uuid}: file_start={file_start_timestamp}" - ) - audio_logger.info(f"πŸ• Absolute segments: {speech_segments}") - audio_logger.info(f"πŸ• Relative segments: {relative_segments}") - - success = await _crop_audio_with_ffmpeg( - original_path, relative_segments, output_path - ) - if success: - # Update database with cropped file info (keep original absolute timestamps for reference) - cropped_filename = output_path.split("/")[-1] - await chunk_repo.update_cropped_audio( - audio_uuid, cropped_filename, speech_segments - ) - audio_logger.info( - f"Successfully processed cropped audio: {cropped_filename}" - ) - return True - else: - audio_logger.error(f"Failed to crop audio for {audio_uuid}") - return False - except Exception as e: - audio_logger.error(f"Error in audio cropping task for {audio_uuid}: {e}") - return False -async def 
_crop_audio_with_ffmpeg( - original_path: str, speech_segments: list[tuple[float, float]], output_path: str -) -> bool: - """Use ffmpeg to crop audio - runs as async subprocess, no GIL issues""" - audio_cropper_logger.info( - f"Cropping audio {original_path} with {len(speech_segments)} speech segments" - ) - - if not AUDIO_CROPPING_ENABLED: - audio_cropper_logger.info(f"Audio cropping disabled, skipping {original_path}") - return False - - if not speech_segments: - audio_cropper_logger.warning(f"No speech segments to crop for {original_path}") - return False - - # Filter out segments that are too short - filtered_segments = [] - for start, end in speech_segments: - duration = end - start - if duration >= MIN_SPEECH_SEGMENT_DURATION: - # Add padding around speech segments - padded_start = max(0, start - CROPPING_CONTEXT_PADDING) - padded_end = end + CROPPING_CONTEXT_PADDING - filtered_segments.append((padded_start, padded_end)) - else: - audio_cropper_logger.debug( - f"Skipping short segment: {start}-{end} ({duration:.2f}s < {MIN_SPEECH_SEGMENT_DURATION}s)" - ) - - if not filtered_segments: - audio_cropper_logger.warning( - f"No segments meet minimum duration ({MIN_SPEECH_SEGMENT_DURATION}s) for {original_path}" - ) - return False - - audio_cropper_logger.info( - f"Cropping audio {original_path} with {len(filtered_segments)} speech segments (filtered from {len(speech_segments)})" - ) - - try: - # Build ffmpeg filter for concatenating speech segments - filter_parts = [] - for i, (start, end) in enumerate(filtered_segments): - duration = end - start - filter_parts.append( - f"[0:a]atrim=start={start}:duration={duration},asetpts=PTS-STARTPTS[seg{i}]" - ) - - # Concatenate all segments - inputs = "".join(f"[seg{i}]" for i in range(len(filtered_segments))) - concat_filter = f"{inputs}concat=n={len(filtered_segments)}:v=0:a=1[out]" - - full_filter = ";".join(filter_parts + [concat_filter]) - - # Run ffmpeg as async subprocess - cmd = [ - "ffmpeg", - "-y", # -y = overwrite 
output - "-i", - original_path, - "-filter_complex", - full_filter, - "-map", - "[out]", - "-c:a", - "pcm_s16le", # Keep same format as original - output_path, - ] - - audio_cropper_logger.info(f"Running ffmpeg command: {' '.join(cmd)}") - - process = await asyncio.create_subprocess_exec( - *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE - ) - - stdout, stderr = await process.communicate() - if stdout: - audio_cropper_logger.debug(f"FFMPEG stdout: {stdout.decode()}") - - if process.returncode == 0: - # Calculate cropped duration - cropped_duration = sum(end - start for start, end in filtered_segments) - audio_cropper_logger.info( - f"Successfully cropped {original_path} -> {output_path} ({cropped_duration:.1f}s from {len(filtered_segments)} segments)" - ) - return True - else: - error_msg = stderr.decode() if stderr else "Unknown ffmpeg error" - audio_logger.error(f"ffmpeg failed for {original_path}: {error_msg}") - return False - - except Exception as e: - audio_logger.error(f"Error running ffmpeg on {original_path}: {e}") - return False - - -############################################################################### -# UTILITY FUNCTIONS & HELPER CLASSES -############################################################################### - - -def _new_local_file_sink(file_path): - """Create a properly configured LocalFileSink with all wave parameters set.""" - sink = LocalFileSink( - file_path=file_path, - sample_rate=int(OMI_SAMPLE_RATE), - channels=int(OMI_CHANNELS), - sample_width=int(OMI_SAMPLE_WIDTH), - ) - return sink - - -class ChunkRepo: - """Async helpers for the audio_chunks collection.""" - - def __init__(self, collection): - self.col = collection - - async def create_chunk( - self, - *, - audio_uuid, - audio_path, - client_id, - timestamp, - transcript=None, - speakers_identified=None, - ): - doc = { - "audio_uuid": audio_uuid, - "audio_path": audio_path, - "client_id": client_id, - "timestamp": timestamp, - "transcript": 
transcript or [], # List of conversation segments - "speakers_identified": speakers_identified - or [], # List of identified speakers - } - await self.col.insert_one(doc) - - async def add_transcript_segment(self, audio_uuid, transcript_segment): - """Add a single transcript segment to the conversation.""" - await self.col.update_one( - {"audio_uuid": audio_uuid}, {"$push": {"transcript": transcript_segment}} - ) - - async def add_speaker(self, audio_uuid, speaker_id): - """Add a speaker to the speakers_identified list if not already present.""" - await self.col.update_one( - {"audio_uuid": audio_uuid}, - {"$addToSet": {"speakers_identified": speaker_id}}, - ) - - async def update_transcript(self, audio_uuid, full_transcript): - """Update the entire transcript list (for compatibility).""" - await self.col.update_one( - {"audio_uuid": audio_uuid}, {"$set": {"transcript": full_transcript}} - ) - - async def update_segment_timing( - self, audio_uuid, segment_index, start_time, end_time - ): - """Update timing information for a specific transcript segment.""" - await self.col.update_one( - {"audio_uuid": audio_uuid}, - { - "$set": { - f"transcript.{segment_index}.start": start_time, - f"transcript.{segment_index}.end": end_time, - } - }, - ) - - async def update_segment_speaker(self, audio_uuid, segment_index, speaker_id): - """Update the speaker for a specific transcript segment.""" - result = await self.col.update_one( - {"audio_uuid": audio_uuid}, - {"$set": {f"transcript.{segment_index}.speaker": speaker_id}}, - ) - if result.modified_count > 0: - audio_logger.info( - f"Updated segment {segment_index} speaker to {speaker_id} for {audio_uuid}" - ) - return result.modified_count > 0 - - async def update_cropped_audio( - self, - audio_uuid: str, - cropped_path: str, - speech_segments: list[tuple[float, float]], - ): - """Update the chunk with cropped audio information.""" - cropped_duration = sum(end - start for start, end in speech_segments) - - result = await 
self.col.update_one( - {"audio_uuid": audio_uuid}, - { - "$set": { - "cropped_audio_path": cropped_path, - "speech_segments": [ - {"start": start, "end": end} for start, end in speech_segments - ], - "cropped_duration": cropped_duration, - "cropped_at": time.time(), - } - }, - ) - if result.modified_count > 0: - audio_logger.info( - f"Updated cropped audio info for {audio_uuid}: {cropped_path}" - ) - return result.modified_count > 0 - - -class TranscriptionManager: - """Manages transcription using either Deepgram or offline ASR service.""" - - def __init__(self, action_item_callback=None): - self.client = None - self._current_audio_uuid = None - self._streaming = False - self.use_deepgram = USE_DEEPGRAM - self.deepgram_client = deepgram_client - self._audio_buffer = [] # Buffer for Deepgram batch processing - self.action_item_callback = action_item_callback # Callback to queue action items - - async def connect(self): - """Establish connection to ASR service (only for offline ASR).""" - if self.use_deepgram: - audio_logger.info("Using Deepgram transcription - no connection needed") - return - - try: - self.client = AsyncTcpClient.from_uri(OFFLINE_ASR_TCP_URI) - await self.client.connect() - audio_logger.info( - f"Connected to offline ASR service at {OFFLINE_ASR_TCP_URI}" - ) - except Exception as e: - audio_logger.error(f"Failed to connect to offline ASR service: {e}") - self.client = None - raise - - async def disconnect(self): - """Cleanly disconnect from ASR service.""" - if self.use_deepgram: - audio_logger.info("Using Deepgram - no disconnection needed") - return - - if self.client: - try: - await self.client.disconnect() - audio_logger.info("Disconnected from offline ASR service") - except Exception as e: - audio_logger.error(f"Error disconnecting from offline ASR service: {e}") - finally: - self.client = None - - async def transcribe_chunk( - self, audio_uuid: str, chunk: AudioChunk, client_id: str - ): - """Transcribe a single chunk using either Deepgram or 
offline ASR.""" - if self.use_deepgram: - await self._transcribe_chunk_deepgram(audio_uuid, chunk, client_id) - else: - await self._transcribe_chunk_offline(audio_uuid, chunk, client_id) - - async def _transcribe_chunk_deepgram( - self, audio_uuid: str, chunk: AudioChunk, client_id: str - ): - """Transcribe using Deepgram API.""" - raise NotImplementedError( - "Deepgram transcription is not yet implemented. Please use offline ASR by not setting DEEPGRAM_API_KEY." - ) - - async def _process_deepgram_buffer(self, audio_uuid: str, client_id: str): - """Process buffered audio with Deepgram.""" - raise NotImplementedError("Deepgram transcription is not yet implemented.") - - async def _transcribe_chunk_offline( - self, audio_uuid: str, chunk: AudioChunk, client_id: str - ): - """Transcribe using offline ASR service.""" - if not self.client: - audio_logger.error(f"No ASR connection available for {audio_uuid}") - # Track transcription failure - metrics_collector = get_metrics_collector() - metrics_collector.record_transcription_result(False) - return - - # Track transcription request - start_time = time.time() - metrics_collector = get_metrics_collector() - metrics_collector.record_transcription_request() - - try: - if self._current_audio_uuid != audio_uuid: - self._current_audio_uuid = audio_uuid - audio_logger.info(f"New audio_uuid: {audio_uuid}") - transcribe = Transcribe() - await self.client.write_event(transcribe.event()) - audio_start = AudioStart( - rate=chunk.rate, - width=chunk.width, - channels=chunk.channels, - timestamp=chunk.timestamp, - ) - await self.client.write_event(audio_start.event()) - - # Send the audio chunk - await self.client.write_event(chunk.event()) - - # Read and process any available events (non-blocking) - try: - while True: - event = await asyncio.wait_for( - self.client.read_event(), timeout=0.001 - ) # this is a quick poll, feels like a better solution can exist - if event is None: - break - - if Transcript.is_type(event.type): - 
transcript_obj = Transcript.from_event(event) - transcript_text = transcript_obj.text.strip() - - # Handle both Transcript and StreamingTranscript types - # Check the 'final' attribute from the event data, not the reconstructed object - is_final = event.data.get( - "final", True - ) # Default to True for standard Transcript - - # Only process final transcripts, ignore partial ones - if not is_final: - audio_logger.info( - f"Ignoring partial transcript for {audio_uuid}: {transcript_text}" - ) - continue - - if transcript_text: - audio_logger.info( - f"Transcript for {audio_uuid}: {transcript_text} (final: {is_final})" - ) - - # Track successful transcription with latency - latency_ms = (time.time() - start_time) * 1000 - metrics_collector.record_transcription_result( - True, latency_ms - ) - - # Create transcript segment with new format - transcript_segment = { - "speaker": f"speaker_{client_id}", - "text": transcript_text, - "start": 0.0, - "end": 0.0, - } - - # Store transcript segment in DB immediately - - await chunk_repo.add_transcript_segment(audio_uuid, transcript_segment) - - # Queue for action item processing using callback (async, non-blocking) - if self.action_item_callback: - await self.action_item_callback(transcript_text, client_id, audio_uuid) - - await chunk_repo.add_speaker(audio_uuid, f"speaker_{client_id}") - audio_logger.info(f"Added transcript segment for {audio_uuid} to DB.") - - # Update transcript time for conversation timeout tracking - if client_id in active_clients: - active_clients[client_id].last_transcript_time = ( - time.time() - ) - # Collect transcript for end-of-conversation memory processing - active_clients[ - client_id - ].conversation_transcripts.append(transcript_text) - audio_logger.info( - f"Added transcript to conversation collection: '{transcript_text}'" - ) - - elif VoiceStarted.is_type(event.type): - audio_logger.info( - f"VoiceStarted event received for {audio_uuid}" - ) - current_time = time.time() - if client_id in 
active_clients: - active_clients[client_id].record_speech_start( - audio_uuid, current_time - ) - audio_logger.info( - f"🎀 Voice started for {audio_uuid} at {current_time}" - ) - - elif VoiceStopped.is_type(event.type): - audio_logger.info( - f"VoiceStopped event received for {audio_uuid}" - ) - current_time = time.time() - if client_id in active_clients: - active_clients[client_id].record_speech_end( - audio_uuid, current_time - ) - audio_logger.info( - f"πŸ”‡ Voice stopped for {audio_uuid} at {current_time}" - ) - - except asyncio.TimeoutError: - # No events available right now, that's fine - pass - - except Exception as e: - audio_logger.error( - f"Error in offline transcribe_chunk for {audio_uuid}: {e}" - ) - # Track transcription failure - metrics_collector.record_transcription_result(False) - # Attempt to reconnect on error - await self._reconnect() - - async def _reconnect(self): - """Attempt to reconnect to ASR service.""" - audio_logger.info("Attempting to reconnect to ASR service...") - - # Track reconnection attempt - metrics_collector = get_metrics_collector() - metrics_collector.record_service_reconnection("asr-service") - - await self.disconnect() - await asyncio.sleep(2) # Brief delay before reconnecting - try: - await self.connect() - except Exception as e: - audio_logger.error(f"Reconnection failed: {e}") - - -class ClientState: - """Manages all state for a single client connection.""" - - def __init__(self, client_id: str): - self.client_id = client_id - self.connected = True - - # Per-client queues - self.chunk_queue = asyncio.Queue[Optional[AudioChunk]]() - self.transcription_queue = asyncio.Queue[Tuple[Optional[str], Optional[AudioChunk]]]() - self.memory_queue = asyncio.Queue[Tuple[Optional[str], Optional[str], Optional[str]]]() # (transcript, client_id, audio_uuid) - self.action_item_queue = asyncio.Queue[Tuple[Optional[str], Optional[str], Optional[str]]]() # (transcript_text, client_id, audio_uuid) - - # Per-client file sink - 
self.file_sink: Optional[LocalFileSink] = None - self.current_audio_uuid: Optional[str] = None - - # Per-client transcription manager - self.transcription_manager: Optional[TranscriptionManager] = None - - # Conversation timeout tracking - self.last_transcript_time: Optional[float] = None - self.conversation_start_time: float = time.time() - - # Speech segment tracking for audio cropping - self.speech_segments: dict[str, list[tuple[float, float]]] = ( - {} - ) # audio_uuid -> [(start, end), ...] - self.current_speech_start: dict[str, Optional[float]] = ( - {} - ) # audio_uuid -> start_time - - # Conversation transcript collection for end-of-conversation memory processing - self.conversation_transcripts: list[str] = ( - [] - ) # Collect all transcripts for this conversation - - # Tasks for this client - self.saver_task: Optional[asyncio.Task] = None - self.transcription_task: Optional[asyncio.Task] = None - self.memory_task: Optional[asyncio.Task] = None - self.action_item_task: Optional[asyncio.Task] = None - - def record_speech_start(self, audio_uuid: str, timestamp: float): - """Record the start of a speech segment.""" - self.current_speech_start[audio_uuid] = timestamp - audio_logger.info(f"Recorded speech start for {audio_uuid}: {timestamp}") - - def record_speech_end(self, audio_uuid: str, timestamp: float): - """Record the end of a speech segment.""" - if ( - audio_uuid in self.current_speech_start - and self.current_speech_start[audio_uuid] is not None - ): - start_time = self.current_speech_start[audio_uuid] - if start_time is not None: # Type guard - if audio_uuid not in self.speech_segments: - self.speech_segments[audio_uuid] = [] - self.speech_segments[audio_uuid].append((start_time, timestamp)) - self.current_speech_start[audio_uuid] = None - duration = timestamp - start_time - audio_logger.info( - f"Recorded speech segment for {audio_uuid}: {start_time:.3f} -> {timestamp:.3f} (duration: {duration:.3f}s)" - ) - else: - audio_logger.warning( - f"Speech 
end recorded for {audio_uuid} but no start time found" - ) - - async def start_processing(self): - """Start the processing tasks for this client.""" - self.saver_task = asyncio.create_task(self._audio_saver()) - self.transcription_task = asyncio.create_task(self._transcription_processor()) - self.memory_task = asyncio.create_task(self._memory_processor()) - self.action_item_task = asyncio.create_task(self._action_item_processor()) - audio_logger.info(f"Started processing tasks for client {self.client_id}") - - async def disconnect(self): - """Clean disconnect of client state.""" - if not self.connected: - return - - self.connected = False - audio_logger.info(f"Disconnecting client {self.client_id}") - - # Close current conversation with all processing before signaling shutdown - await self._close_current_conversation() - - # Signal processors to stop - await self.chunk_queue.put(None) - await self.transcription_queue.put((None, None)) - await self.memory_queue.put((None, None, None)) - await self.action_item_queue.put((None, None, None)) - - # Wait for tasks to complete - if self.saver_task: - await self.saver_task - if self.transcription_task: - await self.transcription_task - if self.memory_task: - await self.memory_task - if self.action_item_task: - await self.action_item_task - - # Clean up transcription manager - if self.transcription_manager: - await self.transcription_manager.disconnect() - self.transcription_manager = None - - # Clean up any remaining speech segment tracking - self.speech_segments.clear() - self.current_speech_start.clear() - self.conversation_transcripts.clear() # Clear conversation transcripts - - audio_logger.info(f"Client {self.client_id} disconnected and cleaned up") - - def _should_start_new_conversation(self) -> bool: - """Check if we should start a new conversation based on timeout.""" - if self.last_transcript_time is None: - return False # No transcript yet, keep current conversation - - current_time = time.time() - 
time_since_last_transcript = current_time - self.last_transcript_time - timeout_seconds = NEW_CONVERSATION_TIMEOUT_MINUTES * 60 - - return time_since_last_transcript > timeout_seconds - - async def _close_current_conversation(self): - """Close the current conversation with proper cleanup including audio cropping and speaker processing.""" - if self.file_sink: - # Store current audio info before closing - current_uuid = self.current_audio_uuid - current_path = self.file_sink.file_path - - audio_logger.info( - f"πŸ”’ Closing conversation {current_uuid}, file: {current_path}" - ) - - # Process memory at end of conversation if we have transcripts - if self.conversation_transcripts and current_uuid: - full_conversation = " ".join(self.conversation_transcripts) - audio_logger.info( - f"πŸ’­ Processing memory for conversation {current_uuid} with {len(self.conversation_transcripts)} transcript segments" - ) - audio_logger.info( - f"πŸ’­ Individual transcripts: {self.conversation_transcripts}" - ) - audio_logger.info( - f"πŸ’­ Full conversation text: {full_conversation[:200]}..." 
- ) # Log first 200 chars - - start_time = time.time() - memories_created = [] - action_items_created = [] - processing_success = True - error_message = None - - try: - # Track memory storage request - metrics_collector = get_metrics_collector() - metrics_collector.record_memory_storage_request() - - # Add general memory - memory_result = memory_service.add_memory( - full_conversation, self.client_id, current_uuid - ) - if memory_result: - audio_logger.info( - f"βœ… Successfully added conversation memory for {current_uuid}" - ) - metrics_collector.record_memory_storage_result(True) - - # Use the actual memory objects returned from mem0's add() method - memory_results = memory_result.get("results", []) - memories_created = [] - - for mem in memory_results: - memory_text = mem.get("memory", "Memory text unavailable") - memory_id = mem.get("id", "unknown") - event = mem.get("event", "UNKNOWN") - memories_created.append( - {"id": memory_id, "text": memory_text, "event": event} - ) - - audio_logger.info( - f"Created {len(memories_created)} memory objects: {[m['event'] for m in memories_created]}" - ) - else: - audio_logger.error( - f"❌ Failed to add conversation memory for {current_uuid}" - ) - metrics_collector.record_memory_storage_result(False) - processing_success = False - error_message = "Failed to add general memory" - - except Exception as e: - audio_logger.error( - f"❌ Error processing memory and action items for {current_uuid}: {e}" - ) - processing_success = False - error_message = str(e) - - # Log debug information - processing_time_ms = (time.time() - start_time) * 1000 - # memory_debug.log_memory_processing( - # user_id=self.client_id, - # audio_uuid=current_uuid, - # transcript_text=full_conversation, - # memories_created=memories_created, - # action_items_created=action_items_created, - # processing_success=processing_success, - # error_message=error_message, - # processing_time_ms=processing_time_ms, - # ) - else: - audio_logger.info( - f"ℹ️ No 
transcripts to process for memory in conversation {current_uuid}" - ) - # Log empty processing for debug - if current_uuid: - pass - # memory_debug.log_memory_processing( - # user_id=self.client_id, - # audio_uuid=current_uuid, - # transcript_text="", - # memories_created=[], - # action_items_created=[], - # processing_success=True, - # error_message="No transcripts available for processing", - # processing_time_ms=0, - # ) - - await self.file_sink.close() - - # Track successful audio chunk save in metrics - try: - metrics_collector = get_metrics_collector() - file_path = Path(current_path) - if file_path.exists(): - # Estimate duration (60 seconds per chunk is TARGET_SAMPLES) - duration_seconds = SEGMENT_SECONDS - - # Calculate voice activity if we have speech segments - voice_activity_seconds = 0 - if current_uuid and current_uuid in self.speech_segments: - for start, end in self.speech_segments[current_uuid]: - voice_activity_seconds += end - start - - metrics_collector.record_audio_chunk_saved( - duration_seconds, voice_activity_seconds - ) - audio_logger.debug( - f"πŸ“Š Recorded audio chunk metrics: {duration_seconds}s total, {voice_activity_seconds}s voice activity" - ) - else: - metrics_collector.record_audio_chunk_failed() - audio_logger.warning( - f"πŸ“Š Audio file not found after save: {current_path}" - ) - except Exception as e: - audio_logger.error(f"πŸ“Š Error recording audio metrics: {e}") - - self.file_sink = None - - # Process audio cropping if we have speech segments - if current_uuid and current_path: - if current_uuid in self.speech_segments: - speech_segments = self.speech_segments[current_uuid] - audio_logger.info( - f"🎯 Found {len(speech_segments)} speech segments for {current_uuid}: {speech_segments}" - ) - if speech_segments: # Only crop if we have speech segments - cropped_path = str(current_path).replace(".wav", "_cropped.wav") - - # Process in background - won't block - asyncio.create_task( - self._process_audio_cropping( - 
f"{CHUNK_DIR}/{current_path}", - speech_segments, - f"{CHUNK_DIR}/{cropped_path}", - current_uuid, - ) - ) - audio_logger.info( - f"βœ‚οΈ Queued audio cropping for {current_path} with {len(speech_segments)} speech segments" - ) - else: - audio_logger.info( - f"⚠️ Empty speech segments list found for {current_path}, skipping cropping" - ) - - # Clean up segments for this conversation - del self.speech_segments[current_uuid] - if current_uuid in self.current_speech_start: - del self.current_speech_start[current_uuid] - else: - audio_logger.info( - f"⚠️ No speech segments found for {current_path} (uuid: {current_uuid}), skipping cropping" - ) - - else: - audio_logger.info( - f"πŸ”’ No active file sink to close for client {self.client_id}" - ) - - async def start_new_conversation(self): - """Start a new conversation by closing current conversation and resetting state.""" - await self._close_current_conversation() - - # Reset conversation state - self.current_audio_uuid = None - self.conversation_start_time = time.time() - self.last_transcript_time = None - self.conversation_transcripts.clear() # Clear collected transcripts for new conversation - - audio_logger.info( - f"Client {self.client_id}: Started new conversation due to {NEW_CONVERSATION_TIMEOUT_MINUTES}min timeout" - ) - - async def _process_audio_cropping( - self, - original_path: str, - speech_segments: list[tuple[float, float]], - output_path: str, - audio_uuid: str, - ): - """Background task for audio cropping using ffmpeg.""" - await _process_audio_cropping_with_relative_timestamps( - original_path, speech_segments, output_path, audio_uuid - ) - - async def _audio_saver(self): - """Per-client audio saver consumer.""" - try: - while self.connected: - audio_chunk = await self.chunk_queue.get() - - if audio_chunk is None: # Disconnect signal - break - - # Check if we should start a new conversation due to timeout - if self._should_start_new_conversation(): - await self.start_new_conversation() - - if 
self.file_sink is None: - # Create new file sink for this client - self.current_audio_uuid = uuid.uuid4().hex - timestamp = audio_chunk.timestamp or int(time.time()) - wav_filename = ( - f"{timestamp}_{self.client_id}_{self.current_audio_uuid}.wav" - ) - audio_logger.info( - f"Creating file sink with: rate={int(OMI_SAMPLE_RATE)}, channels={int(OMI_CHANNELS)}, width={int(OMI_SAMPLE_WIDTH)}" - ) - self.file_sink = _new_local_file_sink(f"{CHUNK_DIR}/{wav_filename}") - await self.file_sink.open() - - await chunk_repo.create_chunk( - audio_uuid=self.current_audio_uuid, - audio_path=wav_filename, - client_id=self.client_id, - timestamp=timestamp, - ) - - await self.file_sink.write(audio_chunk) - - # Queue for transcription - await self.transcription_queue.put( - (self.current_audio_uuid, audio_chunk) - ) - - except Exception as e: - audio_logger.error( - f"Error in audio saver for client {self.client_id}: {e}", exc_info=True - ) - finally: - # Close current conversation with all processing when audio saver ends - await self._close_current_conversation() - - async def _transcription_processor(self): - """Per-client transcription processor.""" - try: - while self.connected: - audio_uuid, chunk = await self.transcription_queue.get() - - if audio_uuid is None or chunk is None: # Disconnect signal - break - - # Get or create transcription manager - if self.transcription_manager is None: - # Create callback function to queue action items - async def action_item_callback(transcript_text, client_id, audio_uuid): - await self.action_item_queue.put((transcript_text, client_id, audio_uuid)) - - self.transcription_manager = TranscriptionManager(action_item_callback=action_item_callback) - try: - await self.transcription_manager.connect() - except Exception as e: - audio_logger.error( - f"Failed to create transcription manager for client {self.client_id}: {e}" - ) - continue - - # Process transcription - try: - await self.transcription_manager.transcribe_chunk( - audio_uuid, chunk, 
self.client_id - ) - except Exception as e: - audio_logger.error( - f"Error transcribing for client {self.client_id}: {e}" - ) - # Recreate transcription manager on error - if self.transcription_manager: - await self.transcription_manager.disconnect() - self.transcription_manager = None - - except Exception as e: - audio_logger.error( - f"Error in transcription processor for client {self.client_id}: {e}", - exc_info=True, - ) - - async def _memory_processor(self): - """Per-client memory processor - currently unused as memory processing happens at conversation end.""" - try: - while self.connected: - transcript, client_id, audio_uuid = await self.memory_queue.get() - - if ( - transcript is None or client_id is None or audio_uuid is None - ): # Disconnect signal - break - - # Memory processing now happens at conversation end, so this is effectively a no-op - # Keeping the processor running to avoid breaking the queue system - audio_logger.debug( - f"Memory processor received item but processing is now done at conversation end" - ) - - except Exception as e: - audio_logger.error( - f"Error in memory processor for client {self.client_id}: {e}", - exc_info=True, - ) - - async def _action_item_processor(self): - """ - Processes transcript segments from the per-client action item queue. - - For each transcript segment, this processor: - - Checks if the special keyphrase 'Simon says' (case-insensitive, as a phrase) appears in the text. - - If found, it replaces all occurrences of the keyphrase with 'Simon says' (canonical form) and extracts action items from the modified text. - - Logs the detection and extraction process for this special case. - - If the keyphrase is not found, it extracts action items from the original transcript text. - - All extraction is performed using the action_items_service. - - Logs the number of action items extracted or any errors encountered. 
- """ - try: - while self.connected: - transcript_text, client_id, audio_uuid = await self.action_item_queue.get() - - if transcript_text is None or client_id is None or audio_uuid is None: # Disconnect signal - break - - # Check for the special keyphrase 'simon says' (case-insensitive, any spaces or dots) - keyphrase_pattern = re.compile(r'\bSimon says\b', re.IGNORECASE) - if keyphrase_pattern.search(transcript_text): - # Remove all occurrences of the keyphrase - modified_text = keyphrase_pattern.sub('Simon says', transcript_text) - audio_logger.info(f"πŸ”‘ 'simon says' keyphrase detected in transcript for {audio_uuid}. Extracting action items from: '{modified_text.strip()}'") - try: - action_item_count = await action_items_service.extract_and_store_action_items( - modified_text.strip(), client_id, audio_uuid - ) - if action_item_count > 0: - audio_logger.info(f"🎯 Extracted {action_item_count} action items from 'simon says' transcript segment for {audio_uuid}") - else: - audio_logger.debug(f"ℹ️ No action items found in 'simon says' transcript segment for {audio_uuid}") - except Exception as e: - audio_logger.error(f"❌ Error processing 'simon says' action items for transcript segment in {audio_uuid}: {e}") - continue # Skip the normal extraction for this case - - except Exception as e: - audio_logger.error(f"Error in action item processor for client {self.client_id}: {e}", exc_info=True) - - -# Initialize repository and global state -chunk_repo = ChunkRepo(chunks_col) -active_clients: dict[str, ClientState] = {} - - -async def create_client_state(client_id: str) -> ClientState: - """Create and register a new client state.""" - client_state = ClientState(client_id) - active_clients[client_id] = client_state - await client_state.start_processing() - - # Track client connection in metrics - metrics_collector = get_metrics_collector() - metrics_collector.record_client_connection(client_id) - - return client_state - - -async def cleanup_client_state(client_id: str): - 
"""Clean up and remove client state.""" - if client_id in active_clients: - client_state = active_clients[client_id] - await client_state.disconnect() - del active_clients[client_id] - - # Track client disconnection in metrics - metrics_collector = get_metrics_collector() - metrics_collector.record_client_disconnection(client_id) - ############################################################################### # CORE APPLICATION LOGIC @@ -1187,8 +149,10 @@ async def lifespan(app: FastAPI): audio_logger.info("Starting application...") # Start metrics collection - await start_metrics_collection() - audio_logger.info("Metrics collection started") + + if os.getenv("METRICS_COLLECTION_ENABLE"): + await start_metrics_collection(CHUNK_DIR) + audio_logger.info("Metrics collection started") audio_logger.info( "Application ready - clients will have individual processing pipelines." @@ -1205,7 +169,7 @@ async def lifespan(app: FastAPI): await cleanup_client_state(client_id) # Stop metrics collection and save final report - await stop_metrics_collection() + await stop_metrics_collection(CHUNK_DIR) audio_logger.info("Metrics collection stopped") # Shutdown memory service and speaker service @@ -1218,10 +182,14 @@ async def lifespan(app: FastAPI): # FastAPI Application app = FastAPI(lifespan=lifespan) app.mount("/audio", StaticFiles(directory=CHUNK_DIR), name="audio") +app.include_router(audio_chunks_router.router) +app.include_router(user_router.router) +app.include_router(memory_router.router) +app.include_router(action_items_router.router) @app.websocket("/ws") -async def ws_endpoint(ws: WebSocket, user_id: Optional[str] = Query(None)): +async def ws_endpoint(ws: WebSocket, user_id: Optional[str] = Query(None), audio_chunk_utils: audio_chunks_router.AudioChunkUtils = Depends(audio_chunks_router.get_audio_chunk_utils)): """Accepts WebSocket connections, decodes Opus audio, and processes per-client.""" await ws.accept() @@ -1233,7 +201,18 @@ async def ws_endpoint(ws: WebSocket, 
user_id: Optional[str] = Query(None)): _decode_packet = partial(decoder.decode_packet, strip_header=False) # Create client state and start processing - client_state = await create_client_state(client_id) + client_state = await create_client_state(client_id, audio_chunk_utils, { + "CHUNK_DIR": CHUNK_DIR, + "OMI_SAMPLE_RATE": OMI_SAMPLE_RATE, + "OMI_CHANNELS": OMI_CHANNELS, + "OMI_SAMPLE_WIDTH": OMI_SAMPLE_WIDTH, + "NEW_CONVERSATION_TIMEOUT_MINUTES": NEW_CONVERSATION_TIMEOUT_MINUTES, + "AUDIO_CROPPING_ENABLED": AUDIO_CROPPING_ENABLED, + "MIN_SPEECH_SEGMENT_DURATION": MIN_SPEECH_SEGMENT_DURATION, + "CROPPING_CONTEXT_PADDING": CROPPING_CONTEXT_PADDING, + "_DEC_IO_EXECUTOR": _DEC_IO_EXECUTOR, + "action_items_service": action_items_service, + }, TranscriptionManager) try: packet_count = 0 @@ -1266,9 +245,8 @@ async def ws_endpoint(ws: WebSocket, user_id: Optional[str] = Query(None)): audio_logger.info(f"πŸ“Š Processed {packet_count} packets ({total_bytes} bytes total) for client {client_id}") # Track audio chunk received in metrics - metrics_collector = get_metrics_collector() - metrics_collector.record_audio_chunk_received(client_id) - metrics_collector.record_client_activity(client_id) + get_metrics_collector().record_audio_chunk_received(client_id) + get_metrics_collector().record_client_activity(client_id) except WebSocketDisconnect: audio_logger.info(f"πŸ”Œ WebSocket disconnected - Client: {client_id}, Packets: {packet_count}, Total bytes: {total_bytes}") @@ -1280,7 +258,7 @@ async def ws_endpoint(ws: WebSocket, user_id: Optional[str] = Query(None)): @app.websocket("/ws_pcm") -async def ws_endpoint_pcm(ws: WebSocket, user_id: Optional[str] = Query(None)): +async def ws_endpoint_pcm(ws: WebSocket, user_id: Optional[str] = Query(None), audio_chunk_utils: audio_chunks_router.AudioChunkUtils = Depends(audio_chunks_router.get_audio_chunk_utils)): """Accepts WebSocket connections, processes PCM audio per-client.""" await ws.accept() @@ -1289,7 +267,18 @@ async def 
ws_endpoint_pcm(ws: WebSocket, user_id: Optional[str] = Query(None)): audio_logger.info(f"πŸ”Œ PCM WebSocket connection accepted - Client: {client_id}, User ID: {user_id}") # Create client state and start processing - client_state = await create_client_state(client_id) + client_state = await create_client_state(client_id, audio_chunk_utils, { + "CHUNK_DIR": CHUNK_DIR, + "OMI_SAMPLE_RATE": OMI_SAMPLE_RATE, + "OMI_CHANNELS": OMI_CHANNELS, + "OMI_SAMPLE_WIDTH": OMI_SAMPLE_WIDTH, + "NEW_CONVERSATION_TIMEOUT_MINUTES": NEW_CONVERSATION_TIMEOUT_MINUTES, + "AUDIO_CROPPING_ENABLED": AUDIO_CROPPING_ENABLED, + "MIN_SPEECH_SEGMENT_DURATION": MIN_SPEECH_SEGMENT_DURATION, + "CROPPING_CONTEXT_PADDING": CROPPING_CONTEXT_PADDING, + "_DEC_IO_EXECUTOR": _DEC_IO_EXECUTOR, + "action_items_service": action_items_service, + }, TranscriptionManager) try: packet_count = 0 @@ -1316,9 +305,8 @@ async def ws_endpoint_pcm(ws: WebSocket, user_id: Optional[str] = Query(None)): # Track audio chunk received in metrics - metrics_collector = get_metrics_collector() - metrics_collector.record_audio_chunk_received(client_id) - metrics_collector.record_client_activity(client_id) + get_metrics_collector().record_audio_chunk_received(client_id) + get_metrics_collector().record_client_activity(client_id) except WebSocketDisconnect: audio_logger.info(f"πŸ”Œ PCM WebSocket disconnected - Client: {client_id}, Packets: {packet_count}, Total bytes: {total_bytes}") except Exception as e: @@ -1328,310 +316,13 @@ async def ws_endpoint_pcm(ws: WebSocket, user_id: Optional[str] = Query(None)): await cleanup_client_state(client_id) -@app.get("/api/conversations") -async def get_conversations(): - """Get all conversations grouped by client_id.""" - try: - # Get all audio chunks and group by client_id - cursor = chunks_col.find({}).sort("timestamp", -1) - conversations = {} - - async for chunk in cursor: - client_id = chunk.get("client_id", "unknown") - if client_id not in conversations: - conversations[client_id] = [] 
- - conversations[client_id].append( - { - "audio_uuid": chunk["audio_uuid"], - "audio_path": chunk["audio_path"], - "cropped_audio_path": chunk.get("cropped_audio_path"), - "timestamp": chunk["timestamp"], - "transcript": chunk.get("transcript", []), - "speakers_identified": chunk.get("speakers_identified", []), - "speech_segments": chunk.get("speech_segments", []), - "cropped_duration": chunk.get("cropped_duration"), - } - ) - return {"conversations": conversations} - except Exception as e: - audio_logger.error(f"Error getting conversations: {e}") - return JSONResponse(status_code=500, content={"error": str(e)}) -@app.get("/api/conversations/{audio_uuid}/cropped") -async def get_cropped_audio_info(audio_uuid: str): - """Get cropped audio information for a specific conversation.""" - try: - chunk = await chunks_col.find_one({"audio_uuid": audio_uuid}) - if not chunk: - return JSONResponse( - status_code=404, content={"error": "Conversation not found"} - ) - - return { - "audio_uuid": audio_uuid, - "original_audio_path": chunk["audio_path"], - "cropped_audio_path": chunk.get("cropped_audio_path"), - "speech_segments": chunk.get("speech_segments", []), - "cropped_duration": chunk.get("cropped_duration"), - "cropped_at": chunk.get("cropped_at"), - "has_cropped_version": bool(chunk.get("cropped_audio_path")), - } - except Exception as e: - audio_logger.error(f"Error getting cropped audio info: {e}") - return JSONResponse(status_code=500, content={"error": str(e)}) - - -@app.post("/api/conversations/{audio_uuid}/reprocess") -async def reprocess_audio_cropping(audio_uuid: str): - """Trigger reprocessing of audio cropping for a specific conversation.""" - try: - chunk = await chunks_col.find_one({"audio_uuid": audio_uuid}) - if not chunk: - return JSONResponse( - status_code=404, content={"error": "Conversation not found"} - ) - - original_path = f"{CHUNK_DIR}/{chunk['audio_path']}" - if not Path(original_path).exists(): - return JSONResponse( - status_code=404, 
content={"error": "Original audio file not found"} - ) - - # Check if we have speech segments - speech_segments = chunk.get("speech_segments", []) - if not speech_segments: - return JSONResponse( - status_code=400, - content={"error": "No speech segments available for cropping"}, - ) - - # Convert speech segments from dict format to tuple format - speech_segments_tuples = [(seg["start"], seg["end"]) for seg in speech_segments] - - cropped_filename = chunk["audio_path"].replace(".wav", "_cropped.wav") - cropped_path = f"{CHUNK_DIR}/{cropped_filename}" - - # Process in background using shared logic - async def reprocess_task(): - audio_logger.info(f"πŸ”„ Starting reprocess for {audio_uuid}") - await _process_audio_cropping_with_relative_timestamps( - original_path, speech_segments_tuples, cropped_path, audio_uuid - ) - - asyncio.create_task(reprocess_task()) - - return {"message": "Reprocessing started", "audio_uuid": audio_uuid} - except Exception as e: - audio_logger.error(f"Error reprocessing audio: {e}") - return JSONResponse(status_code=500, content={"error": str(e)}) - - -@app.get("/api/users") -async def get_users(): - """Retrieves all users from the database.""" - try: - cursor = users_col.find() - users = [] - for doc in await cursor.to_list(length=100): - doc["_id"] = str(doc["_id"]) # Convert ObjectId to string - users.append(doc) - return JSONResponse(content=users) - except Exception as e: - audio_logger.error(f"Error fetching users: {e}", exc_info=True) - return JSONResponse( - status_code=500, content={"message": "Error fetching users"} - ) - - -@app.post("/api/create_user") -async def create_user(user_id: str): - """Creates a new user in the database.""" - try: - # Check if user already exists - existing_user = await users_col.find_one({"user_id": user_id}) - if existing_user: - return JSONResponse( - status_code=409, content={"message": f"User {user_id} already exists"} - ) - - # Create new user - result = await users_col.insert_one({"user_id": 
user_id}) - return JSONResponse( - status_code=201, - content={ - "message": f"User {user_id} created successfully", - "id": str(result.inserted_id), - }, - ) - except Exception as e: - audio_logger.error(f"Error creating user: {e}", exc_info=True) - return JSONResponse(status_code=500, content={"message": "Error creating user"}) - - -@app.delete("/api/delete_user") -async def delete_user( - user_id: str, delete_conversations: bool = False, delete_memories: bool = False -): - """Deletes a user from the database with optional data cleanup.""" - try: - # Check if user exists - existing_user = await users_col.find_one({"user_id": user_id}) - if not existing_user: - return JSONResponse( - status_code=404, content={"message": f"User {user_id} not found"} - ) - - deleted_data = {} - - # Delete user from users collection - user_result = await users_col.delete_one({"user_id": user_id}) - deleted_data["user_deleted"] = user_result.deleted_count > 0 - - if delete_conversations: - # Delete all conversations (audio chunks) for this user - conversations_result = await chunks_col.delete_many({"client_id": user_id}) - deleted_data["conversations_deleted"] = conversations_result.deleted_count - - if delete_memories: - # Delete all memories for this user using the memory service - try: - memory_count = memory_service.delete_all_user_memories(user_id) - deleted_data["memories_deleted"] = memory_count - except Exception as mem_error: - audio_logger.error( - f"Error deleting memories for user {user_id}: {mem_error}" - ) - deleted_data["memories_deleted"] = 0 - deleted_data["memory_deletion_error"] = str(mem_error) - - # Build message based on what was deleted - message = f"User {user_id} deleted successfully" - deleted_items = [] - if delete_conversations and deleted_data.get("conversations_deleted", 0) > 0: - deleted_items.append( - f"{deleted_data['conversations_deleted']} conversations" - ) - if delete_memories and deleted_data.get("memories_deleted", 0) > 0: - 
deleted_items.append(f"{deleted_data['memories_deleted']} memories") - - if deleted_items: - message += f" along with {' and '.join(deleted_items)}" - - return JSONResponse( - status_code=200, content={"message": message, "deleted_data": deleted_data} - ) - except Exception as e: - audio_logger.error(f"Error deleting user: {e}", exc_info=True) - return JSONResponse(status_code=500, content={"message": "Error deleting user"}) - - -@app.get("/api/memories") -async def get_memories(user_id: str, limit: int = 100): - """Retrieves memories from the mem0 store with optional filtering.""" - try: - all_memories = memory_service.get_all_memories(user_id=user_id, limit=limit) - return JSONResponse(content=all_memories) - except Exception as e: - audio_logger.error(f"Error fetching memories: {e}", exc_info=True) - return JSONResponse( - status_code=500, content={"message": "Error fetching memories"} - ) - - -@app.get("/api/memories/search") -async def search_memories(user_id: str, query: str, limit: int = 10): - """Search memories using semantic similarity for better retrieval.""" - try: - relevant_memories = memory_service.search_memories( - query=query, user_id=user_id, limit=limit - ) - return JSONResponse(content=relevant_memories) - except Exception as e: - audio_logger.error(f"Error searching memories: {e}", exc_info=True) - return JSONResponse( - status_code=500, content={"message": "Error searching memories"} - ) - - -@app.delete("/api/memories/{memory_id}") -async def delete_memory(memory_id: str): - """Delete a specific memory by ID.""" - try: - memory_service.delete_memory(memory_id=memory_id) - return JSONResponse( - content={"message": f"Memory {memory_id} deleted successfully"} - ) - except Exception as e: - audio_logger.error(f"Error deleting memory {memory_id}: {e}", exc_info=True) - return JSONResponse( - status_code=500, content={"message": "Error deleting memory"} - ) - - -@app.post("/api/conversations/{audio_uuid}/speakers") -async def 
add_speaker_to_conversation(audio_uuid: str, speaker_id: str): - """Add a speaker to the speakers_identified list for a conversation.""" - try: - await chunk_repo.add_speaker(audio_uuid, speaker_id) - return JSONResponse( - content={ - "message": f"Speaker {speaker_id} added to conversation {audio_uuid}" - } - ) - except Exception as e: - audio_logger.error(f"Error adding speaker: {e}", exc_info=True) - return JSONResponse( - status_code=500, content={"message": "Error adding speaker"} - ) -@app.put("/api/conversations/{audio_uuid}/transcript/{segment_index}") -async def update_transcript_segment( - audio_uuid: str, - segment_index: int, - speaker_id: Optional[str] = None, - start_time: Optional[float] = None, - end_time: Optional[float] = None, -): - """Update a specific transcript segment with speaker or timing information.""" - try: - update_doc = {} - if speaker_id is not None: - update_doc[f"transcript.{segment_index}.speaker"] = speaker_id - # Also add to speakers_identified if not already present - await chunk_repo.add_speaker(audio_uuid, speaker_id) - if start_time is not None: - update_doc[f"transcript.{segment_index}.start"] = start_time - - if end_time is not None: - update_doc[f"transcript.{segment_index}.end"] = end_time - - if not update_doc: - return JSONResponse( - status_code=400, content={"error": "No update parameters provided"} - ) - - result = await chunks_col.update_one( - {"audio_uuid": audio_uuid}, {"$set": update_doc} - ) - - if result.matched_count == 0: - return JSONResponse( - status_code=404, content={"error": "Conversation not found"} - ) - - return JSONResponse( - content={"message": "Transcript segment updated successfully"} - ) - - except Exception as e: - audio_logger.error(f"Error updating transcript segment: {e}") - return JSONResponse(status_code=500, content={"error": "Internal server error"}) # class SpeakerEnrollmentRequest(BaseModel): @@ -1669,9 +360,9 @@ async def health_check(): "services": {}, "config": { "mongodb_uri": 
MONGODB_URI, - "ollama_url": OLLAMA_BASE_URL, - "qdrant_url": f"http://{QDRANT_BASE_URL}:6333", - "asr_uri": OFFLINE_ASR_TCP_URI, + "llm_url": OLLAMA_BASE_URL if os.getenv("LLM_PROVIDER") == "ollama" else LLM_BASE_URL, + "qdrant_url": f"http://{os.getenv('QDRANT_BASE_URL', 'qdrant')}:6333", + "asr_uri": os.getenv("OFFLINE_ASR_TCP_URI", "tcp://192.168.0.110:8765/"), "chunk_dir": str(CHUNK_DIR), "active_clients": len(active_clients), "new_conversation_timeout_minutes": NEW_CONVERSATION_TIMEOUT_MINUTES, @@ -1708,30 +399,73 @@ overall_healthy = False critical_services_healthy = False - # Check Ollama (non-critical service - may not be running) - try: - # Run in executor to avoid blocking the main thread - loop = asyncio.get_running_loop() - models = await asyncio.wait_for( - loop.run_in_executor(None, ollama_client.list), timeout=8.0 - ) - model_count = len(models.get("models", [])) - health_status["services"]["ollama"] = { - "status": "βœ… Connected", - "healthy": True, - "models": model_count, - "critical": False, - } - except asyncio.TimeoutError: - health_status["services"]["ollama"] = { - "status": "⚠️ Connection Timeout (8s) - Service may not be running", - "healthy": False, - "critical": False, - } - overall_healthy = False - except Exception as e: - health_status["services"]["ollama"] = { - "status": f"⚠️ Connection Failed: {str(e)} - Service may not be running", + # Check LLM Service (Ollama or OpenAI) + llm_provider = os.getenv("LLM_PROVIDER") + health_status["services"]["llm"] = { + "status": "Unknown", + "healthy": False, + "critical": False, + } + + if llm_provider == "ollama": + try: + loop = asyncio.get_running_loop() + models = await asyncio.wait_for( + loop.run_in_executor(None, llm_client.list), timeout=8.0 + ) + model_count = len(models.get("models", [])) + health_status["services"]["llm"] = { + "status": "βœ… Connected (Ollama)", + "healthy": True, + "models": model_count, + "critical": False, + } + except asyncio.TimeoutError: + 
health_status["services"]["llm"] = { + "status": "⚠️ Connection Timeout (8s) - Ollama service may not be running", + "healthy": False, + "critical": False, + } + overall_healthy = False + except Exception as e: + health_status["services"]["llm"] = { + "status": f"⚠️ Connection Failed (Ollama): {str(e)}", + "healthy": False, + "critical": False, + } + overall_healthy = False + elif llm_provider == "openai": + try: + loop = asyncio.get_running_loop() + # For OpenAI, a simple list models call or a dummy completion call can serve as a health check + # Using models.list() as it's a common way to check API connectivity + models = await asyncio.wait_for( + loop.run_in_executor(None, llm_client.models.list), timeout=8.0 + ) + model_count = len(models.data) # OpenAI returns models in .data + health_status["services"]["llm"] = { + "status": "βœ… Connected (OpenAI)", + "healthy": True, + "models": model_count, + "critical": False, + } + except asyncio.TimeoutError: + health_status["services"]["llm"] = { + "status": "⚠️ Connection Timeout (8s) - OpenAI API may be unreachable", + "healthy": False, + "critical": False, + } + overall_healthy = False + except Exception as e: + health_status["services"]["llm"] = { + "status": f"⚠️ Connection Failed (OpenAI): {str(e)}", + "healthy": False, + "critical": False, + } + overall_healthy = False + else: + health_status["services"]["llm"] = { + "status": "⚠️ LLM_PROVIDER not specified or invalid", "healthy": False, "critical": False, } @@ -1771,13 +505,13 @@ async def health_check(): # Check ASR service (non-critical - may be external) try: - test_client = AsyncTcpClient.from_uri(OFFLINE_ASR_TCP_URI) + test_client = AsyncTcpClient.from_uri(os.getenv("OFFLINE_ASR_TCP_URI", "tcp://192.168.0.110:8765/")) await asyncio.wait_for(test_client.connect(), timeout=5.0) await test_client.disconnect() health_status["services"]["asr"] = { "status": "βœ… Connected", "healthy": True, - "uri": OFFLINE_ASR_TCP_URI, + "uri": os.getenv("OFFLINE_ASR_TCP_URI", 
"tcp://192.168.0.110:8765/"), "critical": False, } except asyncio.TimeoutError: @@ -1798,23 +532,9 @@ async def health_check(): overall_healthy = False # Track health check results in metrics - try: - metrics_collector = get_metrics_collector() - for service_name, service_info in health_status["services"].items(): - success = service_info.get("healthy", False) - failure_reason = ( - None if success else service_info.get("status", "Unknown failure") - ) - metrics_collector.record_service_health_check( - service_name, success, failure_reason - ) - - # Also track overall system health - metrics_collector.record_service_health_check( - "friend-backend", overall_healthy, "System health check" - ) - except Exception as e: - audio_logger.error(f"Failed to record health check metrics: {e}") + # The metrics_collector is now passed to ClientState and used there. + # No need to get it here. + pass # Set overall status health_status["overall_healthy"] = overall_healthy @@ -1995,8 +715,7 @@ async def debug_memory_processing_stats(user_id: Optional[str] = None): async def get_current_metrics(): """Get current metrics summary for monitoring dashboard.""" try: - metrics_collector = get_metrics_collector() - metrics_summary = metrics_collector.get_current_metrics_summary() + metrics_summary = self.metrics_collector.get_current_metrics_summary() return metrics_summary except Exception as e: audio_logger.error(f"Error getting current metrics: {e}") diff --git a/backends/advanced-backend/src/memory/__init__.py b/backends/advanced-backend/src/memory/__init__.py index 4fccd040..912b55c9 100644 --- a/backends/advanced-backend/src/memory/__init__.py +++ b/backends/advanced-backend/src/memory/__init__.py @@ -8,11 +8,13 @@ from .memory_service import ( MemoryService, - init_memory_config, + # init_memory_config, get_memory_service, shutdown_memory_service, ) +from .mem0_client import get_memory_client + __all__ = [ "MemoryService", "init_memory_config", diff --git 
a/backends/advanced-backend/src/memory/mem0_client.py b/backends/advanced-backend/src/memory/mem0_client.py new file mode 100644 index 00000000..325c6d5b --- /dev/null +++ b/backends/advanced-backend/src/memory/mem0_client.py @@ -0,0 +1,136 @@ +from mem0 import Memory +import os + +# Custom instructions for memory processing +# These aren't being used right now but Mem0 does support adding custom prompting +# for handling memory retrieval and processing. +CUSTOM_INSTRUCTIONS = """ +Extract the Following Information: + +- Key Information: Identify and save the most important details. +- Context: Capture the surrounding context to understand the memory's relevance. +- Connections: Note any relationships to other topics or memories. +- Importance: Highlight why this information might be valuable in the future. +- Source: Record where this information came from when applicable. +""" + +def get_memory_client(): + # Get LLM provider and configuration + llm_provider = os.getenv('LLM_PROVIDER') + llm_api_key = os.getenv('LLM_API_KEY') + llm_model = os.getenv('LLM_CHOICE') + embedding_model = os.getenv('EMBEDDING_MODEL_CHOICE') + vector_store_provider = os.getenv('VECTOR_STORE_PROVIDER', 'qdrant') + graph_store_provider = os.getenv('GRAPH_STORE_PROVIDER') + + # Initialize config dictionary + config = {} + print(f"llm_provider: {llm_provider}") + + # Configure LLM based on provider + if llm_provider == 'openai' or llm_provider == 'openrouter': + print(f"llm_provider: {llm_provider}") + config["llm"] = { + "provider": "openai", + "config": { + "model": llm_model, + "temperature": 0.2, + "max_tokens": 2000, + } + } + + print(f"llm_api_key: {llm_api_key}") + # Set API key in environment if not already set + if llm_api_key and not os.environ.get("OPENAI_API_KEY"): + os.environ["OPENAI_API_KEY"] = llm_api_key + + # For OpenRouter, set the specific API key + if llm_provider == 'openrouter' and llm_api_key: + os.environ["OPENROUTER_API_KEY"] = llm_api_key + + elif llm_provider == 
'ollama': + config["llm"] = { + "provider": "ollama", + "config": { + "model": llm_model, + "temperature": 0.2, + "max_tokens": 2000, + } + } + + # Set base URL for Ollama if provided + llm_base_url = os.getenv('LLM_BASE_URL') + if llm_base_url: + config["llm"]["config"]["ollama_base_url"] = llm_base_url + + # Configure embedder based on provider + if llm_provider == 'openai': + config["embedder"] = { + "provider": "openai", + "config": { + "model": embedding_model or "text-embedding-3-small", + "embedding_dims": 1536 # Default for text-embedding-3-small + } + } + + # Set API key in environment if not already set + if llm_api_key and not os.environ.get("OPENAI_API_KEY"): + os.environ["OPENAI_API_KEY"] = llm_api_key + + elif llm_provider == 'ollama': + config["embedder"] = { + "provider": "ollama", + "config": { + "model": embedding_model or "nomic-embed-text", + "embedding_dims": 768 # Default for nomic-embed-text + } + } + + # Set base URL for Ollama if provided + embedding_base_url = os.getenv('LLM_BASE_URL') + if embedding_base_url: + config["embedder"]["config"]["ollama_base_url"] = embedding_base_url + + # Configure Supabase vector store + if vector_store_provider == 'supabase': + config["vector_store"] = { + "provider": "supabase", + "config": { + "connection_string": os.environ.get('DATABASE_URL', ''), + "collection_name": "mem0_memories", + "embedding_model_dims": 1536 if llm_provider == "openai" else 768 + } + } + elif vector_store_provider == 'qdrant': + config["vector_store"] = { + "provider": "qdrant", + "config": { + "collection_name": "mem0_memories", + "embedding_model_dims": 1536 if llm_provider == "openai" else 768 + } + } + + if graph_store_provider == 'neo4j': + config["graph_store"] = { + "provider": "neo4j", + "config": { + "url": 'neo4j://' + os.environ.get('NEO4J_HOST', '') + ':7687', + "username": os.environ.get('NEO4J_USER', os.environ.get('NEO4J_USERNAME', '')), + "password": os.environ.get('NEO4J_PASSWORD', ''), + } + } + elif graph_store_provider == 'memgraph': 
+ config["graph_store"] = { + "provider": "memgraph", + "config": { + "url": os.environ.get('MEMGRAPH_URI', ''), + "username": os.environ.get('MEMGRAPH_USERNAME', ''), + "password": os.environ.get('MEMGRAPH_PASSWORD', ''), + } + } + + # config["custom_fact_extraction_prompt"] = {CUSTOM_INSTRUCTIONS} + print(f"config: {config}") + + # Create and return the Memory client + return Memory.from_config(config) \ No newline at end of file diff --git a/backends/advanced-backend/src/memory/mem0_functions.py b/backends/advanced-backend/src/memory/mem0_functions.py new file mode 100644 index 00000000..db4a4c31 --- /dev/null +++ b/backends/advanced-backend/src/memory/mem0_functions.py @@ -0,0 +1,128 @@ +from mcp.server.fastmcp import FastMCP, Context +from contextlib import asynccontextmanager +from collections.abc import AsyncIterator +from dataclasses import dataclass +from dotenv import load_dotenv +from mem0 import Memory +import asyncio +import json +import os + +from utils import get_mem0_client + +load_dotenv() + +# Default user ID for memory operations +DEFAULT_USER_ID = "user" + +# Create a dataclass for our application context +@dataclass +class Mem0Context: + """Context for the Mem0 MCP server.""" + mem0_client: Memory + +@asynccontextmanager +async def mem0_lifespan(server: FastMCP) -> AsyncIterator[Mem0Context]: + """ + Manages the Mem0 client lifecycle. 
+ + Args: + server: The FastMCP server instance + + Yields: + Mem0Context: The context containing the Mem0 client + """ + # Create and return the Memory client with the helper function in utils.py + mem0_client = get_mem0_client() + + try: + yield Mem0Context(mem0_client=mem0_client) + finally: + # No explicit cleanup needed for the Mem0 client + pass + +# Initialize FastMCP server with the Mem0 client as context +mcp = FastMCP( + "mcp-mem0", + description="MCP server for long term memory storage and retrieval with Mem0", + lifespan=mem0_lifespan, + host=os.getenv("HOST", "0.0.0.0"), + port=os.getenv("PORT", "8050") +) + +@mcp.tool() +async def save_memory(ctx: Context, text: str) -> str: + """Save information to your long-term memory. + + This tool is designed to store any type of information that might be useful in the future. + The content will be processed and indexed for later retrieval through semantic search. + + Args: + ctx: The MCP server provided context which includes the Mem0 client + text: The content to store in memory, including any relevant details and context + """ + try: + mem0_client = ctx.request_context.lifespan_context.mem0_client + messages = [{"role": "user", "content": text}] + mem0_client.add(messages, user_id=DEFAULT_USER_ID) + return f"Successfully saved memory: {text[:100]}..." if len(text) > 100 else f"Successfully saved memory: {text}" + except Exception as e: + return f"Error saving memory: {str(e)}" + +@mcp.tool() +async def get_all_memories(ctx: Context) -> str: + """Get all stored memories for the user. + + Call this tool when you need complete context of all previously memories. + + Args: + ctx: The MCP server provided context which includes the Mem0 client + + Returns a JSON formatted list of all stored memories, including when they were created + and their content. Results are paginated with a default of 50 items per page. 
+ """ + try: + mem0_client = ctx.request_context.lifespan_context.mem0_client + memories = mem0_client.get_all(user_id=DEFAULT_USER_ID) + if isinstance(memories, dict) and "results" in memories: + flattened_memories = [memory["memory"] for memory in memories["results"]] + else: + flattened_memories = memories + return json.dumps(flattened_memories, indent=2) + except Exception as e: + return f"Error retrieving memories: {str(e)}" + +@mcp.tool() +async def search_memories(ctx: Context, query: str, limit: int = 3) -> str: + """Search memories using semantic search. + + This tool should be called to find relevant information from your memory. Results are ranked by relevance. + Always search your memories before making decisions to ensure you leverage your existing knowledge. + + Args: + ctx: The MCP server provided context which includes the Mem0 client + query: Search query string describing what you're looking for. Can be natural language. + limit: Maximum number of results to return (default: 3) + """ + try: + mem0_client = ctx.request_context.lifespan_context.mem0_client + memories = mem0_client.search(query, user_id=DEFAULT_USER_ID, limit=limit) + if isinstance(memories, dict) and "results" in memories: + flattened_memories = [memory["memory"] for memory in memories["results"]] + else: + flattened_memories = memories + return json.dumps(flattened_memories, indent=2) + except Exception as e: + return f"Error searching memories: {str(e)}" + +async def main(): + transport = os.getenv("TRANSPORT", "sse") + if transport == 'sse': + # Run the MCP server with sse transport + await mcp.run_sse_async() + else: + # Run the MCP server with stdio transport + await mcp.run_stdio_async() + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/backends/advanced-backend/src/memory/memory_service.py b/backends/advanced-backend/src/memory/memory_service.py index f2a3f12f..bf8e781c 100644 --- a/backends/advanced-backend/src/memory/memory_service.py +++ 
b/backends/advanced-backend/src/memory/memory_service.py @@ -13,7 +13,7 @@ from typing import Optional, List, Dict, Any from mem0 import Memory -import ollama +from .mem0_client import get_memory_client # Configure Mem0 telemetry based on environment variable # Set default to False for privacy unless explicitly enabled @@ -32,105 +32,24 @@ OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://ollama:11434") QDRANT_BASE_URL = os.getenv("QDRANT_BASE_URL", "qdrant") -# Global memory configuration -MEM0_CONFIG = { - "llm": { - "provider": "ollama", - "config": { - "model": "llama3.1:latest", - "ollama_base_url": OLLAMA_BASE_URL, - "temperature": 0, - "max_tokens": 2000, - }, - }, - "embedder": { - "provider": "ollama", - "config": { - "model": "nomic-embed-text:latest", - "embedding_dims": 768, - "ollama_base_url": OLLAMA_BASE_URL, - }, - }, - "vector_store": { - "provider": "qdrant", - "config": { - "collection_name": "omi_memories", - "embedding_model_dims": 768, - "host": QDRANT_BASE_URL, - "port": 6333, - }, - }, - "custom_prompt": "Extract action items from the conversation. Don't extract likes and dislikes.", -} # Action item extraction configuration -ACTION_ITEM_EXTRACTION_PROMPT = """ -You are an AI assistant specialized in extracting actionable tasks from meeting transcripts and conversations. - -Analyze the following conversation transcript and extract all action items, tasks, and commitments mentioned. - -For each action item you find, return a JSON object with these fields: -- "description": A clear, specific description of the task -- "assignee": The person responsible (use "unassigned" if not specified) -- "due_date": The deadline if mentioned (use "not_specified" if not mentioned) -- "priority": The urgency level ("high", "medium", "low", or "not_specified") -- "status": Always set to "open" for new items -- "context": A brief context about when/why this was mentioned - -Return ONLY a valid JSON array of action items. 
# Global instances
_memory_service = None
_process_memory = None  # per-worker-process Memory client, built lazily


# ANSI colour codes for emphasis in console logs.
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
RESET = "\033[0m"


def _init_process_memory():
    """Create (at most once) and return the Memory client for this worker process.

    The client is cached in a module-level global so each worker process
    constructs it exactly once; subsequent calls return the cached instance.
    """
    global _process_memory
    if _process_memory is None:
        _process_memory = get_memory_client()
    return _process_memory


def _add_memory_to_store(transcript: str, client_id: str, audio_uuid: str) -> bool:
    """Add a transcript to the mem0 store for *client_id*.

    Returns True on success, False on any failure.  Errors are logged and
    swallowed deliberately so the audio-processing pipeline keeps running.
    """
    try:
        mem0_client = _init_process_memory()
        # NOTE(review): the per-item metadata (source, audio_uuid, timestamp,
        # organization/project/app ids) was dropped when switching to the
        # shared client factory — confirm the new backend no longer needs it
        # before deleting the idea for good.
        response = mem0_client.add(
            transcript,
            user_id=client_id,
        )
        # Log lazily (%-args) and avoid dumping the full transcript at INFO
        # level — transcripts can be long and may contain sensitive content.
        memory_logger.info(
            "Added transcript (%d chars) for %s to mem0 (client: %s)",
            len(transcript), audio_uuid, client_id,
        )
        memory_logger.info("Memory add response: %s", response)
        return True
    except Exception as e:
        memory_logger.error("Error adding memory for %s: %s", audio_uuid, e)
        return False
Always return valid JSON."}, - {"role": "user", "content": prompt} - ], - options={ - "temperature": 0.1, # Low temperature for consistent extraction - "num_predict": 1000, # Enough tokens for multiple action items - } - ) - - # Parse the response - response_text = response['message']['content'].strip() - - # Try to parse JSON from the response - try: - # Clean up the response if it has markdown formatting - if response_text.startswith('```json'): - response_text = response_text.replace('```json', '').replace('```', '').strip() - elif response_text.startswith('```'): - response_text = response_text.replace('```', '').strip() - - action_items = json.loads(response_text) - - # Validate that we got a list - if not isinstance(action_items, list): - memory_logger.warning(f"Action item extraction returned non-list for {audio_uuid}: {type(action_items)}") - return [] - - # Add metadata to each action item - for item in action_items: - if isinstance(item, dict): - item.update({ - "audio_uuid": audio_uuid, - "client_id": client_id, - "created_at": int(time.time()), - "source": "transcript_extraction", - "id": f"action_{audio_uuid}_{len(action_items)}_{int(time.time())}" - }) - - memory_logger.info(f"Extracted {len(action_items)} action items from {audio_uuid}") - return action_items - - except json.JSONDecodeError as e: - memory_logger.error(f"Failed to parse action items JSON for {audio_uuid}: {e}") - memory_logger.error(f"Raw response: {response_text}") - return [] - - except Exception as e: - memory_logger.error(f"Error extracting action items for {audio_uuid}: {e}") - return [] -def _add_action_items_to_store(action_items: List[Dict[str, Any]], client_id: str, audio_uuid: str) -> bool: - """ - Store extracted action items in Mem0 with proper metadata. 
- """ - try: - if not action_items: - return True # Nothing to store, but not an error - - # Get or create the persistent memory instance for this process - process_memory = _init_process_memory() - - for item in action_items: - # Format the action item as a message for Mem0 - action_text = f"Action Item: {item.get('description', 'No description')}" - if item.get('assignee') and item.get('assignee') != 'unassigned': - action_text += f" (Assigned to: {item['assignee']})" - if item.get('due_date') and item.get('due_date') != 'not_specified': - action_text += f" (Due: {item['due_date']})" - - # Store in Mem0 with infer=False to preserve exact content - process_memory.add( - action_text, - user_id=client_id, - metadata={ - "type": "action_item", - "source": "transcript_extraction", - "audio_uuid": audio_uuid, - "timestamp": int(time.time()), - "action_item_data": item, # Store the full action item data - "organization_id": MEM0_ORGANIZATION_ID, - "project_id": MEM0_PROJECT_ID, - "app_id": MEM0_APP_ID, - }, - infer=False # Don't let Mem0 modify our action items - ) - - memory_logger.info(f"Stored {len(action_items)} action items for {audio_uuid}") - return True - - except Exception as e: - memory_logger.error(f"Error storing action items for {audio_uuid}: {e}") - return False class MemoryService: @@ -290,10 +101,8 @@ def initialize(self): return try: - # Log Qdrant and Ollama URLs - memory_logger.info(f"Initializing MemoryService with Qdrant URL: {MEM0_CONFIG['vector_store']['config']['host']} and Ollama base URL: {MEM0_CONFIG['llm']['config']['ollama_base_url']}") # Initialize main memory instance - self.memory = Memory.from_config(MEM0_CONFIG) + self.memory = get_memory_client() self._initialized = True memory_logger.info("Memory service initialized successfully") @@ -317,35 +126,6 @@ def add_memory(self, transcript: str, client_id: str, audio_uuid: str) -> bool: memory_logger.error(f"Error adding memory for {audio_uuid}: {e}") return False - def 
extract_and_store_action_items(self, transcript: str, client_id: str, audio_uuid: str) -> int: - """ - Extract action items from transcript and store them in Mem0. - Returns the number of action items extracted and stored. - """ - if not self._initialized: - self.initialize() - - try: - # Extract action items from the transcript - action_items = _extract_action_items_from_transcript(transcript, client_id, audio_uuid) - - if not action_items: - memory_logger.info(f"No action items found in transcript for {audio_uuid}") - return 0 - - # Store action items in Mem0 - success = _add_action_items_to_store(action_items, client_id, audio_uuid) - - if success: - memory_logger.info(f"Successfully extracted and stored {len(action_items)} action items for {audio_uuid}") - return len(action_items) - else: - memory_logger.error(f"Failed to store action items for {audio_uuid}") - return 0 - - except Exception as e: - memory_logger.error(f"Error extracting action items for {audio_uuid}: {e}") - return 0 def get_action_items(self, user_id: str, limit: int = 50, status_filter: Optional[str] = None) -> List[Dict[str, Any]]: """ diff --git a/backends/advanced-backend/src/memory/utils.py b/backends/advanced-backend/src/memory/utils.py new file mode 100644 index 00000000..23630a63 --- /dev/null +++ b/backends/advanced-backend/src/memory/utils.py @@ -0,0 +1,102 @@ +from mem0 import Memory +import os + +# Custom instructions for memory processing +# These aren't being used right now but Mem0 does support adding custom prompting +# for handling memory retrieval and processing. +CUSTOM_INSTRUCTIONS = """ +Extract the Following Information: + +- Key Information: Identify and save the most important details. +- Context: Capture the surrounding context to understand the memory's relevance. +- Connections: Note any relationships to other topics or memories. +- Importance: Highlight why this information might be valuable in the future. 
+- Source: Record where this information came from when applicable. +""" + +def get_mem0_client(): + # Get LLM provider and configuration + llm_provider = os.getenv('LLM_PROVIDER') + llm_api_key = os.getenv('LLM_API_KEY') + llm_model = os.getenv('LLM_CHOICE') + embedding_model = os.getenv('EMBEDDING_MODEL_CHOICE') + + # Initialize config dictionary + config = {} + + # Configure LLM based on provider + if llm_provider == 'openai' or llm_provider == 'openrouter': + config["llm"] = { + "provider": "openai", + "config": { + "model": llm_model, + "temperature": 0.2, + "max_tokens": 2000, + } + } + + # Set API key in environment if not already set + if llm_api_key and not os.environ.get("OPENAI_API_KEY"): + os.environ["OPENAI_API_KEY"] = llm_api_key + + # For OpenRouter, set the specific API key + if llm_provider == 'openrouter' and llm_api_key: + os.environ["OPENROUTER_API_KEY"] = llm_api_key + + elif llm_provider == 'ollama': + config["llm"] = { + "provider": "ollama", + "config": { + "model": llm_model, + "temperature": 0.2, + "max_tokens": 2000, + } + } + + # Set base URL for Ollama if provided + llm_base_url = os.getenv('LLM_BASE_URL') + if llm_base_url: + config["llm"]["config"]["ollama_base_url"] = llm_base_url + + # Configure embedder based on provider + if llm_provider == 'openai': + config["embedder"] = { + "provider": "openai", + "config": { + "model": embedding_model or "text-embedding-3-small", + "embedding_dims": 1536 # Default for text-embedding-3-small + } + } + + # Set API key in environment if not already set + if llm_api_key and not os.environ.get("OPENAI_API_KEY"): + os.environ["OPENAI_API_KEY"] = llm_api_key + + elif llm_provider == 'ollama': + config["embedder"] = { + "provider": "ollama", + "config": { + "model": embedding_model or "nomic-embed-text", + "embedding_dims": 768 # Default for nomic-embed-text + } + } + + # Set base URL for Ollama if provided + embedding_base_url = os.getenv('LLM_BASE_URL') + if embedding_base_url: + 
config["embedder"]["config"]["ollama_base_url"] = embedding_base_url + + # Configure Supabase vector store + config["vector_store"] = { + "provider": "supabase", + "config": { + "connection_string": os.environ.get('DATABASE_URL', ''), + "collection_name": "mem0_memories", + "embedding_model_dims": 1536 if llm_provider == "openai" else 768 + } + } + + # config["custom_fact_extraction_prompt"] = CUSTOM_INSTRUCTIONS + + # Create and return the Memory client + return Memory.from_config(config) \ No newline at end of file diff --git a/backends/advanced-backend/src/metrics.py b/backends/advanced-backend/src/metrics.py index 9b4072ba..070faa6b 100644 --- a/backends/advanced-backend/src/metrics.py +++ b/backends/advanced-backend/src/metrics.py @@ -351,20 +351,19 @@ def get_current_metrics_summary(self) -> dict: # Global metrics collector instance _metrics_collector: Optional[MetricsCollector] = None -def get_metrics_collector() -> MetricsCollector: +def get_metrics_collector(debug_dir: str | Path = "/app/debug_dir") -> MetricsCollector: """Get the global metrics collector instance""" global _metrics_collector if _metrics_collector is None: - debug_dir = "/app/debug_dir" # this is only for docker right now _metrics_collector = MetricsCollector(debug_dir) return _metrics_collector -async def start_metrics_collection(): +async def start_metrics_collection(debug_dir: str | Path): """Start metrics collection""" - collector = get_metrics_collector() + collector = get_metrics_collector(debug_dir) await collector.start() -async def stop_metrics_collection(): +async def stop_metrics_collection(debug_dir: str | Path): """Stop metrics collection""" - collector = get_metrics_collector() - await collector.stop() \ No newline at end of file + collector = get_metrics_collector(debug_dir) + await collector.stop() \ No newline at end of file diff --git a/backends/advanced-backend/src/models/audio_chunk.py b/backends/advanced-backend/src/models/audio_chunk.py new file mode 100644 index 
# --- src/models/audio_chunk.py ---------------------------------------------
from pydantic import BaseModel
from typing import List, Optional


class TranscriptSegment(BaseModel):
    """One speaker-attributed, time-aligned piece of a conversation transcript."""

    speaker: str
    text: str
    start: float  # seconds; offset semantics defined by the producer — TODO confirm
    end: float


class SpeechSegment(BaseModel):
    """A [start, end] interval of detected speech, in seconds."""

    start: float
    end: float


class AudioChunk(BaseModel):
    """Document model for one stored audio chunk and its transcript state.

    Mutable defaults are safe here: pydantic deep-copies field defaults per
    instance.
    """

    audio_uuid: str
    audio_path: str
    client_id: str
    timestamp: int
    transcript: List[TranscriptSegment] = []
    speakers_identified: List[str] = []
    cropped_audio_path: Optional[str] = None
    speech_segments: List[SpeechSegment] = []
    cropped_duration: Optional[float] = None
    cropped_at: Optional[float] = None


# --- src/models/client_state.py --------------------------------------------
import asyncio
import logging
import time
import uuid
from typing import Optional, Tuple, Any, Type
from pathlib import Path

from easy_audio_interfaces.filesystem.filesystem_interfaces import LocalFileSink
from wyoming.audio import AudioChunk

from memory import get_memory_service
from metrics import get_metrics_collector

# Logging setup
audio_logger = logging.getLogger("audio_processing")
logger = logging.getLogger("client_state")


class ClientState:
    """Manages all state for a single client connection.

    Holds the per-client processing queues, the file sink for raw audio,
    the transcription manager, conversation-timeout bookkeeping and the
    background tasks that drive the pipeline.
    """

    def __init__(
        self,
        client_id: str,
        audio_chunk_utils,
        metrics_collector,
        active_clients,
        config: dict,
        transcription_manager_class: Type[Any],
    ):
        self.client_id = client_id
        self.connected = True
        self.audio_chunk_utils = audio_chunk_utils
        self.metrics_collector = metrics_collector
        self.active_clients = active_clients

        # Configuration values (with the same defaults main.py used).
        self.CHUNK_DIR = config.get("CHUNK_DIR", Path("./audio_chunks"))
        self.OMI_SAMPLE_RATE = config.get("OMI_SAMPLE_RATE", 16_000)
        self.OMI_CHANNELS = config.get("OMI_CHANNELS", 1)
        self.OMI_SAMPLE_WIDTH = config.get("OMI_SAMPLE_WIDTH", 2)
        self.NEW_CONVERSATION_TIMEOUT_MINUTES = config.get("NEW_CONVERSATION_TIMEOUT_MINUTES", 1.5)
        self.AUDIO_CROPPING_ENABLED = config.get("AUDIO_CROPPING_ENABLED", False)
        self.MIN_SPEECH_SEGMENT_DURATION = config.get("MIN_SPEECH_SEGMENT_DURATION", 1.0)
        self.CROPPING_CONTEXT_PADDING = config.get("CROPPING_CONTEXT_PADDING", 0.1)
        self._DEC_IO_EXECUTOR = config.get("_DEC_IO_EXECUTOR")
        self.memory_service = get_memory_service()  # global singleton
        self.action_items_service = config.get("action_items_service")  # passed from main

        # Per-client queues feeding the pipeline stages.
        self.chunk_queue = asyncio.Queue[Optional[AudioChunk]]()
        self.transcription_queue = asyncio.Queue[Tuple[Optional[str], Optional[AudioChunk]]]()
        # (transcript, client_id, audio_uuid)
        self.memory_queue = asyncio.Queue[Tuple[Optional[str], Optional[str], Optional[str]]]()
        # (transcript_text, client_id, audio_uuid)
        self.action_item_queue = asyncio.Queue[Tuple[Optional[str], Optional[str], Optional[str]]]()

        # Per-client file sink for raw audio.
        self.file_sink: Optional[LocalFileSink] = None
        self.current_audio_uuid: Optional[str] = None

        # Per-client transcription manager (instantiated lazily from the class).
        self.transcription_manager: Optional[Any] = None
        self.transcription_manager_class: Type[Any] = transcription_manager_class

        # Conversation timeout tracking.
        self.last_transcript_time: Optional[float] = None
        self.conversation_start_time: float = time.time()

        # Speech segment tracking for audio cropping.
        self.speech_segments: dict[str, list[tuple[float, float]]] = {}  # audio_uuid -> [(start, end), ...]
        self.current_speech_start: dict[str, Optional[float]] = {}  # audio_uuid -> start_time

        # Collected transcripts for end-of-conversation memory processing.
        self.conversation_transcripts: list[str] = []

        # Background tasks for this client.
        self.saver_task: Optional[asyncio.Task] = None
        self.transcription_task: Optional[asyncio.Task] = None
        self.memory_task: Optional[asyncio.Task] = None
        self.action_item_task: Optional[asyncio.Task] = None


# --- src/models/user.py ------------------------------------------------------
from pydantic import BaseModel
from typing import Optional


class User(BaseModel):
    """Minimal user record; ``user_id`` is the unique identifier."""

    user_id: str


# --- src/routers/action_items_router.py --------------------------------------
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from action_items_service import ActionItemsService
from utils.logging import items_logger

router = APIRouter()

# BUG FIX: the previous code imported the *class* aliased as
# ``action_items_service`` and then called instance methods on it, which
# raises TypeError at request time (missing ``self``).  Instantiate once.
# TODO(review): confirm ActionItemsService() takes no required ctor args.
action_items_service = ActionItemsService()


@router.get("/api/action_items")
async def get_action_items(user_id: str, limit: int = 100):
    """Retrieve up to ``limit`` action items for *user_id*."""
    items_logger.info("Getting action items for user %s", user_id)
    try:
        action_items = action_items_service.get_action_items(user_id=user_id, limit=limit)
        return JSONResponse(content=action_items)
    except Exception as e:
        items_logger.error("Error fetching action items: %s", e, exc_info=True)
        return JSONResponse(
            status_code=500, content={"message": "Error fetching action items"}
        )
# --- src/routers/audio_chunks_router.py --------------------------------------
from fastapi import APIRouter, Depends, Query
from typing import Optional

from motor.motor_asyncio import AsyncIOMotorClient

from utils.audio_chunk_utils import AudioChunkUtils
from database import get_db_client

router = APIRouter()


def get_audio_chunk_utils(
    db_client: AsyncIOMotorClient = Depends(get_db_client),
):
    """FastAPI dependency: AudioChunkUtils bound to the audio_chunks collection."""
    chunks_col = db_client.get_default_database("friend-lite")["audio_chunks"]
    return AudioChunkUtils(chunks_col)


@router.get("/api/conversations")
async def get_conversations(audio_chunk_utils: AudioChunkUtils = Depends(get_audio_chunk_utils)):
    """Get all conversations grouped by client_id."""
    return await audio_chunk_utils.get_conversations()


@router.get("/api/conversations/{audio_uuid}/cropped")
async def get_cropped_audio_info(audio_uuid: str, audio_chunk_utils: AudioChunkUtils = Depends(get_audio_chunk_utils)):
    """Get cropped audio information for a specific conversation."""
    return await audio_chunk_utils.get_cropped_audio_info(audio_uuid)


@router.post("/api/conversations/{audio_uuid}/reprocess")
async def reprocess_audio_cropping(audio_uuid: str, audio_chunk_utils: AudioChunkUtils = Depends(get_audio_chunk_utils)):
    """Trigger reprocessing of audio cropping for a specific conversation."""
    return await audio_chunk_utils.reprocess_audio_cropping(audio_uuid)


@router.post("/api/conversations/{audio_uuid}/speakers")
async def add_speaker_to_conversation(audio_uuid: str, speaker_id: str, audio_chunk_utils: AudioChunkUtils = Depends(get_audio_chunk_utils)):
    """Add a speaker to the speakers_identified list for a conversation."""
    return await audio_chunk_utils.add_speaker_to_conversation(audio_uuid, speaker_id)


@router.put("/api/conversations/{audio_uuid}/transcript/{segment_index}")
async def update_transcript_segment(
    audio_uuid: str,
    segment_index: int,
    speaker_id: Optional[str] = None,
    start_time: Optional[float] = None,
    end_time: Optional[float] = None,
    audio_chunk_utils: AudioChunkUtils = Depends(get_audio_chunk_utils),
):
    """Update a specific transcript segment with speaker or timing information."""
    return await audio_chunk_utils.update_transcript_segment(
        audio_uuid, segment_index, speaker_id, start_time, end_time
    )


# --- src/routers/memory_router.py --------------------------------------------
from fastapi import APIRouter
from fastapi.responses import JSONResponse

from memory.memory_service import get_memory_service
from utils.logging import audio_logger, memory_logger

router = APIRouter()

# NOTE(review): instantiated at import time; confirm get_memory_service()
# is safe to call before the app has finished startup.
memory_service = get_memory_service()


@router.get("/api/memories")
async def get_memories(user_id: str, limit: int = 100):
    """Retrieve memories from the mem0 store for *user_id*."""
    memory_logger.info("Fetching memories for user %s", user_id)
    try:
        all_memories = memory_service.get_all_memories(user_id=user_id, limit=limit)
        memory_logger.info("Retrieved %d memories for user %s", len(all_memories), user_id)
        return JSONResponse(content=all_memories)
    except Exception as e:
        memory_logger.error("Error fetching memories: %s", e, exc_info=True)
        return JSONResponse(
            status_code=500, content={"message": "Error fetching memories"}
        )


@router.get("/api/memories/search")
async def search_memories(user_id: str, query: str, limit: int = 10):
    """Search memories using semantic similarity for better retrieval."""
    try:
        relevant_memories = memory_service.search_memories(
            query=query, user_id=user_id, limit=limit
        )
        return JSONResponse(content=relevant_memories)
    except Exception as e:
        memory_logger.error("Error searching memories: %s", e, exc_info=True)
        return JSONResponse(
            status_code=500, content={"message": "Error searching memories"}
        )


@router.delete("/api/memories/{memory_id}")
async def delete_memory(memory_id: str):
    """Delete a specific memory by ID."""
    try:
        memory_service.delete_memory(memory_id=memory_id)
        return JSONResponse(
            content={"message": f"Memory {memory_id} deleted successfully"}
        )
    except Exception as e:
        memory_logger.error("Error deleting memory %s: %s", memory_id, e, exc_info=True)
        return JSONResponse(
            status_code=500, content={"message": "Error deleting memory"}
        )


# --- src/routers/user_router.py ----------------------------------------------
from fastapi import APIRouter, Depends, Query
from typing import Optional
from motor.motor_asyncio import AsyncIOMotorClient

from utils.user_utils import UserUtils
from memory.memory_service import get_memory_service  # kept: may be needed by future cleanup paths
from database import get_db_client

router = APIRouter()


def get_user_utils(
    db_client: AsyncIOMotorClient = Depends(get_db_client),
):
    """FastAPI dependency: UserUtils bound to the users collection.

    FIX: a ``memory_service = Depends(get_memory_service)`` parameter was
    injected here but never used, so the memory service was instantiated on
    every /api/users request for nothing — removed.
    """
    users_col = db_client.get_default_database("friend-lite")["users"]
    return UserUtils(users_col)


@router.get("/api/users")
async def get_users(user_utils: UserUtils = Depends(get_user_utils)):
    """Retrieve all users from the database."""
    return await user_utils.get_all_users()


@router.get("/api/users/{user_id}")
async def get_user_by_id(user_id: str, user_utils: UserUtils = Depends(get_user_utils)):
    """Retrieve a single user by their user_id."""
    return await user_utils.get_user_by_name(user_id)
@router.post("/api/create_user")
async def create_user(user_id: str, user_utils: UserUtils = Depends(get_user_utils)):
    """Create a new user in the database."""
    return await user_utils.create_new_user(user_id)


@router.delete("/api/delete_user")
async def delete_user(
    user_id: str,
    delete_conversations: bool = False,
    delete_memories: bool = False,
    user_utils: UserUtils = Depends(get_user_utils),
    db_client: AsyncIOMotorClient = Depends(get_db_client),
):
    """Delete a user, optionally cascading to conversations and memories."""
    chunks_col = db_client.get_default_database("friend-lite")["audio_chunks"]
    return await user_utils.delete_user_data(
        user_id, delete_conversations, delete_memories, chunks_col
    )


# --- src/utils/audio_chunk_utils.py ------------------------------------------
import asyncio
import logging
import os
import time
from pathlib import Path
from typing import List, Optional, Tuple

from fastapi.responses import JSONResponse
from motor.motor_asyncio import AsyncIOMotorClient
from easy_audio_interfaces.filesystem.filesystem_interfaces import LocalFileSink

from models.audio_chunk import AudioChunk, TranscriptSegment, SpeechSegment

logger = logging.getLogger("advanced-backend")
audio_logger = logging.getLogger("audio_processing")
audio_cropper_logger = logging.getLogger("audio_cropper")

# Configuration values that were previously in main.py
OMI_SAMPLE_RATE = 16_000  # Hz
OMI_CHANNELS = 1
OMI_SAMPLE_WIDTH = 2  # bytes (16-bit)
SEGMENT_SECONDS = 60  # length of each stored chunk
CHUNK_DIR = Path("./audio_chunks")
AUDIO_CROPPING_ENABLED = os.getenv("AUDIO_CROPPING_ENABLED", "true").lower() == "true"
MIN_SPEECH_SEGMENT_DURATION = float(os.getenv("MIN_SPEECH_SEGMENT_DURATION", "1.0"))
CROPPING_CONTEXT_PADDING = float(os.getenv("CROPPING_CONTEXT_PADDING", "0.1"))

# Ensure CHUNK_DIR exists
CHUNK_DIR.mkdir(parents=True, exist_ok=True)


def _new_local_file_sink(file_path):
    """Create a LocalFileSink with all wave parameters explicitly set."""
    return LocalFileSink(
        file_path=file_path,
        sample_rate=int(OMI_SAMPLE_RATE),
        channels=int(OMI_CHANNELS),
        sample_width=int(OMI_SAMPLE_WIDTH),
    )


async def _process_audio_cropping_with_relative_timestamps(
    original_path: str,
    speech_segments: List[Tuple[float, float]],
    output_path: str,
    audio_uuid: str,
    chunk_repo_instance,  # ChunkRepo used to record the cropped-file info
) -> bool:
    """Crop *original_path* to its speech segments, converting timestamps first.

    Speech segments arrive as absolute (epoch) timestamps; ffmpeg needs
    offsets relative to the start of the file, which is encoded in the
    filename.  Handles both live processing and reprocessing.
    """
    try:
        # Extract file start time from the filename: timestamp_client_uuid.wav
        # NOTE(review): this parse assumes that exact naming scheme — a file
        # not produced by the sink factory will raise ValueError here.
        filename = original_path.split("/")[-1]
        file_start_timestamp = float(filename.split("_")[0])

        # Convert absolute speech segments to file-relative offsets.
        relative_segments = []
        for start_abs, end_abs in speech_segments:
            start_rel = start_abs - file_start_timestamp
            end_rel = end_abs - file_start_timestamp

            # Sanity checks: relative timestamps must be non-negative.
            if start_rel < 0:
                audio_logger.warning(
                    "Negative start timestamp: %s, clamping to 0.0", start_rel
                )
                start_rel = 0.0
            if end_rel < 0:
                audio_logger.warning(
                    "Negative end timestamp: %s, skipping segment", end_rel
                )
                continue

            relative_segments.append((start_rel, end_rel))

        audio_logger.info(
            "Converting timestamps for %s: file_start=%s", audio_uuid, file_start_timestamp
        )
        audio_logger.info("Absolute segments: %s", speech_segments)
        audio_logger.info("Relative segments: %s", relative_segments)

        success = await _crop_audio_with_ffmpeg(
            original_path, relative_segments, output_path
        )
        if success:
            # Record the cropped file (keep original absolute timestamps
            # in the DB for reference).
            cropped_filename = output_path.split("/")[-1]
            await chunk_repo_instance.update_cropped_audio(
                audio_uuid, cropped_filename, speech_segments
            )
            audio_logger.info("Successfully processed cropped audio: %s", cropped_filename)
            return True
        audio_logger.error("Failed to crop audio for %s", audio_uuid)
        return False
    except Exception as e:
        audio_logger.error("Error in audio cropping task for %s: %s", audio_uuid, e)
        return False


async def _crop_audio_with_ffmpeg(
    original_path: str, speech_segments: List[Tuple[float, float]], output_path: str
) -> bool:
    """Crop audio with ffmpeg as an async subprocess (no GIL contention).

    Returns True when the cropped file was written, False when cropping is
    disabled, no usable segments remain, or ffmpeg fails.
    """
    audio_cropper_logger.info(
        "Cropping audio %s with %d speech segments", original_path, len(speech_segments)
    )

    if not AUDIO_CROPPING_ENABLED:
        audio_cropper_logger.info("Audio cropping disabled, skipping %s", original_path)
        return False

    if not speech_segments:
        audio_cropper_logger.warning("No speech segments to crop for %s", original_path)
        return False

    # Drop too-short segments; pad the survivors with context.
    filtered_segments = []
    for start, end in speech_segments:
        duration = end - start
        if duration >= MIN_SPEECH_SEGMENT_DURATION:
            padded_start = max(0, start - CROPPING_CONTEXT_PADDING)
            padded_end = end + CROPPING_CONTEXT_PADDING
            filtered_segments.append((padded_start, padded_end))
        else:
            audio_cropper_logger.debug(
                "Skipping short segment: %s-%s (%.2fs < %ss)",
                start, end, duration, MIN_SPEECH_SEGMENT_DURATION,
            )

    if not filtered_segments:
        audio_cropper_logger.warning(
            "No segments meet minimum duration (%ss) for %s",
            MIN_SPEECH_SEGMENT_DURATION, original_path,
        )
        return False

    audio_cropper_logger.info(
        "Cropping audio %s with %d speech segments (filtered from %d)",
        original_path, len(filtered_segments), len(speech_segments),
    )

    try:
        # Build an ffmpeg filter graph that trims each segment then
        # concatenates them into one stream.
        filter_parts = []
        for i, (start, end) in enumerate(filtered_segments):
            duration = end - start
            filter_parts.append(
                f"[0:a]atrim=start={start}:duration={duration},asetpts=PTS-STARTPTS[seg{i}]"
            )

        inputs = "".join(f"[seg{i}]" for i in range(len(filtered_segments)))
        concat_filter = f"{inputs}concat=n={len(filtered_segments)}:v=0:a=1[out]"
        full_filter = ";".join(filter_parts + [concat_filter])

        cmd = [
            "ffmpeg",
            "-y",  # overwrite output
            "-i", original_path,
            "-filter_complex", full_filter,
            "-map", "[out]",
            "-c:a", "pcm_s16le",  # keep same format as original
            output_path,
        ]

        audio_cropper_logger.info("Running ffmpeg command: %s", " ".join(cmd))

        process = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await process.communicate()
        if stdout:
            audio_cropper_logger.debug("FFMPEG stdout: %s", stdout.decode())

        if process.returncode == 0:
            cropped_duration = sum(end - start for start, end in filtered_segments)
            audio_cropper_logger.info(
                "Successfully cropped %s -> %s (%.1fs from %d segments)",
                original_path, output_path, cropped_duration, len(filtered_segments),
            )
            return True
        error_msg = stderr.decode() if stderr else "Unknown ffmpeg error"
        # Consistency fix: use the cropper logger here (was audio_logger).
        audio_cropper_logger.error("ffmpeg failed for %s: %s", original_path, error_msg)
        return False

    except Exception as e:
        audio_cropper_logger.error("Error running ffmpeg on %s: %s", original_path, e)
        return False


class ChunkRepo:
    """Async helpers for the audio_chunks collection."""

    def __init__(self, collection):
        self.col = collection

    async def create_chunk(
        self,
        *,
        audio_uuid,
        audio_path,
        client_id,
        timestamp,
        transcript=None,
        speakers_identified=None,
    ):
        """Insert a new audio-chunk document."""
        doc = {
            "audio_uuid": audio_uuid,
            "audio_path": audio_path,
            "client_id": client_id,
            "timestamp": timestamp,
            "transcript": transcript or [],  # list of conversation segments
            "speakers_identified": speakers_identified or [],  # identified speakers
        }
        await self.col.insert_one(doc)
"timestamp": timestamp, + "transcript": transcript or [], # List of conversation segments + "speakers_identified": speakers_identified + or [], # List of identified speakers + } + await self.col.insert_one(doc) + + async def add_transcript_segment(self, audio_uuid, transcript_segment): + """Add a single transcript segment to the conversation.""" + await self.col.update_one( + {"audio_uuid": audio_uuid}, {"$push": {"transcript": transcript_segment}} + ) + + async def add_speaker(self, audio_uuid, speaker_id): + """Add a speaker to the speakers_identified list if not already present.""" + await self.col.update_one( + {"audio_uuid": audio_uuid}, + {"$addToSet": {"speakers_identified": speaker_id}}, + ) + + async def update_transcript(self, audio_uuid, full_transcript): + """Update the entire transcript list (for compatibility).""" + await self.col.update_one( + {"audio_uuid": audio_uuid}, {"$set": {"transcript": full_transcript}} + ) + + async def update_segment_timing( + self, audio_uuid, segment_index, start_time, end_time + ): + """Update timing information for a specific transcript segment.""" + await self.col.update_one( + {"audio_uuid": audio_uuid}, + { + "$set": { + f"transcript.{segment_index}.start": start_time, + f"transcript.{segment_index}.end": end_time, + } + }, + ) + + async def update_segment_speaker(self, audio_uuid, segment_index, speaker_id): + """Update the speaker for a specific transcript segment.""" + result = await self.col.update_one( + {"audio_uuid": audio_uuid}, + {"$set": {f"transcript.{segment_index}.speaker": speaker_id}}, + ) + if result.modified_count > 0: + audio_logger.info( + f"Updated segment {segment_index} speaker to {speaker_id} for {audio_uuid}" + ) + return result.modified_count > 0 + + async def update_cropped_audio( + self, + audio_uuid: str, + cropped_path: str, + speech_segments: List[Tuple[float, float]], + ): + """Update the chunk with cropped audio information.""" + cropped_duration = sum(end - start for start, end in 
class AudioChunkUtils:
    """API-facing helpers over the audio_chunks collection.

    Each method returns a FastAPI ``JSONResponse`` directly, so these are
    intended to be called from route handlers rather than internal code.
    """

    # NOTE(review): annotation says AsyncIOMotorClient but a *collection* is
    # what gets stored and queried below — confirm and fix the annotation.
    def __init__(self, chunks_collection: AsyncIOMotorClient):
        self.chunks_col = chunks_collection
        # Reuse the low-level repo for single-document mutations.
        self.chunk_repo = ChunkRepo(chunks_collection)

    async def get_conversations(self):
        """Get all conversations grouped by client_id."""
        try:
            # Get all audio chunks and group by client_id
            # Newest first; the whole collection is streamed, not paginated.
            cursor = self.chunks_col.find({}).sort("timestamp", -1)
            conversations = {}

            async for chunk in cursor:
                client_id = chunk.get("client_id", "unknown")
                if client_id not in conversations:
                    conversations[client_id] = []

                # Project only the JSON-serializable fields the UI needs
                # (drops Mongo's _id, which is not serializable).
                conversations[client_id].append(
                    {
                        "audio_uuid": chunk["audio_uuid"],
                        "audio_path": chunk["audio_path"],
                        "cropped_audio_path": chunk.get("cropped_audio_path"),
                        "timestamp": chunk["timestamp"],
                        "transcript": chunk.get("transcript", []),
                        "speakers_identified": chunk.get("speakers_identified", []),
                        "speech_segments": chunk.get("speech_segments", []),
                        "cropped_duration": chunk.get("cropped_duration"),
                    }
                )

            return JSONResponse(content={"conversations": conversations})
        except Exception as e:
            audio_logger.error(f"Error getting conversations: {e}")
            return JSONResponse(status_code=500, content={"error": str(e)})

    async def get_cropped_audio_info(self, audio_uuid: str):
        """Get cropped audio information for a specific conversation."""
        try:
            chunk = await self.chunks_col.find_one({"audio_uuid": audio_uuid})
            if not chunk:
                return JSONResponse(
                    status_code=404, content={"error": "Conversation not found"}
                )

            return JSONResponse(content={
                "audio_uuid": audio_uuid,
                "original_audio_path": chunk["audio_path"],
                "cropped_audio_path": chunk.get("cropped_audio_path"),
                "speech_segments": chunk.get("speech_segments", []),
                "cropped_duration": chunk.get("cropped_duration"),
                "cropped_at": chunk.get("cropped_at"),
                # Convenience flag so callers need not null-check the path.
                "has_cropped_version": bool(chunk.get("cropped_audio_path")),
            })
        except Exception as e:
            audio_logger.error(f"Error getting cropped audio info: {e}")
            return JSONResponse(status_code=500, content={"error": str(e)})

    async def reprocess_audio_cropping(self, audio_uuid: str):
        """Trigger reprocessing of audio cropping for a specific conversation.

        Validates the chunk, its original file, and its speech segments, then
        schedules the actual ffmpeg cropping as a fire-and-forget task and
        returns immediately.
        """
        try:
            chunk = await self.chunks_col.find_one({"audio_uuid": audio_uuid})
            if not chunk:
                return JSONResponse(
                    status_code=404, content={"error": "Conversation not found"}
                )

            original_path = f"{CHUNK_DIR}/{chunk['audio_path']}"
            if not Path(original_path).exists():
                return JSONResponse(
                    status_code=404, content={"error": "Original audio file not found"}
                )

            # Check if we have speech segments
            speech_segments = chunk.get("speech_segments", [])
            if not speech_segments:
                return JSONResponse(
                    status_code=400,
                    content={"error": "No speech segments available for cropping"},
                )

            # Convert speech segments from dict format to tuple format
            speech_segments_tuples = [(seg["start"], seg["end"]) for seg in speech_segments]

            cropped_filename = chunk["audio_path"].replace(".wav", "_cropped.wav")
            cropped_path = f"{CHUNK_DIR}/{cropped_filename}"

            # Process in background using shared logic
            async def reprocess_task():
                audio_logger.info(f"πŸ”„ Starting reprocess for {audio_uuid}")
                await _process_audio_cropping_with_relative_timestamps(
                    original_path, speech_segments_tuples, cropped_path, audio_uuid, self.chunk_repo
                )

            # NOTE(review): the task handle is dropped; exceptions inside
            # reprocess_task surface only in logs — confirm this is intended.
            asyncio.create_task(reprocess_task())

            return JSONResponse(content={"message": "Reprocessing started", "audio_uuid": audio_uuid})
        except Exception as e:
            audio_logger.error(f"Error reprocessing audio: {e}")
            return JSONResponse(status_code=500, content={"error": str(e)})

    async def add_speaker_to_conversation(self, audio_uuid: str, speaker_id: str):
        """Add a speaker to the speakers_identified list for a conversation."""
        try:
            await self.chunk_repo.add_speaker(audio_uuid, speaker_id)
            return JSONResponse(
                content={
                    "message": f"Speaker {speaker_id} added to conversation {audio_uuid}"
                }
            )
        except Exception as e:
            audio_logger.error(f"Error adding speaker: {e}", exc_info=True)
            return JSONResponse(
                status_code=500, content={"message": "Error adding speaker"}
            )

    async def update_transcript_segment(
        self,
        audio_uuid: str,
        segment_index: int,
        speaker_id: Optional[str] = None,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
    ):
        """Update a specific transcript segment with speaker or timing information.

        All three updatable fields are optional; providing none yields a 400.
        """
        try:
            update_doc = {}

            if speaker_id is not None:
                update_doc[f"transcript.{segment_index}.speaker"] = speaker_id
                # Also add to speakers_identified if not already present
                await self.chunk_repo.add_speaker(audio_uuid, speaker_id)

            if start_time is not None:
                update_doc[f"transcript.{segment_index}.start"] = start_time

            if end_time is not None:
                update_doc[f"transcript.{segment_index}.end"] = end_time

            if not update_doc:
                return JSONResponse(
                    status_code=400, content={"error": "No update parameters provided"}
                )

            result = await self.chunks_col.update_one(
                {"audio_uuid": audio_uuid}, {"$set": update_doc}
            )

            # matched_count (not modified_count) so that a no-op write to an
            # existing conversation still reports success.
            if result.matched_count == 0:
                return JSONResponse(
                    status_code=404, content={"error": "Conversation not found"}
                )

            return JSONResponse(
                content={"message": "Transcript segment updated successfully"}
            )

        except Exception as e:
            audio_logger.error(f"Error updating transcript segment: {e}")
            return JSONResponse(status_code=500, content={"error": "Internal server error"})
import logging

# Central logging setup shared by the backend modules: configure the root
# handler once, then hand out one named logger per subsystem.
logging.basicConfig(level=logging.INFO)


def _channel(name: str) -> logging.Logger:
    """Return the process-wide logger registered under *name*."""
    return logging.getLogger(name)


logger = _channel("advanced-backend")
audio_logger = _channel("audio_processing")
memory_logger = _channel("memory_service")
items_logger = _channel("action_items_service")
audio_cropper_logger = _channel("audio_cropper")
"DEEPGRAM_API_KEY provided but Deepgram SDK not available. Falling back to offline ASR." + ) +audio_logger.info( + f"Transcription strategy: {'Deepgram' if USE_DEEPGRAM else 'Offline ASR'}" +) + +deepgram_client = None +if USE_DEEPGRAM: + audio_logger.warning( + "Deepgram transcription requested but not yet implemented. Falling back to offline ASR." + ) + USE_DEEPGRAM = False + +from conversation_manager import record_speech_start, record_speech_end + +class TranscriptionManager: + """Manages transcription using either Deepgram or offline ASR service.""" + + def __init__(self, action_item_callback=None, audio_chunk_utils=None, metrics_collector=None, active_clients=None): + self.client = None + self._current_audio_uuid = None + self._streaming = False + self.use_deepgram = USE_DEEPGRAM + self.deepgram_client = deepgram_client + self._audio_buffer = [] # Buffer for Deepgram batch processing + self.audio_chunk_utils = audio_chunk_utils + self.action_item_callback = action_item_callback # Callback to queue action items + self.metrics_collector = metrics_collector + self.active_clients = active_clients + + async def connect(self): + """Establish connection to ASR service (only for offline ASR).""" + + if self.use_deepgram: + try: + deepgram_options = DeepgramClientOptions(options={"keepalive": "true", "termination_exception_connect": "true"}) + self.client = DeepgramClient(os.getenv('DEEPGRAM_API_KEY'), deepgram_options) + connection = await self.client.listen.websocket.v() + except Exception as e: + audio_logger.error(f"Failed to connect to Deepgram: {e}") + self.client = None + raise + audio_logger.info("Using Deepgram transcription - no connection needed") + + + try: + self.client = AsyncTcpClient.from_uri(OFFLINE_ASR_TCP_URI) + await self.client.connect() + audio_logger.info( + f"Connected to offline ASR service at {OFFLINE_ASR_TCP_URI}" + ) + except Exception as e: + audio_logger.error(f"Failed to connect to offline ASR service: {e}") + self.client = None + raise 
+ + async def disconnect(self): + """Cleanly disconnect from ASR service.""" + if self.use_deepgram: + audio_logger.info("Using Deepgram - no disconnection needed") + return + + if self.client: + try: + await self.client.disconnect() + audio_logger.info("Disconnected from offline ASR service") + except Exception as e: + audio_logger.error(f"Error disconnecting from offline ASR service: {e}") + finally: + self.client = None + + async def transcribe_chunk( + self, audio_uuid: str, chunk: AudioChunk, client_id: str + ): + """Transcribe a single chunk using either Deepgram or offline ASR.""" + if self.use_deepgram: + await self._transcribe_chunk_deepgram(audio_uuid, chunk, client_id) + else: + await self._transcribe_chunk_offline(audio_uuid, chunk, client_id) + + async def _transcribe_chunk_deepgram( + self, audio_uuid: str, chunk: AudioChunk, client_id: str + ): + """Transcribe using Deepgram API.""" + raise NotImplementedError( + "Deepgram transcription is not yet implemented. Please use offline ASR by not setting DEEPGRAM_API_KEY." 
+ ) + + async def _process_deepgram_buffer(self, audio_uuid: str, client_id: str): + """Process buffered audio with Deepgram.""" + raise NotImplementedError("Deepgram transcription is not yet implemented.") + + async def _transcribe_chunk_offline( + self, audio_uuid: str, chunk: AudioChunk, client_id: str + ): + """Transcribe using offline ASR service.""" + if not self.client: + audio_logger.error(f"No ASR connection available for {audio_uuid}") + # Track transcription failure + self.metrics_collector.record_transcription_result(False) + return + + # Track transcription request + start_time = time.time() + self.metrics_collector.record_transcription_request() + + try: + if self._current_audio_uuid != audio_uuid: + self._current_audio_uuid = audio_uuid + audio_logger.info(f"New audio_uuid: {audio_uuid}") + transcribe = Transcribe() + await self.client.write_event(transcribe.event()) + audio_start = AudioStart( + rate=chunk.rate, + width=chunk.width, + channels=chunk.channels, + timestamp=chunk.timestamp, + ) + await self.client.write_event(audio_start.event()) + + # Send the audio chunk + await self.client.write_event(chunk.event()) + + # Read and process any available events (non-blocking) + try: + while True: + event = await asyncio.wait_for( + self.client.read_event(), timeout=0.001 + ) # this is a quick poll, feels like a better solution can exist + if event is None: + break + + if Transcript.is_type(event.type): + transcript_obj = Transcript.from_event(event) + transcript_text = transcript_obj.text.strip() + + # Handle both Transcript and StreamingTranscript types + # Check the 'final' attribute from the event data, not the reconstructed object + is_final = event.data.get( + "final", True + ) # Default to True for standard Transcript + + # Only process final transcripts, ignore partial ones + if not is_final: + audio_logger.info( + f"Ignoring partial transcript for {audio_uuid}: {transcript_text}" + ) + continue + + if transcript_text: + audio_logger.info( + 
f"Transcript for {audio_uuid}: {transcript_text} (final: {is_final})" + ) + + # Track successful transcription with latency + latency_ms = (time.time() - start_time) * 1000 + self.metrics_collector.record_transcription_result( + True, latency_ms + ) + + # Create transcript segment with new format + transcript_segment = { + "speaker": f"speaker_{client_id}", + "text": transcript_text, + "start": 0.0, + "end": 0.0, + } + + # Store transcript segment in DB immediately + + await self.audio_chunk_utils.chunk_repo.add_transcript_segment(audio_uuid, transcript_segment) + + # Queue for action item processing using callback (async, non-blocking) + if self.action_item_callback: + await self.action_item_callback(transcript_text, client_id, audio_uuid) + + await self.audio_chunk_utils.chunk_repo.add_speaker(audio_uuid, f"speaker_{client_id}") + audio_logger.info(f"Added transcript segment for {audio_uuid} to DB.") + + # Update transcript time for conversation timeout tracking + if client_id in self.active_clients: + self.active_clients[client_id].last_transcript_time = ( + time.time() + ) + # Collect transcript for end-of-conversation memory processing + self.active_clients[ + client_id + ].conversation_transcripts.append(transcript_text) + audio_logger.info( + f"Added transcript to conversation collection: {GREEN}'{transcript_text}'{RESET}" + ) + + elif VoiceStarted.is_type(event.type): + audio_logger.info( + f"VoiceStarted event received for {audio_uuid}" + ) + current_time = time.time() + if client_id in self.active_clients: + record_speech_start( + self.active_clients[client_id], audio_uuid, current_time + ) + audio_logger.info( + f"🎀 Voice started for {audio_uuid} at {current_time}" + ) + + elif VoiceStopped.is_type(event.type): + audio_logger.info( + f"VoiceStopped event received for {audio_uuid}" + ) + current_time = time.time() + if client_id in self.active_clients: + record_speech_end( + self.active_clients[client_id], audio_uuid, current_time + ) + audio_logger.info( 
+ f"πŸ”‡ Voice stopped for {audio_uuid} at {current_time}" + ) + + except asyncio.TimeoutError: + # No events available right now, that's fine + pass + + except Exception as e: + audio_logger.error( + f"Error in offline transcribe_chunk for {audio_uuid}: {e}" + ) + # Track transcription failure + self.metrics_collector.record_transcription_result(False) + # Attempt to reconnect on error + await self._reconnect() + + async def _reconnect(self): + """Attempt to reconnect to ASR service.""" + audio_logger.info("Attempting to reconnect to ASR service...") + + # Track reconnection attempt + self.metrics_collector.record_service_reconnection("asr-service") + + await self.disconnect() + await asyncio.sleep(2) # Brief delay before reconnecting + try: + await self.connect() + except Exception as e: + audio_logger.error(f"Reconnection failed: {e}") \ No newline at end of file diff --git a/backends/advanced-backend/src/utils/user_utils.py b/backends/advanced-backend/src/utils/user_utils.py new file mode 100644 index 00000000..d8f75d78 --- /dev/null +++ b/backends/advanced-backend/src/utils/user_utils.py @@ -0,0 +1,117 @@ +import logging +from typing import Optional +from motor.motor_asyncio import AsyncIOMotorClient +from pymongo.results import DeleteResult, InsertOneResult +from fastapi.responses import JSONResponse + +logger = logging.getLogger("advanced-backend") + +class UserUtils: + def __init__(self, users_collection): + self.users_col = users_collection + + async def get_all_users(self): + """Retrieves all users from the database.""" + try: + cursor = self.users_col.find() + users = [] + for doc in await cursor.to_list(length=100): + doc["_id"] = str(doc["_id"]) # Convert ObjectId to string + users.append(doc) + return JSONResponse(content=users) + except Exception as e: + logger.error(f"Error fetching users: {e}", exc_info=True) + return JSONResponse( + status_code=500, content={"message": "Error fetching users"} + ) + + async def get_user_by_name(self, user_id: str): 
+ """Retrieves a single user by their user_id.""" + try: + user = await self.users_col.find_one({"user_id": user_id}) + if user: + user["_id"] = str(user["_id"]) # Convert ObjectId to string + return JSONResponse(content=user) + else: + return JSONResponse(status_code=404, content={"message": f"User {user_id} not found"}) + except Exception as e: + logger.error(f"Error fetching user {user_id}: {e}", exc_info=True) + return JSONResponse(status_code=500, content={"message": "Error fetching user"}) + + async def create_new_user(self, user_id: str): + """Creates a new user in the database.""" + try: + # Check if user already exists using get_user_by_name + existing_user_response = await self.get_user_by_name(user_id) + if existing_user_response.status_code == 200: + return JSONResponse( + status_code=409, content={"message": f"User {user_id} already exists"} + ) + + # Create new user + result: InsertOneResult = await self.users_col.insert_one({"user_id": user_id}) + return JSONResponse( + status_code=201, + content={ + "message": f"User {user_id} created successfully", + "id": str(result.inserted_id), + }, + ) + except Exception as e: + logger.error(f"Error creating user: {e}", exc_info=True) + return JSONResponse(status_code=500, content={"message": "Error creating user"}) + + async def delete_user_data( + self, user_id: str, delete_conversations: bool = False, delete_memories: bool = False, chunks_col=None, memory_service=None + ): + """Deletes a user from the database with optional data cleanup.""" + try: + # Check if user exists using get_user_by_name + existing_user_response = await self.get_user_by_name(user_id) + if existing_user_response.status_code == 404: + return JSONResponse( + status_code=404, content={"message": f"User {user_id} not found"} + ) + + deleted_data = {} + + # Delete user from users collection + user_result: DeleteResult = await self.users_col.delete_one({"user_id": user_id}) + deleted_data["user_deleted"] = user_result.deleted_count > 0 + + 
if delete_conversations and chunks_col: + # Delete all conversations (audio chunks) for this user + conversations_result: DeleteResult = await chunks_col.delete_many({"client_id": user_id}) + deleted_data["conversations_deleted"] = conversations_result.deleted_count + + if delete_memories and memory_service: + # Delete all memories for this user using the memory service + try: + memory_count = memory_service.delete_all_user_memories(user_id) + deleted_data["memories_deleted"] = memory_count + except Exception as mem_error: + logger.error( + f"Error deleting memories for user {user_id}: {mem_error}" + ) + deleted_data["memories_deleted"] = 0 + deleted_data["memory_deletion_error"] = str(mem_error) + + # Build message based on what was deleted + message = f"User {user_id} deleted successfully" + deleted_items = [] + if delete_conversations and deleted_data.get("conversations_deleted", 0) > 0: + deleted_items.append( + f"{deleted_data['conversations_deleted']} conversations" + ) + if delete_memories and deleted_data.get("memories_deleted", 0) > 0: + deleted_items.append(f"{deleted_data['memories_deleted']} memories") + + if deleted_items: + message += f" along with {' and '.join(deleted_items)}" + + return JSONResponse( + status_code=200, content={"message": message, "deleted_data": deleted_data} + ) + except Exception as e: + logger.error(f"Error deleting user: {e}", exc_info=True) + return JSONResponse(status_code=500, content={"message": "Error deleting user"}) \ No newline at end of file diff --git a/backends/advanced-backend/webui/streamlit_app.py b/backends/advanced-backend/webui/streamlit_app.py index 907f548f..a20ad5de 100644 --- a/backends/advanced-backend/webui/streamlit_app.py +++ b/backends/advanced-backend/webui/streamlit_app.py @@ -32,7 +32,7 @@ # ---- Configuration ---- # BACKEND_API_URL = os.getenv("BACKEND_API_URL", "http://192.168.0.110:8000") # For browser-accessible URLs (audio files), use localhost instead of Docker service name 
-BACKEND_PUBLIC_URL = os.getenv("BACKEND_PUBLIC_URL", "http://localhost:8000") +BACKEND_PUBLIC_URL = os.getenv("BACKEND_PUBLIC_URL", "http://${NGROK_PUBLIC_URL}") logger.info(f"πŸ”§ Configuration loaded - Backend API: {BACKEND_API_URL}, Public URL: {BACKEND_PUBLIC_URL}") @@ -333,7 +333,7 @@ def delete_data(endpoint: str, params: dict | None = None): Backend Public: {BACKEND_PUBLIC_URL} Active Clients: {config.get('active_clients', 'Unknown')} MongoDB URI: {config.get('mongodb_uri', 'Unknown')[:30]}... -Ollama URL: {config.get('ollama_url', 'Unknown')} +LLM URL: {config.get('llm_url', 'Unknown')} Qdrant URL: {config.get('qdrant_url', 'Unknown')} ASR URI: {config.get('asr_uri', 'Unknown')} Chunk Directory: {config.get('chunk_dir', 'Unknown')} @@ -485,7 +485,10 @@ def delete_data(endpoint: str, params: dict | None = None): # Display audio with label and cache-busting st.write(audio_label) audio_url = f"{BACKEND_PUBLIC_URL}/audio/{selected_audio_path}{cache_buster}" - st.audio(audio_url, format="audio/wav") + audio_placeholder = st.empty() + # Add a small delay to ensure the file is fully ready for the browser + # time.sleep(0.1) # This might block the UI, so use with caution + audio_placeholder.audio(audio_url, format="audio/wav") logger.debug(f"🎡 Audio URL: {audio_url}") # Show additional info in debug mode or when both versions exist @@ -639,7 +642,7 @@ def delete_data(endpoint: str, params: dict | None = None): with col2: with st.spinner("Loading action items..."): logger.debug(f"πŸ“‘ Fetching action items for user: {user_id_input.strip()}") - action_items_response = get_data(f"/api/action-items?user_id={user_id_input.strip()}") + # action_items_response = get_data(f"/api/action-items?user_id={user_id_input.strip()}") # Handle the API response format with "results" wrapper for memories if memories_response and isinstance(memories_response, dict) and "results" in memories_response: @@ -649,13 +652,13 @@ def delete_data(endpoint: str, params: dict | None = None): 
memories = memories_response logger.debug(f"🧠 Memories response format: {type(memories_response)}") - # Handle action items response - if action_items_response and isinstance(action_items_response, dict) and "action_items" in action_items_response: - action_items = action_items_response["action_items"] - logger.debug(f"🎯 Action items response has 'action_items' wrapper, extracted {len(action_items)} items") - else: - action_items = action_items_response if action_items_response else [] - logger.debug(f"🎯 Action items response format: {type(action_items_response)}") + # # Handle action items response + # if action_items_response and isinstance(action_items_response, dict) and "action_items" in action_items_response: + # action_items = action_items_response["action_items"] + # logger.debug(f"🎯 Action items response has 'action_items' wrapper, extracted {len(action_items)} items") + # else: + # action_items = action_items_response if action_items_response else [] + # logger.debug(f"🎯 Action items response format: {type(action_items_response)}") else: # Show instruction to enter a username memories = None @@ -711,7 +714,7 @@ def delete_data(endpoint: str, params: dict | None = None): else: logger.info(f"🧠 No memories found for user {user_id_input.strip()}") st.info("No memories found for this user.") - + action_items = None # Initialize action_items to None for clarity # Display Action Items Section if action_items is not None: logger.debug("🎯 Displaying action items section...")