diff --git a/backends/advanced/src/advanced_omi_backend/config.py b/backends/advanced/src/advanced_omi_backend/config.py index 3d738d7a..f88569b2 100644 --- a/backends/advanced/src/advanced_omi_backend/config.py +++ b/backends/advanced/src/advanced_omi_backend/config.py @@ -37,6 +37,12 @@ "speech_inactivity_threshold": 60, # Speech gap threshold for closure (1 minute) } +# Default audio storage settings +DEFAULT_AUDIO_STORAGE_SETTINGS = { + "audio_base_path": "/app/data", # Main audio directory (where volume is mounted) + "audio_chunks_path": "/app/data/audio_chunks", # Full path to audio chunks subfolder +} + # Global cache for diarization settings _diarization_settings = None @@ -140,5 +146,18 @@ def get_conversation_stop_settings(): } +def get_audio_storage_settings(): + """Get audio storage settings from environment or defaults.""" + + # Get base path and derive chunks path + audio_base_path = os.getenv("AUDIO_BASE_PATH", DEFAULT_AUDIO_STORAGE_SETTINGS["audio_base_path"]) + audio_chunks_path = os.getenv("AUDIO_CHUNKS_PATH", f"{audio_base_path}/audio_chunks") + + return { + "audio_base_path": audio_base_path, + "audio_chunks_path": audio_chunks_path, + } + + # Initialize settings on module load _diarization_settings = load_diarization_settings_from_file() \ No newline at end of file diff --git a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py index e53eef88..b97a813e 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py @@ -96,14 +96,29 @@ async def get_conversations(user: User): # Import conversations collection and repository conversations_repo = ConversationsRepository(conversations_col) + # Debug: Check what's in the conversations collection + total_conversations = await conversations_col.count_documents({}) + 
logger.info(f"๐Ÿ“Š Total conversations in database: {total_conversations}") + + if total_conversations > 0: + # Show a sample conversation to debug user_id format + sample = await conversations_col.find_one({}) + if sample: + logger.info(f"๐Ÿ” Sample conversation user_id: '{sample.get('user_id')}' (type: {type(sample.get('user_id'))})") + logger.info(f"๐Ÿ” Looking for user_id: '{str(user.user_id)}' (type: {type(str(user.user_id))})") + # Build query based on user permissions if not user.is_superuser: # Regular users can only see their own conversations user_conversations = await conversations_repo.get_user_conversations(str(user.user_id)) + logger.info(f"๐Ÿ“Š Found {len(user_conversations)} conversations for user {user.user_id}") else: # Admins see all conversations cursor = conversations_col.find({}).sort("created_at", -1) - user_conversations = await cursor.to_list(length=None) + all_conversations = await cursor.to_list(length=None) + # Populate primary fields for admin conversations too + user_conversations = [conversations_repo._populate_primary_fields(conv) for conv in all_conversations] + logger.info(f"๐Ÿ“Š Admin found {len(user_conversations)} total conversations") # Group conversations by client_id for backwards compatibility conversations = {} @@ -117,6 +132,10 @@ async def get_conversations(user: User): audio_path = audio_chunk.get("audio_path") if audio_chunk else None cropped_audio_path = audio_chunk.get("cropped_audio_path") if audio_chunk else None + # Get version counts + transcript_versions = conversation.get("transcript_versions", []) + memory_versions = conversation.get("memory_versions", []) + # Convert conversation to API format conversations[client_id].append( { @@ -137,13 +156,27 @@ async def get_conversations(user: User): # Audio file paths for playback "audio_path": audio_path, "cropped_audio_path": cropped_audio_path, + # Debug: Add full URLs for troubleshooting + "debug_audio_url": f"/audio/{audio_path}" if audio_path else None, + # 
Version information for UI + "version_info": { + "transcript_count": len(transcript_versions), + "memory_count": len(memory_versions), + "active_transcript_version": conversation.get("active_transcript_version"), + "active_memory_version": conversation.get("active_memory_version"), + } } ) + # Log final result + total_grouped = sum(len(convs) for convs in conversations.values()) + logger.info(f"โœ… Returning {len(conversations)} client groups with {total_grouped} total conversations") + logger.info(f"๐Ÿ“Š Client groups: {list(conversations.keys())}") + return {"conversations": conversations} except Exception as e: - logger.error(f"Error fetching conversations: {e}") + logger.error(f"โŒ Error fetching conversations: {e}", exc_info=True) return JSONResponse(status_code=500, content={"error": "Error fetching conversations"}) @@ -168,6 +201,9 @@ async def get_conversation_by_id(conversation_id: str, user: User): content={"error": "Access forbidden. You can only access your own conversations."} ) + # Populate primary fields from active versions + conversation = conversations_repo._populate_primary_fields(conversation) + # Get audio file paths from audio_chunks collection audio_chunk = await chunk_repo.get_chunk_by_audio_uuid(conversation["audio_uuid"]) audio_path = audio_chunk.get("audio_path") if audio_chunk else None @@ -472,10 +508,11 @@ async def delete_conversation(audio_uuid: str, user: User): # If this audio chunk has an associated conversation, delete it from conversations collection too if conversation_id: try: + # Delete the conversation with all its versions (transcript_versions and memory_versions) conversation_result = await conversations_col.delete_one({"conversation_id": conversation_id}) if conversation_result.deleted_count > 0: conversation_deleted = True - logger.info(f"Deleted conversation {conversation_id} associated with audio chunk {audio_uuid}") + logger.info(f"Deleted conversation {conversation_id} with all versions associated with audio chunk 
{audio_uuid}") else: logger.warning(f"Conversation {conversation_id} not found in conversations collection, but audio chunk was deleted") except Exception as e: @@ -534,6 +571,304 @@ async def delete_conversation(audio_uuid: str, user: User): ) +async def delete_conversation_version(conversation_id: str, version_type: str, version_id: str, user: User): + """Delete a specific version (transcript or memory) from a conversation. Users can only modify their own conversations.""" + try: + conversations_repo = ConversationsRepository(conversations_col) + + # Get the conversation first to check ownership + conversation = await conversations_repo.get_conversation(conversation_id) + if not conversation: + return JSONResponse( + status_code=404, + content={"error": "Conversation not found"} + ) + + # Check if user owns this conversation + if not user.is_superuser and conversation["user_id"] != str(user.user_id): + return JSONResponse( + status_code=403, + content={"error": "Access forbidden. You can only modify your own conversations."} + ) + + # Validate version type + if version_type not in ["transcript", "memory"]: + return JSONResponse( + status_code=400, + content={"error": "Version type must be 'transcript' or 'memory'"} + ) + + # Determine field names based on version type + if version_type == "transcript": + versions_field = "transcript_versions" + active_field = "active_transcript_version" + else: # memory + versions_field = "memory_versions" + active_field = "active_memory_version" + + # Check if this version exists + versions = conversation.get(versions_field, []) + version_exists = any(v.get("version_id") == version_id for v in versions) + + if not version_exists: + return JSONResponse( + status_code=404, + content={"error": f"{version_type.title()} version {version_id} not found"} + ) + + # Check if there are other versions (can't delete the last one) + if len(versions) <= 1: + return JSONResponse( + status_code=400, + content={"error": f"Cannot delete the last 
{version_type} version. Conversation must have at least one version."} + ) + + # Check if this is the active version + active_version = conversation.get(active_field) + is_active = (active_version == version_id) + + # If deleting active version, we need to set a new active version + new_active_version = None + if is_active: + # Find the most recent non-deleted version to make active + remaining_versions = [v for v in versions if v.get("version_id") != version_id] + if remaining_versions: + # Sort by created_at and pick the most recent + remaining_versions.sort(key=lambda x: x.get("created_at", ""), reverse=True) + new_active_version = remaining_versions[0]["version_id"] + + # Remove the version from the array + update_operations = { + "$pull": {versions_field: {"version_id": version_id}} + } + + # If we need to update the active version + if new_active_version: + update_operations["$set"] = {active_field: new_active_version} + + # Execute the update + result = await conversations_col.update_one( + {"conversation_id": conversation_id}, + update_operations + ) + + if result.modified_count == 0: + return JSONResponse( + status_code=500, + content={"error": f"Failed to delete {version_type} version"} + ) + + # If we updated the active version, also update legacy fields + if new_active_version: + # Get the updated conversation and populate primary fields + updated_conversation = await conversations_repo.get_conversation(conversation_id) + updated_conversation = conversations_repo._populate_primary_fields(updated_conversation) + + # Update legacy fields in database + legacy_updates = {} + if version_type == "transcript": + legacy_updates["transcript"] = updated_conversation.get("transcript", []) + legacy_updates["speakers_identified"] = updated_conversation.get("speakers_identified", []) + else: # memory + legacy_updates["memories"] = updated_conversation.get("memories", []) + legacy_updates["memory_processing_status"] = 
updated_conversation.get("memory_processing_status", "pending") + + if legacy_updates: + await conversations_col.update_one( + {"conversation_id": conversation_id}, + {"$set": legacy_updates} + ) + + logger.info(f"Deleted {version_type} version {version_id} from conversation {conversation_id}") + + response_data = { + "message": f"Successfully deleted {version_type} version {version_id}", + "conversation_id": conversation_id, + "version_type": version_type, + "deleted_version_id": version_id, + "was_active": is_active + } + + if new_active_version: + response_data["new_active_version"] = new_active_version + + return JSONResponse(status_code=200, content=response_data) + + except Exception as e: + logger.error(f"Error deleting {version_type} version {version_id} from conversation {conversation_id}: {e}") + return JSONResponse( + status_code=500, + content={"error": f"Failed to delete {version_type} version: {str(e)}"} + ) + + +async def _do_transcript_reprocessing( + conversation_id: str, + audio_uuid: str, + audio_path: str, + version_id: str, + user_id: str +) -> dict: + """ + Core transcript reprocessing logic that can be called directly or from jobs. 
+ + Args: + conversation_id: ID of conversation to reprocess + audio_uuid: UUID of audio session + audio_path: Full path to audio file + version_id: Version ID for the new transcript + user_id: ID of user requesting reprocessing + + Returns: + dict: Processing results with transcript and segment data + """ + from advanced_omi_backend.transcription import TranscriptionManager + from advanced_omi_backend.database import AudioChunksRepository + from advanced_omi_backend.processors import get_processor_manager + import wave + import time + + start_time = time.time() + logger.info(f"๐ŸŽค Starting core transcript reprocessing for conversation {conversation_id}") + + # Initialize transcription manager with existing proven pipeline + from advanced_omi_backend.database import chunks_col + chunk_repo = AudioChunksRepository(chunks_col) + processor_manager = get_processor_manager() + transcription_manager = TranscriptionManager( + chunk_repo=chunk_repo, + processor_manager=processor_manager + ) + + # Check if transcription provider is available + if not transcription_manager.provider: + raise Exception("No transcription provider configured") + + logger.info(f"๐ŸŽค Using transcription pipeline with provider: {transcription_manager.provider.name}") + + # Load and process the audio file using existing pipeline + with wave.open(audio_path, 'rb') as wav_file: + sample_rate = wav_file.getframerate() + audio_frames = wav_file.readframes(wav_file.getnframes()) + + logger.info(f"๐ŸŽค Audio loaded: {len(audio_frames)} bytes, {sample_rate}Hz") + + # Process audio directly without using conversation creation pipeline + + # Process transcription directly without creating new conversations + logger.info(f"๐ŸŽค Processing transcript directly for reprocessing...") + + # Use the transcription provider directly to avoid creating new conversations + transcript_result = await transcription_manager.provider.transcribe( + audio_frames, sample_rate, diarize=True + ) + + if not transcript_result: + 
raise Exception("Transcription failed - no result returned") + + # Extract transcript and segments from the provider result + transcript_text = transcript_result.get("text", "") + segments = transcript_result.get("segments", []) + + # If segments are empty but we have text, create a single segment + if not segments and transcript_text: + segments = [{ + "text": transcript_text, + "start": 0.0, + "end": len(audio_frames) / (sample_rate * 2), # Estimate duration + "speaker": "Speaker 0", + "confidence": transcript_result.get("confidence", 0.9) + }] + + logger.info(f"๐ŸŽค Transcript reprocessing completed:") + logger.info(f" - Text length: {len(transcript_text)} characters") + logger.info(f" - Segments: {len(segments)}") + logger.info(f" - Processing time: {time.time() - start_time:.2f} seconds") + + # Add speaker identification step if segments exist + final_segments = segments + if segments and len(segments) > 0: + try: + logger.info(f"๐ŸŽค Starting speaker identification for {len(segments)} segments...") + + # Initialize speaker client + from advanced_omi_backend.speaker_recognition_client import SpeakerRecognitionClient + speaker_client = SpeakerRecognitionClient() + + if speaker_client.enabled: + # Create transcript data for speaker identification + words = transcript_result.get("words", []) + transcript_data = { + "text": transcript_text, + "words": words + } + + # Call speaker identification with the audio file and transcript data + speaker_result = await speaker_client.diarize_identify_match( + audio_path, transcript_data, user_id=user_id + ) + + if speaker_result and speaker_result.get("segments"): + raw_segments = speaker_result["segments"] + logger.info(f"๐ŸŽค Speaker identification completed: {len(raw_segments)} raw segments") + + # Filter out segments with empty or minimal text + final_segments = [] + for seg in raw_segments: + text = seg.get("text", "").strip() + # Only keep segments with meaningful text (at least 2 characters) + if len(text) >= 2: + 
final_segments.append(seg) + else: + logger.debug(f"๐Ÿ—‘๏ธ Filtering out empty/minimal segment: '{text}' (speaker: {seg.get('speaker', 'UNKNOWN')})") + + logger.info(f"๐ŸŽค After filtering: {len(final_segments)} segments with meaningful text (filtered out {len(raw_segments) - len(final_segments)} empty segments)") + + # Debug: Log first few segments + for i, seg in enumerate(final_segments[:3]): + logger.info(f"๐Ÿ” Segment {i}: speaker='{seg.get('speaker', 'UNKNOWN')}', text='{seg.get('text', '')[:50]}...'") + else: + logger.warning("๐ŸŽค Speaker identification returned no segments, using original segments") + else: + logger.info("๐ŸŽค Speaker recognition disabled, using segments without speaker identification") + + except Exception as e: + logger.warning(f"Speaker identification failed during reprocessing: {e}") + # Continue with original segments if speaker identification fails + + # Update the conversation with the new transcript version (using final_segments with speaker identification) + conversations_repo = ConversationsRepository(conversations_col) + update_result = await conversations_repo.update_transcript_version( + conversation_id=conversation_id, + version_id=version_id, + transcript=transcript_text, + segments=final_segments, + processing_time_seconds=time.time() - start_time, + provider=transcription_manager.provider.name + ) + + if update_result: + logger.info(f"โœ… Updated transcript version {version_id} in database") + + # Activate the new transcript version + activation_result = await conversations_repo.activate_transcript_version(conversation_id, version_id) + if activation_result: + logger.info(f"โœ… Activated transcript version {version_id}") + else: + logger.warning(f"โš ๏ธ Failed to activate transcript version {version_id}") + + return { + "success": True, + "transcript": transcript_text, + "segments": final_segments, + "version_id": version_id, + "processing_time_seconds": time.time() - start_time, + "provider": 
transcription_manager.provider.name + } + else: + raise Exception("Failed to update transcript version in database") + + async def reprocess_transcript(conversation_id: str, user: User): """Reprocess transcript for a conversation. Users can only reprocess their own conversations.""" try: @@ -613,19 +948,75 @@ async def reprocess_transcript(conversation_id: str, user: User): status_code=500, content={"error": "Failed to create transcript version"} ) - # TODO: Queue audio for reprocessing with ProcessorManager - # This is where we would integrate with the existing processor - # For now, we'll return the version ID for the caller to handle - - logger.info(f"Created transcript reprocessing job {run_id} (version {version_id}) for conversation {conversation_id}") - - return JSONResponse(content={ - "message": f"Transcript reprocessing started for conversation {conversation_id}", - "run_id": run_id, - "version_id": version_id, - "config_hash": config_hash, - "status": "PENDING" - }) + # Get audio file metadata for logging + try: + import wave + + audio_stats = { + "file_exists": full_audio_path.exists(), + "file_size_bytes": full_audio_path.stat().st_size if full_audio_path.exists() else 0, + "duration_seconds": 0, + "sample_rate": 0, + "channels": 0 + } + + if full_audio_path.exists() and str(full_audio_path).endswith('.wav'): + try: + with wave.open(str(full_audio_path), 'rb') as wav_file: + frames = wav_file.getnframes() + sample_rate = wav_file.getframerate() + audio_stats["duration_seconds"] = frames / sample_rate if sample_rate > 0 else 0 + audio_stats["sample_rate"] = sample_rate + audio_stats["channels"] = wav_file.getnchannels() + except Exception as wav_error: + logger.warning(f"Failed to read WAV metadata: {wav_error}") + + logger.info(f"๐ŸŽต Audio file metadata for {conversation_id}: {audio_stats}") + + except Exception as metadata_error: + logger.warning(f"Failed to get audio metadata: {metadata_error}") + audio_stats = {"error": str(metadata_error)} + + # 
Queue the reprocessing job + try: + from advanced_omi_backend.simple_queue import get_simple_queue + + queue = await get_simple_queue() + job_id = await queue.enqueue_job( + job_type="reprocess_transcript", + user_id=str(user.user_id), + data={ + "conversation_id": conversation_id, + "audio_uuid": audio_uuid, + "audio_path": str(full_audio_path), + "run_id": run_id, + "version_id": version_id, + "audio_metadata": audio_stats + } + ) + + logger.info(f"๐Ÿ“‹ Queued transcript reprocessing job {job_id} for conversation {conversation_id} (run {run_id}, version {version_id})") + logger.info(f"๐Ÿ“‹ Job data: audio_path={full_audio_path}, duration={audio_stats.get('duration_seconds', 0)}s") + + return JSONResponse(content={ + "message": f"Transcript reprocessing queued for conversation {conversation_id}", + "job_id": job_id, + "run_id": run_id, + "version_id": version_id, + "config_hash": config_hash, + "status": "QUEUED", + "audio_metadata": audio_stats + }) + + except Exception as queue_error: + logger.error(f"Failed to queue transcript reprocessing job: {queue_error}") + return JSONResponse( + status_code=500, + content={ + "error": "Failed to queue transcript reprocessing job", + "details": str(queue_error) + } + ) except Exception as e: logger.error(f"Error starting transcript reprocessing: {e}") @@ -702,19 +1093,44 @@ async def reprocess_memory(conversation_id: str, transcript_version_id: str, use status_code=500, content={"error": "Failed to create memory version"} ) - # TODO: Queue memory extraction for processing - # This is where we would integrate with the existing memory processor - - logger.info(f"Created memory reprocessing job {run_id} (version {version_id}) for conversation {conversation_id}") - - return JSONResponse(content={ - "message": f"Memory reprocessing started for conversation {conversation_id}", - "run_id": run_id, - "version_id": version_id, - "transcript_version_id": transcript_version_id, - "config_hash": config_hash, - "status": "PENDING" - }) 
+ # Queue the memory processing job + try: + from advanced_omi_backend.simple_queue import get_simple_queue + + queue = await get_simple_queue() + job_id = await queue.enqueue_job( + job_type="reprocess_memory", + user_id=str(user.user_id), + data={ + "conversation_id": conversation_id, + "audio_uuid": audio_uuid, + "transcript_version_id": transcript_version_id, + "run_id": run_id, + "version_id": version_id + } + ) + + logger.info(f"Queued memory reprocessing job {job_id} for conversation {conversation_id} (run {run_id}, version {version_id})") + + return JSONResponse(content={ + "message": f"Memory reprocessing queued for conversation {conversation_id}", + "job_id": job_id, + "run_id": run_id, + "version_id": version_id, + "transcript_version_id": transcript_version_id, + "config_hash": config_hash, + "status": "QUEUED" + }) + + except Exception as queue_error: + logger.error(f"Failed to queue memory reprocessing job: {queue_error}") + return JSONResponse( + status_code=500, + content={ + "error": "Failed to queue memory reprocessing job", + "details": str(queue_error) + } + ) except Exception as e: logger.error(f"Error starting memory reprocessing: {e}") diff --git a/backends/advanced/src/advanced_omi_backend/database.py b/backends/advanced/src/advanced_omi_backend/database.py index e93c1d5c..24720b48 100644 --- a/backends/advanced/src/advanced_omi_backend/database.py +++ b/backends/advanced/src/advanced_omi_backend/database.py @@ -706,6 +706,50 @@ async def activate_transcript_version(self, conversation_id: str, version_id: st logger.info(f"Activated transcript version {version_id} for conversation {conversation_id}") return result.modified_count > 0 + async def update_transcript_version( + self, + conversation_id: str, + version_id: str, + transcript: str = None, + segments: list = None, + processing_time_seconds: float = None, + provider: str = None, + model: str = None + ) -> bool: + """Update a specific transcript version with processing results.""" + 
update_fields = {} + + if transcript is not None: + update_fields["transcript_versions.$.transcript"] = transcript + if segments is not None: + update_fields["transcript_versions.$.segments"] = segments + if processing_time_seconds is not None: + update_fields["transcript_versions.$.processing_time_seconds"] = processing_time_seconds + if provider is not None: + update_fields["transcript_versions.$.provider"] = provider + if model is not None: + update_fields["transcript_versions.$.model"] = model + + # Always update the completion timestamp + update_fields["transcript_versions.$.completed_at"] = datetime.now(UTC).isoformat() + update_fields["transcript_versions.$.status"] = "completed" + + if not update_fields: + return False + + result = await self.col.update_one( + { + "conversation_id": conversation_id, + "transcript_versions.version_id": version_id + }, + {"$set": update_fields} + ) + + if result.modified_count > 0: + logger.info(f"Updated transcript version {version_id} for conversation {conversation_id}") + return True + return False + async def activate_memory_version(self, conversation_id: str, version_id: str) -> bool: """Activate a specific memory version in conversation.""" # First verify the version exists diff --git a/backends/advanced/src/advanced_omi_backend/main.py b/backends/advanced/src/advanced_omi_backend/main.py index 1eaafabe..8ca51a31 100644 --- a/backends/advanced/src/advanced_omi_backend/main.py +++ b/backends/advanced/src/advanced_omi_backend/main.py @@ -48,6 +48,9 @@ get_processor_manager, init_processor_manager, ) + +from advanced_omi_backend.simple_queue import get_simple_queue + from advanced_omi_backend.audio_utils import process_audio_chunk from advanced_omi_backend.task_manager import init_task_manager, get_task_manager from advanced_omi_backend.transcript_coordinator import get_transcript_coordinator @@ -320,6 +323,25 @@ async def lifespan(app: FastAPI): processor_manager = init_processor_manager(CHUNK_DIR, ac_repository) await 
processor_manager.start() + application_logger.info("Application-level processors started") + + # Initialize simple queue system + try: + queue = await get_simple_queue() + application_logger.info("Simple queue system started") + except Exception as e: + application_logger.error(f"Failed to start simple queue: {e}") + # Don't raise as queue system is not critical for basic operation + + # Skip memory service pre-initialization to avoid blocking FastAPI startup + # Memory service will be lazily initialized when first used + application_logger.info("Memory service will be initialized on first use (lazy loading)") + + # SystemTracker is used for monitoring and debugging + application_logger.info("Using SystemTracker for monitoring and debugging") + + application_logger.info("Application ready - using application-level processing architecture.") + logger.info("App ready") try: yield @@ -331,6 +353,14 @@ async def lifespan(app: FastAPI): for client_id in client_manager.get_all_client_ids(): await cleanup_client_state(client_id) + # Shutdown simple queue system + try: + if queue: + await queue.stop_worker() + application_logger.info("Simple queue system shut down") + except Exception as e: + application_logger.error(f"Error shutting down simple queue: {e}") + # Shutdown processor manager processor_manager = get_processor_manager() await processor_manager.shutdown() diff --git a/backends/advanced/src/advanced_omi_backend/routers/api_router.py b/backends/advanced/src/advanced_omi_backend/routers/api_router.py index 4a6ab878..ed3148cd 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/api_router.py +++ b/backends/advanced/src/advanced_omi_backend/routers/api_router.py @@ -14,6 +14,7 @@ client_router, conversation_router, memory_router, + queue_router, system_router, user_router, ) @@ -31,6 +32,7 @@ router.include_router(conversation_router) router.include_router(memory_router) router.include_router(system_router) +router.include_router(queue_router) 
logger.info("API router initialized with all sub-modules") diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py b/backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py index 54fcf543..312fb877 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py @@ -8,6 +8,7 @@ - conversation_routes: Conversation CRUD and audio processing - memory_routes: Memory management, search, and debug - system_routes: System utilities, metrics, and file processing +- queue_routes: Job queue management and monitoring """ from .chat_routes import router as chat_router @@ -16,5 +17,6 @@ from .memory_routes import router as memory_router from .system_routes import router as system_router from .user_routes import router as user_router +from .queue_routes import router as queue_router -__all__ = ["user_router", "chat_router", "client_router", "conversation_router", "memory_router", "system_router"] +__all__ = ["user_router", "chat_router", "client_router", "conversation_router", "memory_router", "system_router", "queue_router"] diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py new file mode 100644 index 00000000..83f183cd --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py @@ -0,0 +1,147 @@ +""" +Simple queue API routes for job monitoring. +Provides basic endpoints for viewing job status and statistics. 
+""" + +import logging +from fastapi import APIRouter, Depends, Query, HTTPException +from pydantic import BaseModel +from typing import List, Optional + +from advanced_omi_backend.auth import current_active_user +from advanced_omi_backend.simple_queue import get_simple_queue +from advanced_omi_backend.users import User + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/queue", tags=["queue"]) + + +@router.get("/jobs") +async def list_jobs( + limit: int = Query(20, ge=1, le=100, description="Number of jobs to return"), + offset: int = Query(0, ge=0, description="Number of jobs to skip"), + status: str = Query(None, description="Filter by job status"), + job_type: str = Query(None, description="Filter by job type"), + priority: str = Query(None, description="Filter by job priority"), + current_user: User = Depends(current_active_user) +): + """List jobs with pagination and filtering.""" + try: + # Build filters dict + filters = {} + if status: + filters["status"] = status + if job_type: + filters["job_type"] = job_type + if priority: + filters["priority"] = priority + + queue = await get_simple_queue() + result = await queue.get_jobs(limit=limit, offset=offset, filters=filters) + + # Filter jobs by user if not admin + if not current_user.is_superuser: + result["jobs"] = [ + job for job in result["jobs"] + if job["user_id"] == str(current_user.user_id) + ] + result["pagination"]["total"] = len(result["jobs"]) + + return result + + except Exception as e: + logger.error(f"Failed to list jobs: {e}") + return {"error": "Failed to list jobs", "jobs": [], "pagination": {"total": 0, "limit": limit, "offset": offset, "has_more": False}} + + +@router.get("/stats") +async def get_queue_stats( + current_user: User = Depends(current_active_user) +): + """Get queue statistics.""" + try: + queue = await get_simple_queue() + stats = await queue.get_job_stats() + return stats + + except Exception as e: + logger.error(f"Failed to get queue stats: {e}") + return 
{"queued": 0, "processing": 0, "completed": 0, "failed": 0} + + +@router.get("/health") +async def get_queue_health(): + """Get queue system health status.""" + try: + queue = await get_simple_queue() + + return { + "status": "healthy" if queue.running else "stopped", + "worker_running": queue.running, + "message": "Simple queue is operational" if queue.running else "Simple queue worker not running" + } + + except Exception as e: + logger.error(f"Failed to get queue health: {e}") + return { + "status": "unhealthy", + "message": f"Health check failed: {str(e)}" + } + + +class FlushJobsRequest(BaseModel): + older_than_hours: int = 24 + statuses: Optional[List[str]] = None + + +class FlushAllJobsRequest(BaseModel): + confirm: bool = False + + +@router.post("/flush") +async def flush_inactive_jobs( + request: FlushJobsRequest, + current_user: User = Depends(current_active_user) +): + """Flush inactive jobs from the database (admin only).""" + if not current_user.is_superuser: + raise HTTPException(status_code=403, detail="Admin access required") + + try: + queue = await get_simple_queue() + result = await queue.flush_inactive_jobs( + older_than_hours=request.older_than_hours, + statuses=request.statuses + ) + return result + + except Exception as e: + logger.error(f"Failed to flush inactive jobs: {e}") + raise HTTPException(status_code=500, detail=f"Failed to flush jobs: {str(e)}") + + +@router.post("/flush-all") +async def flush_all_jobs( + request: FlushAllJobsRequest, + current_user: User = Depends(current_active_user) +): + """Flush ALL jobs from the database (admin only). USE WITH EXTREME CAUTION!""" + if not current_user.is_superuser: + raise HTTPException(status_code=403, detail="Admin access required") + + try: + if not request.confirm: + raise HTTPException( + status_code=400, + detail="Must set confirm=true to flush all jobs. This is a destructive operation." 
+ ) + + queue = await get_simple_queue() + result = await queue.flush_all_jobs(confirm=request.confirm) + return result + + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Failed to flush all jobs: {e}") + raise HTTPException(status_code=500, detail=f"Failed to flush all jobs: {str(e)}") \ No newline at end of file diff --git a/backends/advanced/src/advanced_omi_backend/simple_queue.py b/backends/advanced/src/advanced_omi_backend/simple_queue.py new file mode 100644 index 00000000..752ddacb --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/simple_queue.py @@ -0,0 +1,368 @@ +""" +Simple MongoDB-based queue system. +Lightweight replacement for complex queue_manager.py that just calls existing controller methods. +""" + +import asyncio +import logging +import time +import uuid +from datetime import datetime, timezone, timedelta +from enum import Enum +from typing import Dict, Any, Optional + +from advanced_omi_backend.database import mongo_client + +logger = logging.getLogger(__name__) + +class JobStatus(str, Enum): + QUEUED = "queued" + PROCESSING = "processing" + COMPLETED = "completed" + FAILED = "failed" + +class SimpleQueue: + """Simple MongoDB queue that calls controller methods directly.""" + + def __init__(self): + self.db = mongo_client.get_database("friend-lite") + self.jobs_collection = self.db["simple_jobs"] + self.running = False + self.worker_task = None + + async def enqueue_job(self, job_type: str, user_id: str, data: Dict[str, Any], priority: str = "normal") -> str: + """Add a job to the queue.""" + # Get next job ID using auto-increment + next_id = await self._get_next_job_id() + job_id = str(next_id) + job = { + "job_id": job_id, + "job_type": job_type, + "user_id": user_id, + "data": data, + "status": JobStatus.QUEUED, + "priority": priority, + "created_at": datetime.now(timezone.utc), + "attempts": 0, + "max_attempts": 3 + } + + await 
self.jobs_collection.insert_one(job) + logger.info(f"๐Ÿ“ฅ Enqueued {job_type} job {job_id}") + return job_id + + async def start_worker(self): + """Start the background worker.""" + if self.running: + return + + self.running = True + self.worker_task = asyncio.create_task(self._worker_loop()) + logger.info("๐Ÿ”„ Simple queue worker started") + + async def stop_worker(self): + """Stop the background worker.""" + self.running = False + if self.worker_task: + self.worker_task.cancel() + try: + await self.worker_task + except asyncio.CancelledError: + pass + logger.info("โน๏ธ Simple queue worker stopped") + + async def _worker_loop(self): + """Main worker loop that processes jobs.""" + while self.running: + try: + # Get next job + job = await self.jobs_collection.find_one_and_update( + {"status": JobStatus.QUEUED}, + {"$set": {"status": JobStatus.PROCESSING, "started_at": datetime.now(timezone.utc)}}, + sort=[("created_at", 1)] + ) + + if job: + await self._process_job(job) + else: + # No jobs, sleep for a bit + await asyncio.sleep(1) + + except Exception as e: + logger.error(f"Worker loop error: {e}") + await asyncio.sleep(5) + + async def _process_job(self, job: Dict[str, Any]): + """Process a single job by calling the appropriate controller method.""" + job_id = job["job_id"] + job_type = job["job_type"] + + try: + logger.info(f"๐Ÿ”„ Processing {job_type} job {job_id}") + + result = None + if job_type == "reprocess_transcript": + result = await self._handle_reprocess_transcript(job) + elif job_type == "reprocess_memory": + result = await self._handle_reprocess_memory(job) + else: + raise Exception(f"Unknown job type: {job_type}") + + # Mark as completed with result + await self.jobs_collection.update_one( + {"job_id": job_id}, + {"$set": { + "status": JobStatus.COMPLETED, + "completed_at": datetime.now(timezone.utc), + "result": result if result else {} + }} + ) + logger.info(f"โœ… Completed {job_type} job {job_id}") + + except Exception as e: + logger.error(f"โŒ 
Job {job_id} failed: {e}") + + # Increment attempts + attempts = job.get("attempts", 0) + 1 + max_attempts = job.get("max_attempts", 3) + + if attempts >= max_attempts: + # Mark as failed + await self.jobs_collection.update_one( + {"job_id": job_id}, + {"$set": { + "status": JobStatus.FAILED, + "error": str(e), + "attempts": attempts, + "failed_at": datetime.now(timezone.utc) + }} + ) + else: + # Retry + await self.jobs_collection.update_one( + {"job_id": job_id}, + {"$set": { + "status": JobStatus.QUEUED, + "attempts": attempts, + "last_error": str(e) + }} + ) + + async def _handle_reprocess_transcript(self, job: Dict[str, Any]): + """Handle transcript reprocessing by calling the controller method.""" + from advanced_omi_backend.controllers.conversation_controller import _do_transcript_reprocessing + + data = job["data"] + result = await _do_transcript_reprocessing( + conversation_id=data["conversation_id"], + audio_uuid=data["audio_uuid"], + audio_path=data["audio_path"], + version_id=data["version_id"], + user_id=job["user_id"] + ) + + # Format result for queue job storage + if result and result.get("success"): + # Count identified speakers from segments + segments = result.get("segments", []) + identified_speakers = set() + for segment in segments: + speaker = segment.get("speaker") + if speaker and not speaker.startswith("unknown_speaker") and speaker != "Unknown": + identified_speakers.add(speaker) + + return { + "conversation_id": data["conversation_id"], + "version_id": data["version_id"], + "transcript_segments": len(segments), + "speakers_identified": len(identified_speakers), + "identified_speaker_names": list(identified_speakers), + "processing_time": result.get("processing_time_seconds", 0), + "provider": result.get("provider", "unknown"), + "transcript_length": len(result.get("transcript", "")), + "success": True + } + else: + return {"success": False, "error": "No result returned"} + + async def _handle_reprocess_memory(self, job: Dict[str, Any]): + 
"""Handle memory reprocessing by calling the controller method.""" + # TODO: Implement when needed + pass + + async def get_job_stats(self) -> Dict[str, int]: + """Get simple job statistics.""" + pipeline = [ + {"$group": {"_id": "$status", "count": {"$sum": 1}}} + ] + + status_counts = {"queued": 0, "processing": 0, "completed": 0, "failed": 0} + async for stat in self.jobs_collection.aggregate(pipeline): + status_counts[stat["_id"]] = stat["count"] + + # Format to match frontend expectations + total = sum(status_counts.values()) + return { + "total_jobs": total, + "queued_jobs": status_counts["queued"], + "processing_jobs": status_counts["processing"], + "completed_jobs": status_counts["completed"], + "failed_jobs": status_counts["failed"], + "cancelled_jobs": 0, # Not implemented yet + "retrying_jobs": 0 # Not implemented yet + } + + async def get_jobs(self, limit: int = 20, offset: int = 0, filters: Dict[str, str] = None) -> Dict[str, Any]: + """Get jobs with pagination and filtering.""" + # Build filter query + query = {} + if filters: + if filters.get("status"): + query["status"] = filters["status"] + if filters.get("job_type"): + query["job_type"] = filters["job_type"] + if filters.get("priority"): + query["priority"] = filters["priority"] + + total = await self.jobs_collection.count_documents(query) + + cursor = self.jobs_collection.find(query).sort("created_at", -1).skip(offset).limit(limit) + jobs = [] + async for job in cursor: + # Convert ObjectId to string and format dates + job["_id"] = str(job["_id"]) + job["created_at"] = job["created_at"].isoformat() + if "started_at" in job: + job["started_at"] = job["started_at"].isoformat() + if "completed_at" in job: + job["completed_at"] = job["completed_at"].isoformat() + if "failed_at" in job: + job["failed_at"] = job["failed_at"].isoformat() + jobs.append(job) + + return { + "jobs": jobs, + "pagination": { + "total": total, + "limit": limit, + "offset": offset, + "has_more": (offset + limit) < total + } + } 
+ + async def _get_next_job_id(self) -> int: + """Get the next job ID using auto-increment.""" + # Use MongoDB's findOneAndUpdate to atomically increment counter + counter_doc = await self.db["job_counters"].find_one_and_update( + {"_id": "job_id"}, + {"$inc": {"sequence_value": 1}}, + upsert=True, + return_document=True + ) + return counter_doc["sequence_value"] + + async def flush_inactive_jobs(self, older_than_hours: int = 24, statuses: list = None) -> Dict[str, int]: + """Flush inactive jobs from the database. + + Args: + older_than_hours: Remove jobs older than this many hours (default: 24) + statuses: List of statuses to remove. If None, removes completed and failed jobs + Options: ['completed', 'failed', 'cancelled'] + + Returns: + Dictionary with count of removed jobs by status + """ + if statuses is None: + statuses = [JobStatus.COMPLETED, JobStatus.FAILED] + + # Calculate cutoff time + cutoff_time = datetime.now(timezone.utc) - timedelta(hours=older_than_hours) + + # Build query to find jobs to remove + query = { + "status": {"$in": statuses}, + "created_at": {"$lt": cutoff_time} + } + + # Get count by status before deletion + pipeline = [ + {"$match": query}, + {"$group": {"_id": "$status", "count": {"$sum": 1}}} + ] + + removal_stats = {status: 0 for status in statuses} + async for stat in self.jobs_collection.aggregate(pipeline): + removal_stats[stat["_id"]] = stat["count"] + + # Remove the jobs + result = await self.jobs_collection.delete_many(query) + + total_removed = result.deleted_count + + logger.info( + f"๐Ÿงน Flushed {total_removed} inactive jobs older than {older_than_hours} hours. " + f"Stats: {removal_stats}" + ) + + return { + "total_removed": total_removed, + "older_than_hours": older_than_hours, + "removed_by_status": removal_stats, + "cutoff_time": cutoff_time.isoformat() + } + + async def flush_all_jobs(self, confirm: bool = False) -> Dict[str, int]: + """Flush ALL jobs from the database. USE WITH CAUTION! 
+ + Args: + confirm: Must be True to actually delete jobs (safety check) + + Returns: + Dictionary with count of removed jobs by status + """ + if not confirm: + raise ValueError("Must set confirm=True to flush all jobs. This is a destructive operation.") + + # Get stats before deletion + stats = await self.get_job_stats() + + # Remove all jobs + result = await self.jobs_collection.delete_many({}) + + # Reset job counter + await self.db["job_counters"].delete_one({"_id": "job_id"}) + + total_removed = result.deleted_count + + logger.warning( + f"๐Ÿšจ FLUSHED ALL {total_removed} jobs from database and reset job counter! " + f"Previous stats: {stats}" + ) + + return { + "total_removed": total_removed, + "previous_stats": stats, + "job_counter_reset": True + } + +# Global instance +_simple_queue = None + +async def get_simple_queue() -> SimpleQueue: + """Get the global simple queue instance.""" + global _simple_queue + if _simple_queue is None: + _simple_queue = SimpleQueue() + await _simple_queue.start_worker() + return _simple_queue + +# Convenience functions for direct access +async def flush_inactive_jobs(older_than_hours: int = 24, statuses: list = None) -> Dict[str, int]: + """Convenience function to flush inactive jobs.""" + queue = await get_simple_queue() + return await queue.flush_inactive_jobs(older_than_hours=older_than_hours, statuses=statuses) + +async def flush_all_jobs(confirm: bool = False) -> Dict[str, int]: + """Convenience function to flush all jobs. 
USE WITH CAUTION!""" + queue = await get_simple_queue() + return await queue.flush_all_jobs(confirm=confirm) \ No newline at end of file diff --git a/backends/advanced/webui/src/App.tsx b/backends/advanced/webui/src/App.tsx index 16b723a8..39605087 100644 --- a/backends/advanced/webui/src/App.tsx +++ b/backends/advanced/webui/src/App.tsx @@ -9,6 +9,7 @@ import Memories from './pages/Memories' import Users from './pages/Users' import System from './pages/System' import Upload from './pages/Upload' +import Queue from './pages/Queue' import LiveRecord from './pages/LiveRecord' import ProtectedRoute from './components/auth/ProtectedRoute' import { ErrorBoundary, PageErrorBoundary } from './components/ErrorBoundary' @@ -68,6 +69,11 @@ function App() { } /> + + + + } /> diff --git a/backends/advanced/webui/src/components/ConversationVersionDropdown.tsx b/backends/advanced/webui/src/components/ConversationVersionDropdown.tsx new file mode 100644 index 00000000..8d06e1a8 --- /dev/null +++ b/backends/advanced/webui/src/components/ConversationVersionDropdown.tsx @@ -0,0 +1,246 @@ +import { useState, useEffect } from 'react' +import { ChevronDown, CheckCircle, Loader2 } from 'lucide-react' +import { conversationsApi } from '../services/api' + +interface TranscriptVersion { + version_id: string + transcript: string + segments: any[] + provider: string + model?: string + created_at: string + processing_time_seconds?: number + metadata?: any +} + +interface MemoryVersion { + version_id: string + memory_count: number + transcript_version_id: string + provider: string + model?: string + created_at: string + processing_time_seconds?: number + metadata?: any +} + +interface VersionHistory { + transcript_versions: TranscriptVersion[] + memory_versions: MemoryVersion[] + active_transcript_version: string + active_memory_version: string +} + +interface ConversationVersionDropdownProps { + conversationId: string + versionInfo?: { + transcript_count: number + memory_count: number + 
active_transcript_version?: string + active_memory_version?: string + } + onVersionChange: () => void +} + +export default function ConversationVersionDropdown({ + conversationId, + versionInfo, + onVersionChange +}: ConversationVersionDropdownProps) { + const [versionHistory, setVersionHistory] = useState(null) + const [loading, setLoading] = useState(false) + const [activating, setActivating] = useState<{ type: 'transcript' | 'memory', versionId: string } | null>(null) + const [showTranscriptDropdown, setShowTranscriptDropdown] = useState(false) + const [showMemoryDropdown, setShowMemoryDropdown] = useState(false) + + // Close dropdowns when clicking outside + useEffect(() => { + const handleClickOutside = () => { + setShowTranscriptDropdown(false) + setShowMemoryDropdown(false) + } + document.addEventListener('click', handleClickOutside) + return () => document.removeEventListener('click', handleClickOutside) + }, []) + + const loadVersionHistory = async () => { + try { + setLoading(true) + const response = await conversationsApi.getVersionHistory(conversationId) + setVersionHistory(response.data) + } catch (err: any) { + console.error('Failed to load version history:', err) + } finally { + setLoading(false) + } + } + + useEffect(() => { + if (conversationId && ((versionInfo?.transcript_count || 0) > 1 || (versionInfo?.memory_count || 0) > 1)) { + loadVersionHistory() + } + }, [conversationId, versionInfo]) + + const handleActivateVersion = async (type: 'transcript' | 'memory', versionId: string) => { + try { + setActivating({ type, versionId }) + + if (type === 'transcript') { + await conversationsApi.activateTranscriptVersion(conversationId, versionId) + setShowTranscriptDropdown(false) + } else { + await conversationsApi.activateMemoryVersion(conversationId, versionId) + setShowMemoryDropdown(false) + } + + // Reload version history to update active version + await loadVersionHistory() + + // Notify parent component to refresh conversation data + 
onVersionChange() + + } catch (err: any) { + console.error(`Failed to activate ${type} version:`, err) + } finally { + setActivating(null) + } + } + + const formatVersionLabel = (version: TranscriptVersion | MemoryVersion, index: number) => { + return `v${index + 1} (${version.provider}${version.model ? ` ${version.model}` : ''})` + } + + const formatDate = (dateString: string) => { + return new Date(dateString).toLocaleDateString() + } + + // Don't show anything if there are no multiple versions + if (!versionInfo || ((versionInfo.transcript_count || 0) <= 1 && (versionInfo.memory_count || 0) <= 1)) { + return null + } + + return ( +
+ {/* Transcript Version Dropdown */} + {(versionInfo.transcript_count || 0) > 1 && ( +
+ + + {showTranscriptDropdown && versionHistory && ( +
e.stopPropagation()} + > +
+ {versionHistory.transcript_versions.map((version, index) => ( + + ))} +
+
+ )} +
+ )} + + {/* Memory Version Dropdown */} + {(versionInfo.memory_count || 0) > 1 && ( +
+ + + {showMemoryDropdown && versionHistory && ( +
e.stopPropagation()} + > +
+ {versionHistory.memory_versions.map((version, index) => ( + + ))} +
+
+ )} +
+ )} + + {loading && ( +
+ + Loading versions... +
+ )} +
+ ) +} \ No newline at end of file diff --git a/backends/advanced/webui/src/components/ConversationVersionHeader.tsx b/backends/advanced/webui/src/components/ConversationVersionHeader.tsx new file mode 100644 index 00000000..9e7c5e09 --- /dev/null +++ b/backends/advanced/webui/src/components/ConversationVersionHeader.tsx @@ -0,0 +1,111 @@ +import { useState } from 'react'; +import { RotateCcw } from 'lucide-react'; +import { conversationsApi } from '../services/api'; +import ConversationVersionDropdown from './ConversationVersionDropdown'; + +interface ConversationVersionHeaderProps { + conversationId: string; + versionInfo?: { + transcript_count: number; + memory_count: number; + active_transcript_version?: string; + active_memory_version?: string; + }; + onVersionChange?: () => void; +} + +export default function ConversationVersionHeader({ conversationId, versionInfo, onVersionChange }: ConversationVersionHeaderProps) { + const [loading, setLoading] = useState(false); + const [error, setError] = useState(null); + + const handleReprocessTranscript = async (event: React.MouseEvent) => { + event.preventDefault(); + event.stopPropagation(); + + try { + setLoading(true); + await conversationsApi.reprocessTranscript(conversationId); + onVersionChange?.(); + } catch (err) { + console.error('Failed to reprocess transcript:', err); + setError('Failed to reprocess transcript'); + } finally { + setLoading(false); + } + }; + + // If no version info provided, don't show anything + if (!versionInfo) return null; + + // Only show if there are multiple versions or reprocessing capability + if (versionInfo.transcript_count <= 1 && versionInfo.memory_count <= 1) { + return ( +
+
+
+ {versionInfo.transcript_count} transcript version, {versionInfo.memory_count} memory version +
+ +
+
+ ); + } + + // Show multiple version info with reprocess option and version selector + return ( +
+
+
+
+ {versionInfo.transcript_count} transcript versions, + {versionInfo.memory_count} memory versions + {error &&
{error}
} +
+ + {/* Version Selector Dropdowns */} + {})} + /> +
+ + +
+
+ ); +} \ No newline at end of file diff --git a/backends/advanced/webui/src/components/layout/Layout.tsx b/backends/advanced/webui/src/components/layout/Layout.tsx index 13f2fa13..b7f21ba9 100644 --- a/backends/advanced/webui/src/components/layout/Layout.tsx +++ b/backends/advanced/webui/src/components/layout/Layout.tsx @@ -1,5 +1,5 @@ import { Link, useLocation, Outlet } from 'react-router-dom' -import { Music, MessageSquare, MessageCircle, Brain, Users, Upload, Settings, LogOut, Sun, Moon, Shield, Radio } from 'lucide-react' +import { Music, MessageSquare, MessageCircle, Brain, Users, Upload, Settings, LogOut, Sun, Moon, Shield, Radio, Layers } from 'lucide-react' import { useAuth } from '../../contexts/AuthContext' import { useTheme } from '../../contexts/ThemeContext' @@ -16,6 +16,7 @@ export default function Layout() { { path: '/users', label: 'User Management', icon: Users }, ...(isAdmin ? [ { path: '/upload', label: 'Upload Audio', icon: Upload }, + { path: '/queue', label: 'Queue Management', icon: Layers }, { path: '/system', label: 'System State', icon: Settings }, ] : []), ] diff --git a/backends/advanced/webui/src/pages/Conversations.tsx b/backends/advanced/webui/src/pages/Conversations.tsx index 79698be3..f8fc6088 100644 --- a/backends/advanced/webui/src/pages/Conversations.tsx +++ b/backends/advanced/webui/src/pages/Conversations.tsx @@ -1,6 +1,7 @@ import { useState, useEffect, useRef } from 'react' import { MessageSquare, RefreshCw, Calendar, User, Play, Pause, MoreVertical, RotateCcw, Zap, ChevronDown, ChevronUp, Trash2 } from 'lucide-react' import { conversationsApi, BACKEND_URL } from '../services/api' +import ConversationVersionHeader from '../components/ConversationVersionHeader' interface Conversation { conversation_id?: string @@ -28,6 +29,12 @@ interface Conversation { memory_processing_status?: string transcription_status?: string action_items?: any[] + version_info?: { + transcript_count: number + memory_count: number + 
active_transcript_version?: string + active_memory_version?: string + } } // Speaker color palette for consistent colors across conversations @@ -345,6 +352,18 @@ export default function Conversations() { key={conversation.audio_uuid} className="bg-gray-50 dark:bg-gray-700 rounded-lg p-6 border border-gray-200 dark:border-gray-600" > + {/* Version Selector Header - Only show for conversations with conversation_id */} + {conversation.conversation_id && ( + { + // Refresh the conversation data when version changes + loadConversations(); + }} + /> + )} + {/* Conversation Header */}
@@ -506,7 +525,7 @@ export default function Conversations() { {/* Transcript Content - Conditionally Rendered */} {expandedTranscripts.has(conversation.audio_uuid) && ( -
+
{conversation.transcript && conversation.transcript.length > 0 ? (
@@ -584,6 +603,7 @@ export default function Conversations() { No transcript available
)} +
)}
diff --git a/backends/advanced/webui/src/pages/Queue.tsx b/backends/advanced/webui/src/pages/Queue.tsx new file mode 100644 index 00000000..c5e05aa8 --- /dev/null +++ b/backends/advanced/webui/src/pages/Queue.tsx @@ -0,0 +1,884 @@ +import React, { useState, useEffect } from 'react'; +import { + Clock, + Play, + CheckCircle, + XCircle, + RotateCcw, + StopCircle, + Eye, + Filter, + X, + RefreshCw, + Layers, + Trash2, + AlertTriangle +} from 'lucide-react'; + +interface QueueJob { + job_id: string; + job_type: string; + user_id: string; + status: 'queued' | 'processing' | 'completed' | 'failed' | 'cancelled' | 'retrying'; + priority: 'low' | 'normal' | 'high'; + data: any; + result?: any; + error_message?: string; + created_at: string; + started_at?: string; + completed_at?: string; + retry_count: number; + max_retries: number; + progress_percent: number; + progress_message: string; +} + +interface QueueStats { + total_jobs: number; + queued_jobs: number; + processing_jobs: number; + completed_jobs: number; + failed_jobs: number; + cancelled_jobs: number; + retrying_jobs: number; + timestamp: string; +} + +interface Filters { + status: string; + job_type: string; + priority: string; +} + +const Queue: React.FC = () => { + const [jobs, setJobs] = useState([]); + const [stats, setStats] = useState(null); + const [loading, setLoading] = useState(true); + const [selectedJob, setSelectedJob] = useState(null); + const [filters, setFilters] = useState({ + status: '', + job_type: '', + priority: '' + }); + const [pagination, setPagination] = useState({ + offset: 0, + limit: 20, + total: 0, + has_more: false + }); + const [refreshing, setRefreshing] = useState(false); + const [showFlushModal, setShowFlushModal] = useState(false); + const [flushSettings, setFlushSettings] = useState({ + older_than_hours: 24, + statuses: ['completed', 'failed'], + flush_all: false + }); + const [flushing, setFlushing] = useState(false); + + // Auto-refresh interval + useEffect(() => { + 
console.log('๐Ÿ”„ Setting up queue auto-refresh interval'); + const interval = setInterval(() => { + if (!loading) { + console.log('โฐ Auto-refreshing queue data'); + fetchData(); + } + }, 5000); // Refresh every 5 seconds + + return () => { + console.log('๐Ÿงน Clearing queue auto-refresh interval'); + clearInterval(interval); + }; + }, []); // Remove dependencies to prevent interval recreation + + // Initial data fetch + useEffect(() => { + fetchData(); + }, [filters, pagination.offset]); + + const fetchData = async () => { + console.log('๐Ÿ“ฅ fetchData called, refreshing:', refreshing, 'loading:', loading); + if (!refreshing) setRefreshing(true); + + try { + console.log('๐Ÿ”„ Starting Promise.all for jobs and stats'); + await Promise.all([fetchJobs(), fetchStats()]); + console.log('โœ… Promise.all completed successfully'); + } catch (error) { + console.error('โŒ Error fetching queue data:', error); + } finally { + setLoading(false); + setRefreshing(false); + console.log('๐Ÿ fetchData completed'); + } + }; + + const fetchJobs = async () => { + try { + console.log('๐Ÿ” fetchJobs starting...'); + const params = new URLSearchParams({ + limit: pagination.limit.toString(), + offset: pagination.offset.toString(), + sort: 'created_at', + order: 'desc' + }); + + if (filters.status) params.append('status', filters.status); + if (filters.job_type) params.append('job_type', filters.job_type); + if (filters.priority) params.append('priority', filters.priority); + + console.log('๐Ÿ“ก Fetching jobs with params:', params.toString()); + const response = await fetch(`/api/queue/jobs?${params}`, { + headers: { 'Authorization': `Bearer ${localStorage.getItem('token')}` } + }); + + if (response.ok) { + const data = await response.json(); + console.log('โœ… fetchJobs success, got', data.jobs?.length, 'jobs'); + setJobs(data.jobs); + setPagination(prev => ({ + ...prev, + total: data.pagination.total, + has_more: data.pagination.has_more + })); + } else if (response.status === 401) 
{ + console.warn('๐Ÿ” Auth error in fetchJobs, redirecting to login'); + localStorage.removeItem('token'); + window.location.href = '/login'; + } else { + console.error('โŒ Error fetching jobs:', response.status, response.statusText); + } + } catch (error) { + console.error('โŒ Error fetching jobs:', error); + } + }; + + const fetchStats = async () => { + try { + console.log('๐Ÿ“Š fetchStats starting...'); + const response = await fetch('/api/queue/stats', { + headers: { 'Authorization': `Bearer ${localStorage.getItem('token')}` } + }); + + if (response.ok) { + const data = await response.json(); + console.log('โœ… fetchStats success, total jobs:', data.total_jobs); + setStats(data); + } else if (response.status === 401) { + console.warn('๐Ÿ” Auth error in fetchStats, redirecting to login'); + localStorage.removeItem('token'); + window.location.href = '/login'; + } else { + console.error('โŒ Error fetching stats:', response.status, response.statusText); + } + } catch (error) { + console.error('โŒ Error fetching stats:', error); + } + }; + + const retryJob = async (jobId: string) => { + try { + const response = await fetch(`/api/queue/jobs/${jobId}/retry`, { + method: 'POST', + headers: { + 'Authorization': `Bearer ${localStorage.getItem('token')}`, + 'Content-Type': 'application/json' + }, + body: JSON.stringify({ force: false }) + }); + + if (response.ok) { + fetchJobs(); + } else if (response.status === 401) { + localStorage.removeItem('token'); + window.location.href = '/login'; + } + } catch (error) { + console.error('Error retrying job:', error); + } + }; + + const cancelJob = async (jobId: string) => { + if (!confirm('Are you sure you want to cancel this job?')) return; + + try { + const response = await fetch(`/api/queue/jobs/${jobId}`, { + method: 'DELETE', + headers: { 'Authorization': `Bearer ${localStorage.getItem('token')}` } + }); + + if (response.ok) { + fetchJobs(); + } else if (response.status === 401) { + localStorage.removeItem('token'); + 
window.location.href = '/login'; + } + } catch (error) { + console.error('Error cancelling job:', error); + } + }; + + const applyFilters = () => { + setPagination(prev => ({ ...prev, offset: 0 })); + fetchJobs(); + }; + + const clearFilters = () => { + setFilters({ status: '', job_type: '', priority: '' }); + setPagination(prev => ({ ...prev, offset: 0 })); + }; + + const nextPage = () => { + if (pagination.has_more) { + setPagination(prev => ({ ...prev, offset: prev.offset + prev.limit })); + } + }; + + const prevPage = () => { + if (pagination.offset > 0) { + setPagination(prev => ({ + ...prev, + offset: Math.max(0, prev.offset - prev.limit) + })); + } + }; + + const getStatusIcon = (status: string) => { + switch (status) { + case 'queued': return ; + case 'processing': return ; + case 'completed': return ; + case 'failed': return ; + case 'cancelled': return ; + case 'retrying': return ; + default: return ; + } + }; + + const getStatusColor = (status: string) => { + switch (status) { + case 'queued': return 'text-yellow-600 bg-yellow-100'; + case 'processing': return 'text-blue-600 bg-blue-100'; + case 'completed': return 'text-green-600 bg-green-100'; + case 'failed': return 'text-red-600 bg-red-100'; + case 'cancelled': return 'text-gray-600 bg-gray-100'; + case 'retrying': return 'text-orange-600 bg-orange-100'; + default: return 'text-gray-600 bg-gray-100'; + } + }; + + const formatJobType = (type: string) => { + const typeMap: { [key: string]: string } = { + 'process_audio_files': 'Audio File Processing', + 'process_single_audio_file': 'Single Audio File', + 'reprocess_transcript': 'Reprocess Transcript', + 'reprocess_memory': 'Reprocess Memory' + }; + return typeMap[type] || type; + }; + + const getJobTypeShort = (type: string) => { + const typeMap: { [key: string]: string } = { + 'process_audio_files': 'Process', + 'process_single_audio_file': 'Process', + 'reprocess_transcript': 'Reprocess', + 'reprocess_memory': 'Memory' + }; + return typeMap[type] || 
type; + }; + + const getJobResult = (job: QueueJob) => { + if (job.status !== 'completed' || !job.result) { + return -; + } + + const result = job.result; + + // Show different results based on job type + if (job.job_type === 'reprocess_transcript') { + const segments = result.transcript_segments || 0; + const speakers = result.speakers_identified || 0; + + return ( +
+
{segments} segments
+ {speakers > 0 && ( +
{speakers} speakers identified
+ )} +
+ ); + } + + if (job.job_type === 'reprocess_memory') { + const memories = result.memory_count || 0; + return ( +
+ {memories} memories +
+ ); + } + + return ( +
+ โœ“ Success +
+ ); + }; + + const flushJobs = async () => { + setFlushing(true); + try { + const endpoint = flushSettings.flush_all ? '/api/queue/flush-all' : '/api/queue/flush'; + const body = flushSettings.flush_all + ? { confirm: true } + : { + older_than_hours: flushSettings.older_than_hours, + statuses: flushSettings.statuses + }; + + const response = await fetch(endpoint, { + method: 'POST', + headers: { + 'Authorization': `Bearer ${localStorage.getItem('token')}`, + 'Content-Type': 'application/json' + }, + body: JSON.stringify(body) + }); + + if (response.ok) { + const result = await response.json(); + alert(`Successfully flushed ${result.total_removed} jobs!`); + setShowFlushModal(false); + fetchData(); // Refresh the data + } else if (response.status === 403) { + alert('Admin access required to flush jobs'); + } else if (response.status === 401) { + localStorage.removeItem('token'); + window.location.href = '/login'; + } else { + const error = await response.json(); + alert(`Error: ${error.detail || 'Failed to flush jobs'}`); + } + } catch (error) { + console.error('Error flushing jobs:', error); + alert('Failed to flush jobs'); + } finally { + setFlushing(false); + } + }; + + const formatDate = (dateString: string) => { + return new Date(dateString).toLocaleString(); + }; + + if (loading) { + return ( +
+
+
+ ); + } + + return ( +
+ {/* Header */} +
+
+ +

Queue Management

+
+
+ + +
+
+ + {/* Stats Cards */} + {stats && ( +
+
+
+ +
+

Total

+

{stats.total_jobs}

+
+
+
+ +
+
+ +
+

Queued

+

{stats.queued_jobs}

+
+
+
+ +
+
+ 0 ? 'animate-pulse' : ''}`} /> +
+

Processing

+

{stats.processing_jobs}

+
+
+
+ +
+
+ +
+

Completed

+

{stats.completed_jobs}

+
+
+
+ +
+
+ +
+

Failed

+

{stats.failed_jobs}

+
+
+
+ +
+
+ +
+

Cancelled

+

{stats.cancelled_jobs}

+
+
+
+ +
+
+ +
+

Retrying

+

{stats.retrying_jobs}

+
+
+
+
+ )} + + {/* Filters */} +
+

Filters

+
+
+ + +
+ +
+ + +
+ +
+ + +
+ +
+ + +
+
+
+ + {/* Jobs Table */} +
+
+

Jobs

+
+ +
+ + + + + + + + + + + + + {jobs.map((job) => ( + + + + + + + + + ))} + +
DateIDTypeStatusResultActions
+ {formatDate(job.created_at)} + +
+ #{job.job_id} +
+
+
{getJobTypeShort(job.job_type)}
+
+ + {getStatusIcon(job.status)} + {job.status.charAt(0).toUpperCase() + job.status.slice(1)} + + + {getJobResult(job)} + + {job.status === 'failed' && ( + + )} + + {(job.status === 'queued' || job.status === 'processing') && ( + + )} +
+
+ + {/* Pagination */} + {pagination.total > pagination.limit && ( +
+
+ Showing {pagination.offset + 1} to {Math.min(pagination.offset + pagination.limit, pagination.total)} of {pagination.total} results +
+
+ + +
+
+ )} +
+ + {/* Job Details Modal */} + {selectedJob && ( +
+
+
+

Job Details

+ +
+ +
+
+
+ +

{selectedJob.job_id}

+
+
+ + + {getStatusIcon(selectedJob.status)} + {selectedJob.status.charAt(0).toUpperCase() + selectedJob.status.slice(1)} + +
+
+ +

{formatJobType(selectedJob.job_type)}

+
+
+ +

{selectedJob.priority}

+
+
+ +

{formatDate(selectedJob.created_at)}

+
+ {selectedJob.completed_at && ( +
+ +

{formatDate(selectedJob.completed_at)}

+
+ )} +
+ + {selectedJob.progress_message && ( +
+ +

{selectedJob.progress_message}

+ {selectedJob.progress_percent !== undefined && ( +
+
+
+ )} +
+ )} + + {selectedJob.error_message && ( +
+ +

{selectedJob.error_message}

+
+ )} + + {selectedJob.data && ( +
+ +
+                    {JSON.stringify(selectedJob.data, null, 2)}
+                  
+
+ )} + + {selectedJob.result && ( +
+ +
+                    {JSON.stringify(selectedJob.result, null, 2)}
+                  
+
+ )} +
+
+
+ )} + + {/* Flush Jobs Modal */} + {showFlushModal && ( +
+
+
+

+ + Flush Jobs +

+ +
+ +
+
+
+ + This will permanently remove jobs from the database +
+
+ +
+
+ + + {!flushSettings.flush_all && ( +
+
+ + +
+ +
+ +
+ {['completed', 'failed', 'cancelled'].map(status => ( + + ))} +
+
+
+ )} +
+ +
+ + + {flushSettings.flush_all && ( +
+
+

+ โš ๏ธ This will remove ALL jobs including queued and processing ones, and reset the job counter! +

+
+
+ )} +
+
+ +
+ + +
+
+
+
+ )} +
+ ); +}; + +export default Queue; \ No newline at end of file diff --git a/backends/advanced/webui/src/services/api.ts b/backends/advanced/webui/src/services/api.ts index 5c9d82f0..afb174eb 100644 --- a/backends/advanced/webui/src/services/api.ts +++ b/backends/advanced/webui/src/services/api.ts @@ -133,6 +133,14 @@ export const systemApi = { reloadMemoryConfig: () => api.post('/api/admin/memory/config/reload'), } +export const queueApi = { /* Client wrappers for the /api/queue endpoints, issued through the shared `api` instance. */ + /* GET the job list; `params` is interpolated as the query string. */ getJobs: (params: URLSearchParams) => api.get(`/api/queue/jobs?${params}`), + /* GET queue statistics. */ getStats: () => api.get('/api/queue/stats'), + /* POST a retry for one job; `force` defaults to false and is sent in the request body. */ retryJob: (jobId: string, force: boolean = false) => + api.post(`/api/queue/jobs/${jobId}/retry`, { force }), + /* Cancelling a job is issued as a DELETE on the job resource. */ cancelJob: (jobId: string) => api.delete(`/api/queue/jobs/${jobId}`), +} + export const uploadApi = { uploadAudioFiles: (files: FormData, onProgress?: (progress: number) => void) => api.post('/api/process-audio-files', files, {