Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions backends/advanced/src/advanced_omi_backend/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
"speech_inactivity_threshold": 60, # Speech gap threshold for closure (1 minute)
}

# Default audio storage settings
DEFAULT_AUDIO_STORAGE_SETTINGS = {
"audio_base_path": "/app/data", # Main audio directory (where volume is mounted)
"audio_chunks_path": "/app/data/audio_chunks", # Full path to audio chunks subfolder
}

# Global cache for diarization settings
_diarization_settings = None

Expand Down Expand Up @@ -140,5 +146,18 @@ def get_conversation_stop_settings():
}


def get_audio_storage_settings():
    """Resolve audio storage paths, preferring environment variables over defaults."""
    base = os.getenv("AUDIO_BASE_PATH", DEFAULT_AUDIO_STORAGE_SETTINGS["audio_base_path"])
    # The chunks folder defaults to a subdirectory of the resolved base,
    # unless explicitly overridden via AUDIO_CHUNKS_PATH.
    chunks = os.getenv("AUDIO_CHUNKS_PATH", f"{base}/audio_chunks")
    return {"audio_base_path": base, "audio_chunks_path": chunks}


# Initialize settings on module load
_diarization_settings = load_diarization_settings_from_file()

Large diffs are not rendered by default.

44 changes: 44 additions & 0 deletions backends/advanced/src/advanced_omi_backend/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,50 @@ async def activate_transcript_version(self, conversation_id: str, version_id: st
logger.info(f"Activated transcript version {version_id} for conversation {conversation_id}")
return result.modified_count > 0

async def update_transcript_version(
    self,
    conversation_id: str,
    version_id: str,
    transcript: str | None = None,
    segments: list | None = None,
    processing_time_seconds: float | None = None,
    provider: str | None = None,
    model: str | None = None,
) -> bool:
    """Update a specific transcript version with processing results.

    Targets the matching element of the ``transcript_versions`` array via
    the positional ``$`` operator. Arguments left as ``None`` are skipped.

    Returns:
        True when a matching document was modified; False when nothing was
        supplied to update or no document matched.
    """
    update_fields = {}

    if transcript is not None:
        update_fields["transcript_versions.$.transcript"] = transcript
    if segments is not None:
        update_fields["transcript_versions.$.segments"] = segments
    if processing_time_seconds is not None:
        update_fields["transcript_versions.$.processing_time_seconds"] = processing_time_seconds
    if provider is not None:
        update_fields["transcript_versions.$.provider"] = provider
    if model is not None:
        update_fields["transcript_versions.$.model"] = model

    # Guard BEFORE stamping completion metadata; previously this check sat
    # after the unconditional fields below and therefore could never fire.
    if not update_fields:
        return False

    # Always stamp completion metadata alongside the payload.
    update_fields["transcript_versions.$.completed_at"] = datetime.now(UTC).isoformat()
    # NOTE(review): other status values in this codebase may use uppercase —
    # confirm the expected casing before relying on "completed" here.
    update_fields["transcript_versions.$.status"] = "completed"

    result = await self.col.update_one(
        {
            "conversation_id": conversation_id,
            "transcript_versions.version_id": version_id,
        },
        {"$set": update_fields},
    )

    if result.modified_count > 0:
        logger.info(f"Updated transcript version {version_id} for conversation {conversation_id}")
        return True
    return False

Comment on lines +709 to +752
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Type hints should be Optional[...] and status casing should be consistent.

  • PEP 484: parameters with = None must be Optional[T].
  • Elsewhere statuses are uppercase; here you set "completed".
-    async def update_transcript_version(
-        self, 
-        conversation_id: str, 
-        version_id: str, 
-        transcript: str = None, 
-        segments: list = None, 
-        processing_time_seconds: float = None,
-        provider: str = None,
-        model: str = None
-    ) -> bool:
+    async def update_transcript_version(
+        self,
+        conversation_id: str,
+        version_id: str,
+        transcript: Optional[str] = None,
+        segments: Optional[list] = None,
+        processing_time_seconds: Optional[float] = None,
+        provider: Optional[str] = None,
+        model: Optional[str] = None,
+    ) -> bool:
@@
-        update_fields["transcript_versions.$.completed_at"] = datetime.now(UTC).isoformat()
-        update_fields["transcript_versions.$.status"] = "completed"
+        update_fields["transcript_versions.$.completed_at"] = datetime.now(UTC).isoformat()
+        update_fields["transcript_versions.$.status"] = "COMPLETED"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async def update_transcript_version(
self,
conversation_id: str,
version_id: str,
transcript: str = None,
segments: list = None,
processing_time_seconds: float = None,
provider: str = None,
model: str = None
) -> bool:
"""Update a specific transcript version with processing results."""
update_fields = {}
if transcript is not None:
update_fields["transcript_versions.$.transcript"] = transcript
if segments is not None:
update_fields["transcript_versions.$.segments"] = segments
if processing_time_seconds is not None:
update_fields["transcript_versions.$.processing_time_seconds"] = processing_time_seconds
if provider is not None:
update_fields["transcript_versions.$.provider"] = provider
if model is not None:
update_fields["transcript_versions.$.model"] = model
# Always update the completion timestamp
update_fields["transcript_versions.$.completed_at"] = datetime.now(UTC).isoformat()
update_fields["transcript_versions.$.status"] = "completed"
if not update_fields:
return False
result = await self.col.update_one(
{
"conversation_id": conversation_id,
"transcript_versions.version_id": version_id
},
{"$set": update_fields}
)
if result.modified_count > 0:
logger.info(f"Updated transcript version {version_id} for conversation {conversation_id}")
return True
return False
async def update_transcript_version(
self,
conversation_id: str,
version_id: str,
transcript: Optional[str] = None,
segments: Optional[list] = None,
processing_time_seconds: Optional[float] = None,
provider: Optional[str] = None,
model: Optional[str] = None,
) -> bool:
"""Update a specific transcript version with processing results."""
update_fields = {}
if transcript is not None:
update_fields["transcript_versions.$.transcript"] = transcript
if segments is not None:
update_fields["transcript_versions.$.segments"] = segments
if processing_time_seconds is not None:
update_fields["transcript_versions.$.processing_time_seconds"] = processing_time_seconds
if provider is not None:
update_fields["transcript_versions.$.provider"] = provider
if model is not None:
update_fields["transcript_versions.$.model"] = model
# Always update the completion timestamp
update_fields["transcript_versions.$.completed_at"] = datetime.now(UTC).isoformat()
update_fields["transcript_versions.$.status"] = "COMPLETED"
if not update_fields:
return False
result = await self.col.update_one(
{
"conversation_id": conversation_id,
"transcript_versions.version_id": version_id
},
{"$set": update_fields}
)
if result.modified_count > 0:
logger.info(f"Updated transcript version {version_id} for conversation {conversation_id}")
return True
return False
🧰 Tools
🪛 Ruff (0.13.1)

745-745: PEP 484 prohibits implicit Optional

Convert to Optional[T]

(RUF013)


746-746: PEP 484 prohibits implicit Optional

Convert to Optional[T]

(RUF013)


747-747: PEP 484 prohibits implicit Optional

Convert to Optional[T]

(RUF013)


748-748: PEP 484 prohibits implicit Optional

Convert to Optional[T]

(RUF013)


749-749: PEP 484 prohibits implicit Optional

Convert to Optional[T]

(RUF013)

async def activate_memory_version(self, conversation_id: str, version_id: str) -> bool:
"""Activate a specific memory version in conversation."""
# First verify the version exists
Expand Down
30 changes: 30 additions & 0 deletions backends/advanced/src/advanced_omi_backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@
get_processor_manager,
init_processor_manager,
)

