-
Notifications
You must be signed in to change notification settings - Fork 25
Added queue management page and linked reprocess transcription #117
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -48,6 +48,9 @@ | |
| get_processor_manager, | ||
| init_processor_manager, | ||
| ) | ||
|
|
||
| from advanced_omi_backend.simple_queue import get_simple_queue | ||
|
|
||
| from advanced_omi_backend.audio_utils import process_audio_chunk | ||
| from advanced_omi_backend.task_manager import init_task_manager, get_task_manager | ||
| from advanced_omi_backend.transcript_coordinator import get_transcript_coordinator | ||
|
|
@@ -320,6 +323,25 @@ async def lifespan(app: FastAPI): | |
| processor_manager = init_processor_manager(CHUNK_DIR, ac_repository) | ||
| await processor_manager.start() | ||
|
|
||
| application_logger.info("Application-level processors started") | ||
|
|
||
| # Initialize simple queue system | ||
| try: | ||
| queue = await get_simple_queue() | ||
| application_logger.info("Simple queue system started") | ||
| except Exception as e: | ||
| application_logger.error(f"Failed to start simple queue: {e}") | ||
| # Don't raise as queue system is not critical for basic operation | ||
|
|
||
| # Skip memory service pre-initialization to avoid blocking FastAPI startup | ||
|
Comment on lines
+328
to
+336
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fix potential UnboundLocalError for `queue`: if get_simple_queue() throws, `queue` is never assigned, so the shutdown path would raise. Apply this diff: - # Initialize simple queue system
- try:
- queue = await get_simple_queue()
- application_logger.info("Simple queue system started")
- except Exception as e:
- application_logger.error(f"Failed to start simple queue: {e}")
+ # Initialize simple queue system
+ app.state.simple_queue = None
+ try:
+ app.state.simple_queue = await get_simple_queue()
+ application_logger.info("Simple queue system started")
+ except Exception:
+ application_logger.exception("Failed to start simple queue")
# Don't raise as queue system is not critical for basic operation
...
- # Shutdown simple queue system
- try:
- if queue:
- await queue.stop_worker()
- application_logger.info("Simple queue system shut down")
- except Exception as e:
- application_logger.error(f"Error shutting down simple queue: {e}")
+ # Shutdown simple queue system
+ try:
+ q = getattr(app.state, "simple_queue", None)
+ if q:
+ await q.stop_worker()
+ application_logger.info("Simple queue system shut down")
+ except Exception:
+ application_logger.exception("Error shutting down simple queue")Also applies to: 356-363 🧰 Tools🪛 Ruff (0.13.1)332-332: Do not catch blind exception: (BLE001) 333-333: Use Replace with (TRY400) 🤖 Prompt for AI Agents |
||
| # Memory service will be lazily initialized when first used | ||
| application_logger.info("Memory service will be initialized on first use (lazy loading)") | ||
|
|
||
| # SystemTracker is used for monitoring and debugging | ||
| application_logger.info("Using SystemTracker for monitoring and debugging") | ||
|
|
||
| application_logger.info("Application ready - using application-level processing architecture.") | ||
|
|
||
| logger.info("App ready") | ||
| try: | ||
| yield | ||
|
|
@@ -331,6 +353,14 @@ async def lifespan(app: FastAPI): | |
| for client_id in client_manager.get_all_client_ids(): | ||
| await cleanup_client_state(client_id) | ||
|
|
||
| # Shutdown simple queue system | ||
| try: | ||
| if queue: | ||
| await queue.stop_worker() | ||
| application_logger.info("Simple queue system shut down") | ||
| except Exception as e: | ||
| application_logger.error(f"Error shutting down simple queue: {e}") | ||
|
|
||
| # Shutdown processor manager | ||
| processor_manager = get_processor_manager() | ||
| await processor_manager.shutdown() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,147 @@ | ||
| """ | ||
| Simple queue API routes for job monitoring. | ||
| Provides basic endpoints for viewing job status and statistics. | ||
| """ | ||
|
|
||
| import logging | ||
| from fastapi import APIRouter, Depends, Query, HTTPException | ||
| from pydantic import BaseModel | ||
| from typing import List, Optional | ||
|
|
||
| from advanced_omi_backend.auth import current_active_user | ||
| from advanced_omi_backend.simple_queue import get_simple_queue | ||
| from advanced_omi_backend.users import User | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
| router = APIRouter(prefix="/queue", tags=["queue"]) | ||
|
|
||
|
|
||
| @router.get("/jobs") | ||
| async def list_jobs( | ||
| limit: int = Query(20, ge=1, le=100, description="Number of jobs to return"), | ||
| offset: int = Query(0, ge=0, description="Number of jobs to skip"), | ||
| status: str = Query(None, description="Filter by job status"), | ||
| job_type: str = Query(None, description="Filter by job type"), | ||
| priority: str = Query(None, description="Filter by job priority"), | ||
| current_user: User = Depends(current_active_user) | ||
| ): | ||
| """List jobs with pagination and filtering.""" | ||
| try: | ||
| # Build filters dict | ||
| filters = {} | ||
| if status: | ||
| filters["status"] = status | ||
| if job_type: | ||
| filters["job_type"] = job_type | ||
| if priority: | ||
| filters["priority"] = priority | ||
|
|
||
| queue = await get_simple_queue() | ||
| result = await queue.get_jobs(limit=limit, offset=offset, filters=filters) | ||
|
|
||
|
Comment on lines
+31
to
+41
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Apply user scoping at the data source, not by post-filtering. Post-filtering breaks the pagination totals and has_more flag. Pass user_id as a filter instead: - queue = await get_simple_queue()
- result = await queue.get_jobs(limit=limit, offset=offset, filters=filters)
-
- # Filter jobs by user if not admin
- if not current_user.is_superuser:
- result["jobs"] = [
- job for job in result["jobs"]
- if job["user_id"] == str(current_user.user_id)
- ]
- result["pagination"]["total"] = len(result["jobs"])
+ queue = await get_simple_queue()
+ if not current_user.is_superuser:
+ filters["user_id"] = str(current_user.user_id)
+ result = await queue.get_jobs(limit=limit, offset=offset, filters=filters)Also applies to: 43-49 |
||
| # Filter jobs by user if not admin | ||
| if not current_user.is_superuser: | ||
| result["jobs"] = [ | ||
| job for job in result["jobs"] | ||
| if job["user_id"] == str(current_user.user_id) | ||
| ] | ||
| result["pagination"]["total"] = len(result["jobs"]) | ||
|
|
||
| return result | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Failed to list jobs: {e}") | ||
| return {"error": "Failed to list jobs", "jobs": [], "pagination": {"total": 0, "limit": limit, "offset": offset, "has_more": False}} | ||
|
|
||
|
|
||
| @router.get("/stats") | ||
| async def get_queue_stats( | ||
| current_user: User = Depends(current_active_user) | ||
| ): | ||
| """Get queue statistics.""" | ||
| try: | ||
| queue = await get_simple_queue() | ||
| stats = await queue.get_job_stats() | ||
| return stats | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Failed to get queue stats: {e}") | ||
| return {"queued": 0, "processing": 0, "completed": 0, "failed": 0} | ||
|
|
||
|
|
||
| @router.get("/health") | ||
| async def get_queue_health(): | ||
| """Get queue system health status.""" | ||
| try: | ||
| queue = await get_simple_queue() | ||
|
|
||
| return { | ||
| "status": "healthy" if queue.running else "stopped", | ||
| "worker_running": queue.running, | ||
| "message": "Simple queue is operational" if queue.running else "Simple queue worker not running" | ||
| } | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Failed to get queue health: {e}") | ||
| return { | ||
| "status": "unhealthy", | ||
| "message": f"Health check failed: {str(e)}" | ||
| } | ||
|
|
||
|
|
||
| class FlushJobsRequest(BaseModel): | ||
| older_than_hours: int = 24 | ||
| statuses: Optional[List[str]] = None | ||
|
|
||
|
|
||
| class FlushAllJobsRequest(BaseModel): | ||
| confirm: bool = False | ||
|
|
||
|
|
||
| @router.post("/flush") | ||
| async def flush_inactive_jobs( | ||
| request: FlushJobsRequest, | ||
| current_user: User = Depends(current_active_user) | ||
| ): | ||
| """Flush inactive jobs from the database (admin only).""" | ||
| if not current_user.is_superuser: | ||
| raise HTTPException(status_code=403, detail="Admin access required") | ||
|
|
||
| try: | ||
| queue = await get_simple_queue() | ||
| result = await queue.flush_inactive_jobs( | ||
| older_than_hours=request.older_than_hours, | ||
| statuses=request.statuses | ||
| ) | ||
| return result | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Failed to flush inactive jobs: {e}") | ||
| raise HTTPException(status_code=500, detail=f"Failed to flush jobs: {str(e)}") | ||
|
|
||
|
|
||
| @router.post("/flush-all") | ||
| async def flush_all_jobs( | ||
| request: FlushAllJobsRequest, | ||
| current_user: User = Depends(current_active_user) | ||
| ): | ||
| """Flush ALL jobs from the database (admin only). USE WITH EXTREME CAUTION!""" | ||
| if not current_user.is_superuser: | ||
| raise HTTPException(status_code=403, detail="Admin access required") | ||
|
|
||
| try: | ||
| if not request.confirm: | ||
| raise HTTPException( | ||
| status_code=400, | ||
| detail="Must set confirm=true to flush all jobs. This is a destructive operation." | ||
| ) | ||
|
|
||
| queue = await get_simple_queue() | ||
| result = await queue.flush_all_jobs(confirm=request.confirm) | ||
| return result | ||
|
|
||
| except ValueError as e: | ||
| raise HTTPException(status_code=400, detail=str(e)) | ||
| except Exception as e: | ||
| logger.error(f"Failed to flush all jobs: {e}") | ||
| raise HTTPException(status_code=500, detail=f"Failed to flush all jobs: {str(e)}") | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Type hints should be Optional[...] and status casing should be consistent.
Parameters defaulting to `= None` must be annotated `Optional[T]`, and status strings should use consistent lowercase casing (e.g. "completed"). 📝 Committable suggestion
🧰 Tools
🪛 Ruff (0.13.1)
745-745: PEP 484 prohibits implicit
OptionalConvert to
Optional[T](RUF013)
746-746: PEP 484 prohibits implicit
OptionalConvert to
Optional[T](RUF013)
747-747: PEP 484 prohibits implicit
OptionalConvert to
Optional[T](RUF013)
748-748: PEP 484 prohibits implicit
OptionalConvert to
Optional[T](RUF013)
749-749: PEP 484 prohibits implicit
OptionalConvert to
Optional[T](RUF013)