from advanced_omi_backend.simple_queue import get_simple_queue

from advanced_omi_backend.audio_utils import process_audio_chunk
from advanced_omi_backend.task_manager import init_task_manager, get_task_manager
from advanced_omi_backend.transcript_coordinator import get_transcript_coordinator
Expand Down Expand Up @@ -320,6 +323,25 @@ async def lifespan(app: FastAPI):
processor_manager = init_processor_manager(CHUNK_DIR, ac_repository)
await processor_manager.start()

application_logger.info("Application-level processors started")

# Initialize simple queue system
try:
queue = await get_simple_queue()
application_logger.info("Simple queue system started")
except Exception as e:
application_logger.error(f"Failed to start simple queue: {e}")
# Don't raise as queue system is not critical for basic operation

# Skip memory service pre-initialization to avoid blocking FastAPI startup
Comment on lines +328 to +336
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix potential UnboundLocalError for queue on shutdown

If get_simple_queue() throws, queue is undefined but referenced in finally. Store it on app.state and initialize to None.

Apply this diff:

-    # Initialize simple queue system
-    try:
-        queue = await get_simple_queue()
-        application_logger.info("Simple queue system started")
-    except Exception as e:
-        application_logger.error(f"Failed to start simple queue: {e}")
+    # Initialize simple queue system
+    app.state.simple_queue = None
+    try:
+        app.state.simple_queue = await get_simple_queue()
+        application_logger.info("Simple queue system started")
+    except Exception:
+        application_logger.exception("Failed to start simple queue")
         # Don't raise as queue system is not critical for basic operation
...
-        # Shutdown simple queue system
-        try:
-            if queue:
-                await queue.stop_worker()
-                application_logger.info("Simple queue system shut down")
-        except Exception as e:
-            application_logger.error(f"Error shutting down simple queue: {e}")
+        # Shutdown simple queue system
+        try:
+            q = getattr(app.state, "simple_queue", None)
+            if q:
+                await q.stop_worker()
+                application_logger.info("Simple queue system shut down")
+        except Exception:
+            application_logger.exception("Error shutting down simple queue")

Also applies to: 356-363

🧰 Tools
🪛 Ruff (0.13.1)

332-332: Do not catch blind exception: Exception

(BLE001)


333-333: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/main.py around lines 328-336 and
also 356-363, if get_simple_queue() raises an exception the local variable
`queue` can remain undefined and later referenced in the shutdown/finally block
causing UnboundLocalError; fix by setting a default (e.g., app.state.queue =
None) before the try, assign the created queue to app.state.queue inside the
try, update any shutdown/finally references to use app.state.queue (checking for
None before closing/awaiting), and ensure app.state.queue is initialized on app
creation so it always exists even when queue startup fails.

# Memory service will be lazily initialized when first used
application_logger.info("Memory service will be initialized on first use (lazy loading)")

# SystemTracker is used for monitoring and debugging
application_logger.info("Using SystemTracker for monitoring and debugging")

application_logger.info("Application ready - using application-level processing architecture.")

logger.info("App ready")
try:
yield
Expand All @@ -331,6 +353,14 @@ async def lifespan(app: FastAPI):
for client_id in client_manager.get_all_client_ids():
await cleanup_client_state(client_id)

# Shutdown simple queue system
try:
if queue:
await queue.stop_worker()
application_logger.info("Simple queue system shut down")
except Exception as e:
application_logger.error(f"Error shutting down simple queue: {e}")

# Shutdown processor manager
processor_manager = get_processor_manager()
await processor_manager.shutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
client_router,
conversation_router,
memory_router,
queue_router,
system_router,
user_router,
)
Expand All @@ -31,6 +32,7 @@
router.include_router(conversation_router)
router.include_router(memory_router)
router.include_router(system_router)
router.include_router(queue_router)


logger.info("API router initialized with all sub-modules")
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- conversation_routes: Conversation CRUD and audio processing
- memory_routes: Memory management, search, and debug
- system_routes: System utilities, metrics, and file processing
- queue_routes: Job queue management and monitoring
"""

from .chat_routes import router as chat_router
Expand All @@ -16,5 +17,6 @@
from .memory_routes import router as memory_router
from .system_routes import router as system_router
from .user_routes import router as user_router
from .queue_routes import router as queue_router

__all__ = ["user_router", "chat_router", "client_router", "conversation_router", "memory_router", "system_router"]
__all__ = ["user_router", "chat_router", "client_router", "conversation_router", "memory_router", "system_router", "queue_router"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
"""
Simple queue API routes for job monitoring.
Provides basic endpoints for viewing job status and statistics.
"""

import logging
from fastapi import APIRouter, Depends, Query, HTTPException
from pydantic import BaseModel
from typing import List, Optional

from advanced_omi_backend.auth import current_active_user
from advanced_omi_backend.simple_queue import get_simple_queue
from advanced_omi_backend.users import User

# Module-level logger for the queue route handlers.
logger = logging.getLogger(__name__)
# All endpoints below are mounted under the /queue prefix by the API package.
router = APIRouter(prefix="/queue", tags=["queue"])


@router.get("/jobs")
async def list_jobs(
limit: int = Query(20, ge=1, le=100, description="Number of jobs to return"),
offset: int = Query(0, ge=0, description="Number of jobs to skip"),
status: str = Query(None, description="Filter by job status"),
job_type: str = Query(None, description="Filter by job type"),
priority: str = Query(None, description="Filter by job priority"),
current_user: User = Depends(current_active_user)
):
"""List jobs with pagination and filtering."""
try:
# Build filters dict
filters = {}
if status:
filters["status"] = status
if job_type:
filters["job_type"] = job_type
if priority:
filters["priority"] = priority

queue = await get_simple_queue()
result = await queue.get_jobs(limit=limit, offset=offset, filters=filters)

Comment on lines +31 to +41
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Apply user scoping at the data source, not post-filtering.

Post-filtering breaks pagination totals/has_more. Pass user_id to the queue layer for non-admins.

-        queue = await get_simple_queue()
-        result = await queue.get_jobs(limit=limit, offset=offset, filters=filters)
-        
-            # Filter jobs by user if not admin
-        if not current_user.is_superuser:
-            result["jobs"] = [
-                job for job in result["jobs"]
-                if job["user_id"] == str(current_user.user_id)
-            ]
-            result["pagination"]["total"] = len(result["jobs"])
+        queue = await get_simple_queue()
+        if not current_user.is_superuser:
+            filters["user_id"] = str(current_user.user_id)
+        result = await queue.get_jobs(limit=limit, offset=offset, filters=filters)

Also applies to: 43-49

# Filter jobs by user if not admin
if not current_user.is_superuser:
result["jobs"] = [
job for job in result["jobs"]
if job["user_id"] == str(current_user.user_id)
]
result["pagination"]["total"] = len(result["jobs"])

return result

except Exception as e:
logger.error(f"Failed to list jobs: {e}")
return {"error": "Failed to list jobs", "jobs": [], "pagination": {"total": 0, "limit": limit, "offset": offset, "has_more": False}}


@router.get("/stats")
async def get_queue_stats(
current_user: User = Depends(current_active_user)
):
"""Get queue statistics."""
try:
queue = await get_simple_queue()
stats = await queue.get_job_stats()
return stats

except Exception as e:
logger.error(f"Failed to get queue stats: {e}")
return {"queued": 0, "processing": 0, "completed": 0, "failed": 0}


@router.get("/health")
async def get_queue_health():
"""Get queue system health status."""
try:
queue = await get_simple_queue()

return {
"status": "healthy" if queue.running else "stopped",
"worker_running": queue.running,
"message": "Simple queue is operational" if queue.running else "Simple queue worker not running"
}

except Exception as e:
logger.error(f"Failed to get queue health: {e}")
return {
"status": "unhealthy",
"message": f"Health check failed: {str(e)}"
}


class FlushJobsRequest(BaseModel):
    """Request body for POST /queue/flush (selective cleanup of old jobs)."""

    # Only jobs older than this many hours are flushed (default: one day).
    older_than_hours: int = 24
    # Optional list of statuses to flush; None defers to the queue layer's default.
    statuses: Optional[List[str]] = None


class FlushAllJobsRequest(BaseModel):
    """Request body for POST /queue/flush-all (destructive, admin-only)."""

    # Safety interlock: must be explicitly set to True by the caller.
    confirm: bool = False


@router.post("/flush")
async def flush_inactive_jobs(
request: FlushJobsRequest,
current_user: User = Depends(current_active_user)
):
"""Flush inactive jobs from the database (admin only)."""
if not current_user.is_superuser:
raise HTTPException(status_code=403, detail="Admin access required")

try:
queue = await get_simple_queue()
result = await queue.flush_inactive_jobs(
older_than_hours=request.older_than_hours,
statuses=request.statuses
)
return result

except Exception as e:
logger.error(f"Failed to flush inactive jobs: {e}")
raise HTTPException(status_code=500, detail=f"Failed to flush jobs: {str(e)}")


@router.post("/flush-all")
async def flush_all_jobs(
request: FlushAllJobsRequest,
current_user: User = Depends(current_active_user)
):
"""Flush ALL jobs from the database (admin only). USE WITH EXTREME CAUTION!"""
if not current_user.is_superuser:
raise HTTPException(status_code=403, detail="Admin access required")

try:
if not request.confirm:
raise HTTPException(
status_code=400,
detail="Must set confirm=true to flush all jobs. This is a destructive operation."
)

queue = await get_simple_queue()
result = await queue.flush_all_jobs(confirm=request.confirm)
return result

except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Failed to flush all jobs: {e}")
raise HTTPException(status_code=500, detail=f"Failed to flush all jobs: {str(e)}")
Loading
Loading