Conversation
- Updated the `CLAUDE.md` documentation to reflect changes in the ASR service command.
- Introduced a new method `load_audio_file_as_chunk` in `audio_utils.py` for loading audio files into the Wyoming AudioChunk format.
- Enhanced `ProcessorManager` to include client type detection and improved cleanup of processing tasks.
- Updated `conversation_controller.py` to queue transcription and memory processing jobs with better error handling.
- Refactored the `Upload` component in the web UI to support a three-phase upload process with improved status management and polling for processing tasks.
- Added new API methods for asynchronous file uploads and job status retrieval.
**Review skipped**

Auto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI.

**Walkthrough**

Adds processor monitoring across the backend and web UI: new admin API endpoints (overview, history, client detail), ProcessorManager analytics methods (pipeline stats, history, queue health), routes wiring, a new Processes page with health cards, pipeline view, active tasks, history list, client detail modal, and API client methods. The System page drops its processor/clients blocks.
**Sequence Diagram(s)**

```mermaid
sequenceDiagram
    autonumber
    actor Admin
    participant WebUI as Web UI (Processes)
    participant API as Backend Router (/processor/…)
    participant Ctrl as System Controller
    participant PM as ProcessorManager
    participant TM as TaskManager
    participant CM as ClientManager
    Admin->>WebUI: Open /processes (admin)
    WebUI->>API: GET /processor/overview
    API->>Ctrl: get_processor_overview()
    Ctrl->>PM: get_pipeline_statistics()
    Ctrl->>PM: get_queue_health_status()
    Ctrl->>TM: health/status
    Ctrl-->>WebUI: JSON {pipeline_stats, system_health, queue_health, recent_activity}
    WebUI->>API: GET /processor/history?page=&per_page=
    API->>Ctrl: get_processor_history(page, per_page)
    Ctrl->>PM: get_processing_history(limit=1000)
    Ctrl-->>WebUI: JSON {history, pagination}
    Admin->>WebUI: View client details
    WebUI->>API: GET /processor/clients/{client_id}
    API->>Ctrl: get_client_processing_detail(client_id)
    Ctrl->>PM: processing_status for client
    Ctrl->>TM: client_tasks
    Ctrl->>CM: get client info
    Ctrl-->>WebUI: JSON {client_info, processing_status, active_tasks}
```
```mermaid
sequenceDiagram
    autonumber
    actor Admin
    participant Page as Processes Page
    participant Widgets as Dashboard Widgets
    Admin->>Page: Load page
    loop Auto-refresh (every 5s when enabled)
        Page->>Page: fetch overview + history
        Page->>Widgets: update SystemHealthCards, ProcessPipelineView, ActiveTasksTable, ProcessingHistory
    end
    Admin->>Widgets: Select client in ActiveTasksTable
    Widgets->>Page: set selectedClientId
    Page->>Widgets: Open ClientDetailModal
    Widgets->>Page: Refresh/Close interactions
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
**Pre-merge checks and finishing touches**

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
@coderabbitai review
✅ Actions performed: Review triggered.
Actionable comments posted: 3
🧹 Nitpick comments (19)
**backends/advanced/webui/src/components/layout/Layout.tsx (1)**

`2-2`: **Remove unused import.** The `Activity` icon is imported but only used in the navigation items array, which is fine. However, you should verify that all imported icons are actually being used. Also applies to: 19-19.
**backends/advanced/webui/src/pages/System.tsx (1)**

`147-149`: **Remove unused `formatDate` function.** The `formatDate` function is declared but never used in the component.

```diff
-  const formatDate = (dateString: string) => {
-    return new Date(dateString).toLocaleString()
-  }
```

**backends/advanced/webui/src/components/processes/SystemHealthCards.tsx (1)**
`33-35`: **Consider extracting the error rate threshold to a constant.** The error rate threshold of 0.1 (10%) is hardcoded in multiple places. Consider extracting it to a named constant for better maintainability.

```diff
+const ERROR_RATE_THRESHOLD = 0.1 // 10% error rate threshold
+
 export default function SystemHealthCards({ data }: SystemHealthCardsProps) {
   const cards = [
     // ...
     {
       title: 'Error Rate',
       value: `${(data.error_rate * 100).toFixed(1)}%`,
-      icon: data.error_rate > 0.1 ? AlertTriangle : CheckCircle,
-      color: data.error_rate > 0.1 ? 'red' : 'green',
+      icon: data.error_rate > ERROR_RATE_THRESHOLD ? AlertTriangle : CheckCircle,
+      color: data.error_rate > ERROR_RATE_THRESHOLD ? 'red' : 'green',
       description: 'Recent processing error rate'
     },
```

Also update line 123:

```diff
-  data.error_rate > 0.1 ? 'bg-red-500' : 'bg-green-500'
+  data.error_rate > ERROR_RATE_THRESHOLD ? 'bg-red-500' : 'bg-green-500'
```

**backends/advanced/webui/src/pages/Processes.tsx (4)**
`2-2`: **Remove unused imports.** Several icons are imported but never used in the component.

```diff
-import { Activity, RefreshCw, Users, Clock, BarChart3 } from 'lucide-react'
+import { Activity, RefreshCw } from 'lucide-react'
```
`48-67`: **Remove unused interface.** The `ClientProcessingDetail` interface is defined but never used.

```diff
-interface ClientProcessingDetail {
-  client_id: string
-  client_info: {
-    user_id: string
-    user_email: string
-    current_audio_uuid?: string
-    conversation_start_time?: string
-    sample_rate?: number
-  }
-  processing_status: any
-  active_tasks: Array<{
-    task_id: string
-    task_name: string
-    task_type: string
-    created_at: string
-    completed_at?: string
-    error?: string
-    cancelled: boolean
-  }>
-}
```
`96-105`: **Consider adding cleanup on unmount.** The auto-refresh interval should also be cleared when the component unmounts, not just when dependencies change.

```diff
 useEffect(() => {
+  if (!isAdmin) return
   if (!autoRefresh) return

   const interval = setInterval(() => {
     loadProcessorOverview()
   }, 5000) // Refresh every 5 seconds

   return () => clearInterval(interval)
 }, [autoRefresh, isAdmin])
```
`79-94`: **Consider adding more specific error handling.** The error handling could differentiate between network errors and authorization issues for better user feedback.

```diff
 } catch (err: any) {
-  setError(err.message || 'Failed to load processor overview')
+  if (err.response?.status === 403) {
+    setError('Insufficient permissions to view processor data')
+  } else if (err.code === 'ECONNABORTED') {
+    setError('Request timeout - processor data may be loading')
+  } else {
+    setError(err.message || 'Failed to load processor overview')
+  }
 } finally {
```
**backends/advanced/webui/src/components/processes/ProcessPipelineView.tsx (2)**

`192-192`: **Fix: potential division by zero when the pipeline has no stages.** The calculation assumes `pipelineStats` has at least one stage, but dividing by `Object.keys(pipelineStats).length` could result in division by zero if the object is empty. Apply this fix to handle empty pipeline stats:

```diff
-  {Math.round(Object.values(pipelineStats).reduce((sum, stage) => sum + stage.success_rate, 0) / Object.keys(pipelineStats).length * 100)}%
+  {Object.keys(pipelineStats).length > 0
+    ? Math.round(Object.values(pipelineStats).reduce((sum, stage) => sum + stage.success_rate, 0) / Object.keys(pipelineStats).length * 100)
+    : 0}%
```
`198-198`: **Consider formatting throughput values for better readability.** Large throughput values would benefit from formatting (e.g., "1.2K/min" instead of "1200"). Consider adding a formatting helper:

```diff
+const formatThroughput = (value: number) => {
+  if (value >= 1000) return `${(value / 1000).toFixed(1)}K`
+  return value.toString()
+}

 // In the JSX:
-  {Object.values(pipelineStats).reduce((sum, stage) => sum + stage.throughput_per_minute, 0)}
+  {formatThroughput(Object.values(pipelineStats).reduce((sum, stage) => sum + stage.throughput_per_minute, 0))}
```

**backends/advanced/webui/src/components/processes/ProcessingHistory.tsx (1)**
`38-42`: **Consider using a more specific error type.** Using `any` for error catching reduces type safety. Consider defining a specific error type or using `Error`.

```diff
-    } catch (err: any) {
-      setError(err.message || 'Failed to load processing history')
+    } catch (err) {
+      const message = err instanceof Error ? err.message : 'Failed to load processing history'
+      setError(message)
```

**backends/advanced/src/advanced_omi_backend/processors.py (2)**
`466-466`: **Remove unused loop variable.** Static analysis correctly identified that `client_id` is not used within the loop body.

```diff
-        for client_id, state in self.processing_state.items():
+        for _, state in self.processing_state.items():
```

`522-524`: **Follow logging best practices for exception handling.** Static analysis correctly suggests using `logging.exception` for better error tracking.

```diff
         except Exception as e:
-            logger.error(f"Error getting processing history: {e}")
+            logger.exception("Error getting processing history")
             return []
```

**backends/advanced/webui/src/components/processes/ActiveTasksTable.tsx (2)**
`44-48`: **Handle API errors more gracefully.** Using the `any` type and generic error handling reduces type safety and debugging capability.

```diff
-    } catch (err: any) {
-      setError(err.message || 'Failed to load active tasks')
+    } catch (err) {
+      console.error('Failed to load active tasks:', err)
+      const message = err instanceof Error ? err.message : 'Failed to load active tasks'
+      setError(message)
```

`121-125`: **Incorrect sort comparison logic.** The sort comparison always returns 1 or -1 but never 0 for equal values, which can cause unstable sorting.

```diff
     if (sortDirection === 'asc') {
-      return aValue > bValue ? 1 : -1
+      return aValue > bValue ? 1 : aValue < bValue ? -1 : 0
     } else {
-      return aValue < bValue ? 1 : -1
+      return aValue < bValue ? 1 : aValue > bValue ? -1 : 0
     }
```

**backends/advanced/webui/src/components/processes/ClientDetailModal.tsx (3)**
`50-54`: **Use a consistent error handling pattern.** As in other components, avoid using the `any` type for errors.

```diff
-    } catch (err: any) {
-      setError(err.message || 'Failed to load client details')
+    } catch (err) {
+      const message = err instanceof Error ? err.message : 'Failed to load client details'
+      setError(message)
```

`57-59`: **Missing cleanup for useEffect.** The effect should handle cleanup to prevent setting state on unmounted components.

```diff
 useEffect(() => {
+  let isMounted = true
+
+  const load = async () => {
+    if (isMounted) {
+      await loadClientDetail()
+    }
+  }
+
-  loadClientDetail()
+  load()
+
+  return () => {
+    isMounted = false
+  }
 }, [clientId])
```

`61-63`: **Consider memoizing the `formatTime` function.** This function is recreated on every render but doesn't depend on any state or props.

```tsx
import { useCallback } from 'react'

// Inside the component:
const formatTime = useCallback((timestamp: string) => {
  return new Date(timestamp).toLocaleString()
}, [])
```

**backends/advanced/src/advanced_omi_backend/controllers/system_controller.py (2)**
`1142-1156`: **Guard missing memory service and log with stack traces.**

- If `get_memory_service()` returns None, this will raise on `await`; return 503 instead.
- Use `logger.exception` to retain stack traces.
- Prefer the explicit conversion flag for exceptions.

```diff
 async def delete_all_user_memories(user: User):
     """Delete all memories for the current user."""
     try:
         from advanced_omi_backend.memory import get_memory_service

         memory_service = get_memory_service()
+        if memory_service is None:
+            return JSONResponse(
+                status_code=503,
+                content={"error": "Memory service unavailable"},
+            )

         # Delete all memories for the user
         deleted_count = await memory_service.delete_all_user_memories(user.user_id)

         logger.info(f"Deleted {deleted_count} memories for user {user.user_id}")

         return {
             "message": f"Successfully deleted {deleted_count} memories",
             "deleted_count": deleted_count,
             "user_id": user.user_id,
             "status": "success"
         }
     except Exception as e:
-        logger.error(f"Error deleting all memories for user {user.user_id}: {e}")
+        logger.exception("Error deleting all memories for user %s", user.user_id)
         return JSONResponse(
-            status_code=500, content={"error": f"Failed to delete memories: {str(e)}"}
+            status_code=500, content={"error": f"Failed to delete memories: {e!s}"}
         )
```

`1200-1229`: **Validate page/per_page and improve exception logging.**

- Add FastAPI constraints to prevent negative or zero values and unbounded page sizes.
- Keep logging with stack traces and explicit exception conversion.

```diff
 from fastapi import BackgroundTasks, File, Query, UploadFile
@@
-async def get_processor_history(page: int = 1, per_page: int = 50):
+async def get_processor_history(
+    page: int = Query(1, ge=1),
+    per_page: int = Query(50, ge=1, le=200),
+):
     """Get paginated processing history."""
     try:
         processor_manager = get_processor_manager()

         # Calculate offset
         offset = (page - 1) * per_page

         # Get full history and paginate
         full_history = processor_manager.get_processing_history(limit=1000)  # Get more for pagination
         total_items = len(full_history)

         # Paginate
         paginated_history = full_history[offset:offset + per_page]

         return {
             "history": paginated_history,
             "pagination": {
                 "page": page,
                 "per_page": per_page,
                 "total": total_items,
                 "total_pages": (total_items + per_page - 1) // per_page
             }
         }
     except Exception as e:
-        logger.error(f"Error getting processor history: {e}")
+        logger.exception("Error getting processor history")
         return JSONResponse(
-            status_code=500, content={"error": f"Failed to get processor history: {str(e)}"}
+            status_code=500, content={"error": f"Failed to get processor history: {e!s}"}
         )
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)

- `backends/advanced/src/advanced_omi_backend/controllers/system_controller.py` (1 hunks)
- `backends/advanced/src/advanced_omi_backend/processors.py` (1 hunks)
- `backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py` (1 hunks)
- `backends/advanced/webui/src/App.tsx` (2 hunks)
- `backends/advanced/webui/src/components/layout/Layout.tsx` (2 hunks)
- `backends/advanced/webui/src/components/processes/ActiveTasksTable.tsx` (1 hunks)
- `backends/advanced/webui/src/components/processes/ClientDetailModal.tsx` (1 hunks)
- `backends/advanced/webui/src/components/processes/ProcessPipelineView.tsx` (1 hunks)
- `backends/advanced/webui/src/components/processes/ProcessingHistory.tsx` (1 hunks)
- `backends/advanced/webui/src/components/processes/SystemHealthCards.tsx` (1 hunks)
- `backends/advanced/webui/src/pages/Processes.tsx` (1 hunks)
- `backends/advanced/webui/src/pages/System.tsx` (2 hunks)
- `backends/advanced/webui/src/services/api.ts` (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Integration Tests
backends/advanced/webui/src/pages/Processes.tsx
[error] 2-2: TypeScript: 'Users' is declared but its value is never read.
[error] 2-2: TypeScript: 'Clock' is declared but its value is never read.
[error] 2-2: TypeScript: 'BarChart3' is declared but its value is never read.
[error] 48-48: TypeScript: 'ClientProcessingDetail' is declared but its value is never used.
backends/advanced/webui/src/pages/System.tsx
[error] 147-147: TypeScript: 'formatDate' is declared but its value is never read.
🪛 Ruff (0.13.0)
backends/advanced/src/advanced_omi_backend/controllers/system_controller.py
1150-1155: Consider moving this statement to an else block
(TRY300)
1157-1157: Do not catch blind exception: Exception
(BLE001)
1158-1158: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1160-1160: Use explicit conversion flag
Replace with conversion flag
(RUF010)
1193-1193: Consider moving this statement to an else block
(TRY300)
1194-1194: Do not catch blind exception: Exception
(BLE001)
1195-1195: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1197-1197: Use explicit conversion flag
Replace with conversion flag
(RUF010)
1224-1224: Do not catch blind exception: Exception
(BLE001)
1225-1225: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1227-1227: Use explicit conversion flag
Replace with conversion flag
(RUF010)
1279-1279: Consider moving this statement to an else block
(TRY300)
1280-1280: Do not catch blind exception: Exception
(BLE001)
1281-1281: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1283-1283: Use explicit conversion flag
Replace with conversion flag
(RUF010)
backends/advanced/src/advanced_omi_backend/processors.py
466-466: Loop control variable client_id not used within loop body
Rename unused client_id to _client_id
(B007)
522-522: Do not catch blind exception: Exception
(BLE001)
523-523: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py
172-172: Unused function argument: current_user
(ARG001)
172-172: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
180-180: Unused function argument: current_user
(ARG001)
180-180: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
188-188: Unused function argument: current_user
(ARG001)
188-188: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
🔇 Additional comments (12)
**backends/advanced/webui/src/components/layout/Layout.tsx (1)**

`17-21`: **LGTM! Navigation item properly gated by admin access.** The new Processes navigation item is correctly placed within the admin-only section and follows the existing pattern.
**backends/advanced/webui/src/pages/System.tsx (1)**

`61-65`: **LGTM! Clean removal of processor status fetching.** The removal of processor status and active clients from the System page aligns well with the new dedicated Processes page for monitoring.
**backends/advanced/webui/src/components/processes/SystemHealthCards.tsx (1)**

`106-115`: **LGTM! Clean conditional rendering for task manager health.** The health indicator for the Processing Tasks card is well-implemented with clear visual feedback.
**backends/advanced/webui/src/App.tsx (1)**

`72-76`: **LGTM! Route properly wrapped in error boundary.** The new `/processes` route follows the established pattern with PageErrorBoundary wrapping.

**backends/advanced/webui/src/services/api.ts (2)**
`136-140`: **LGTM! Well-structured API endpoints with sensible defaults.** The new processor monitoring endpoints are properly integrated into the systemApi object with appropriate default pagination values.

`137-138`: **Resolved — frontend perPage default aligns with backend (default 50, max 100).** The backend defines `per_page: Query(50, ge=1, le=100)` in backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py:179; no change required.

**backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py (2)**
`172-174`: **Consistent admin-only endpoint implementation.** The three new processor monitoring endpoints properly enforce admin access via the `current_superuser` dependency and follow the existing patterns in the codebase. Also applies to: 180-183, 188-191.

`178-179`: **LGTM! Proper input validation for pagination.** The pagination parameters include appropriate constraints: `ge=1` for the page number and `ge=1, le=100` for items per page.

**backends/advanced/webui/src/pages/Processes.tsx (1)**
`144-152`: **LGTM! Clean auto-refresh toggle implementation.** The auto-refresh toggle with its checkbox control provides good user control over data updates.
**backends/advanced/webui/src/components/processes/ProcessingHistory.tsx (1)**

`52-55`: **Verify useEffect deps for initialData to avoid missed/extra fetches.** `rg` failed with "unrecognized file type: tsx" — unable to verify usage. Inspect backends/advanced/webui/src/components/processes/ProcessingHistory.tsx (lines 52–55). If `initialData` can change, replace the empty deps with `[initialData.length, loadHistory]`, e.g.:

```tsx
useEffect(() => {
  if (initialData.length === 0) loadHistory(1)
}, [initialData.length, loadHistory])
```

If `initialData` is stable on mount, keep `[]` but document that assumption.

**backends/advanced/src/advanced_omi_backend/processors.py (1)**
`505-505`: **Incorrect — TaskManager already provides completed_tasks; no defensive getattr needed.** TaskManager declares `self.completed_tasks` (backends/advanced/src/advanced_omi_backend/task_manager.py:35) and processors set `self.task_manager = get_task_manager()` (backends/advanced/src/advanced_omi_backend/processors.py:119), so the access at processors.py:505 is valid.

Likely an incorrect or invalid review comment.

**backends/advanced/src/advanced_omi_backend/controllers/system_controller.py (1)**
1230-1284: Tighten null‑guards & normalize task fields in get_client_processing_detail
- Verified: TaskInfo dataclass and BackgroundTaskManager.get_tasks_for_client(...) exist; ClientState exposes user_email, current_audio_uuid, conversation_start_time, sample_rate.
- Apply the proposed safe changes from the diff: default processing_status to {}, guard task_manager before calling get_tasks_for_client (or keep if get_task_manager can be None), ensure client_tasks is a list, use getattr(task,"metadata",{}) when reading metadata, prefer stable task identifiers (task.id / task.task_id / fallback to task.name), convert timestamps with a resilient _to_iso helper (int/float/datetime/None), and use logger.exception + {e!s} in the 500 response.
- Outstanding: quick scan did not locate a definition of get_task_manager — confirm whether it can ever return None; if it always returns a manager instance the explicit task_manager None-guard can be removed.
```python
async def get_processor_overview():
    """Get comprehensive processor overview with pipeline stats."""
    try:
        processor_manager = get_processor_manager()
        task_manager = get_task_manager()

        # Get pipeline statistics
        pipeline_stats = processor_manager.get_pipeline_statistics()

        # Get system health metrics
        task_health = task_manager.get_health_status()
        queue_health = processor_manager.get_queue_health_status()

        # Get recent activity
        recent_activity = processor_manager.get_processing_history(limit=10)

        overview = {
            "pipeline_stats": pipeline_stats,
            "system_health": {
                "total_active_clients": len(processor_manager.active_file_sinks),
                "total_processing_tasks": len(processor_manager.processing_tasks),
                "task_manager_healthy": task_health.get("healthy", False),
                "error_rate": task_health.get("recent_errors", 0) / max(task_health.get("completed_tasks", 1), 1),
                "uptime_hours": time.time() / 3600  # Placeholder
            },
            "queue_health": queue_health,
            "recent_activity": recent_activity[:5]  # Last 5 activities
        }

        return overview
    except Exception as e:
        logger.error(f"Error getting processor overview: {e}")
        return JSONResponse(
            status_code=500, content={"error": f"Failed to get processor overview: {str(e)}"}
        )
```
Harden overview when task_manager is absent; fix uptime and minor inefficiencies
- get_task_manager() may be None; calling get_health_status unguarded will 500.
- Compute uptime from a real start_time if available; avoid epoch/3600 placeholder.
- Ask processor_manager for only 5 activities instead of fetching 10 then slicing.
- Use logger.exception and explicit conversion flag.
```diff
 async def get_processor_overview():
     """Get comprehensive processor overview with pipeline stats."""
     try:
         processor_manager = get_processor_manager()
         task_manager = get_task_manager()

         # Get pipeline statistics
         pipeline_stats = processor_manager.get_pipeline_statistics()

         # Get system health metrics
-        task_health = task_manager.get_health_status()
+        task_health = task_manager.get_health_status() if task_manager else {}
         queue_health = processor_manager.get_queue_health_status()

         # Get recent activity
-        recent_activity = processor_manager.get_processing_history(limit=10)
+        recent_activity = processor_manager.get_processing_history(limit=5)

+        # Derive safe error rate
+        recent_errors = task_health.get("recent_errors", 0) or 0
+        completed_tasks = task_health.get("completed_tasks", 0) or 0
+        error_rate = (recent_errors / completed_tasks) if completed_tasks > 0 else 0.0
+
+        # Uptime: prefer manager's start_time if provided
+        start_time = getattr(processor_manager, "start_time", None)
+        uptime_hours = ((time.time() - start_time) / 3600) if start_time else None
+
         overview = {
             "pipeline_stats": pipeline_stats,
             "system_health": {
                 "total_active_clients": len(processor_manager.active_file_sinks),
                 "total_processing_tasks": len(processor_manager.processing_tasks),
-                "task_manager_healthy": task_health.get("healthy", False),
-                "error_rate": task_health.get("recent_errors", 0) / max(task_health.get("completed_tasks", 1), 1),
-                "uptime_hours": time.time() / 3600  # Placeholder
+                "task_manager_healthy": task_health.get("healthy", False) if task_manager else False,
+                "error_rate": error_rate,
+                "uptime_hours": uptime_hours
             },
             "queue_health": queue_health,
-            "recent_activity": recent_activity[:5]  # Last 5 activities
+            "recent_activity": recent_activity  # Last 5 activities
         }

         return overview
     except Exception as e:
-        logger.error(f"Error getting processor overview: {e}")
+        logger.exception("Error getting processor overview")
         return JSONResponse(
-            status_code=500, content={"error": f"Failed to get processor overview: {str(e)}"}
+            status_code=500, content={"error": f"Failed to get processor overview: {e!s}"}
         )
```
)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
async def get_processor_overview():
    """Get comprehensive processor overview with pipeline stats."""
    try:
        processor_manager = get_processor_manager()
        task_manager = get_task_manager()

        # Get pipeline statistics
        pipeline_stats = processor_manager.get_pipeline_statistics()

        # Get system health metrics
        task_health = task_manager.get_health_status() if task_manager else {}
        queue_health = processor_manager.get_queue_health_status()

        # Get recent activity
        recent_activity = processor_manager.get_processing_history(limit=5)

        # Derive safe error rate
        recent_errors = task_health.get("recent_errors", 0) or 0
        completed_tasks = task_health.get("completed_tasks", 0) or 0
        error_rate = (recent_errors / completed_tasks) if completed_tasks > 0 else 0.0

        # Uptime: prefer manager's start_time if provided
        start_time = getattr(processor_manager, "start_time", None)
        uptime_hours = ((time.time() - start_time) / 3600) if start_time else None

        overview = {
            "pipeline_stats": pipeline_stats,
            "system_health": {
                "total_active_clients": len(processor_manager.active_file_sinks),
                "total_processing_tasks": len(processor_manager.processing_tasks),
                "task_manager_healthy": task_health.get("healthy", False) if task_manager else False,
                "error_rate": error_rate,
                "uptime_hours": uptime_hours
            },
            "queue_health": queue_health,
            "recent_activity": recent_activity  # Last 5 activities
        }

        return overview
    except Exception as e:
        logger.exception("Error getting processor overview")
        return JSONResponse(
            status_code=500, content={"error": f"Failed to get processor overview: {e!s}"}
        )
```
🧰 Tools
🪛 Ruff (0.13.0)
1193-1193: Consider moving this statement to an else block
(TRY300)
1194-1194: Do not catch blind exception: Exception
(BLE001)
1195-1195: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1197-1197: Use explicit conversion flag
Replace with conversion flag
(RUF010)
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/controllers/system_controller.py
around lines 1164-1199, harden get_processor_overview by guarding against a
missing task_manager (only call task_manager.get_health_status() if task_manager
is not None, otherwise use safe defaults), compute uptime_hours from a real
start_time when available (e.g., use (time.time() -
processor_manager.start_time)/3600 if processor_manager.start_time exists,
otherwise fallback to (time.time()/3600)), request only 5 recent activities from
processor_manager.get_processing_history(limit=5) instead of fetching 10 then
slicing, compute error_rate with a safe denominator to avoid division by zero,
and replace logger.error with logger.exception while returning the JSONResponse
with an explicit string conversion for the error (e.g., content={"error":
str(e)}).
```python
"avg_processing_time_ms": mean([t["duration"] * 1000 for t in audio_tasks[-50:]]) if audio_tasks else 0,
"success_rate": len([t for t in audio_tasks[-100:] if t]) / max(len(audio_tasks[-100:]), 1),
```
Fix incorrect calculation of average processing time.
The mean calculation reads a `duration` field, but the metadata stores `processing_time` (not `duration`). Also, the success rate calculation doesn't validate whether tasks actually succeeded.

```diff
-                "avg_processing_time_ms": mean([t["duration"] * 1000 for t in audio_tasks[-50:]]) if audio_tasks else 0,
-                "success_rate": len([t for t in audio_tasks[-100:] if t]) / max(len(audio_tasks[-100:]), 1),
+                "avg_processing_time_ms": mean([t.get("processing_time", 0) * 1000 for t in audio_tasks[-50:]]) if audio_tasks else 0,
+                "success_rate": len([t for t in audio_tasks[-100:] if t and not t.get("error")]) / max(len(audio_tasks[-100:]), 1),
```
+ "success_rate": len([t for t in audio_tasks[-100:] if t and not t.get("error")]) / max(len(audio_tasks[-100:]), 1),📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| "avg_processing_time_ms": mean([t["duration"] * 1000 for t in audio_tasks[-50:]]) if audio_tasks else 0, | |
| "success_rate": len([t for t in audio_tasks[-100:] if t]) / max(len(audio_tasks[-100:]), 1), | |
| "avg_processing_time_ms": mean([t.get("processing_time", 0) * 1000 for t in audio_tasks[-50:]]) if audio_tasks else 0, | |
| "success_rate": len([t for t in audio_tasks[-100:] if t and not t.get("error")]) / max(len(audio_tasks[-100:]), 1), |
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/processors.py around lines
478-479, the avg_processing_time_ms calculation is using a nonexistent
"duration" field and should use the stored "processing_time" (converted to
milliseconds), and the success_rate should explicitly count tasks that indicate
success (e.g., t.get("success") is truthy or t.get("status") == "success")
rather than treating any task object as a success; update the mean calculation
to use t["processing_time"] * 1000 (guarding for missing values) and compute
success_rate as count_of_successful_tasks / max(total_tasks, 1) over the same
slice.
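The corrected aggregation can be exercised standalone. The `stage_metrics` helper below is hypothetical; the `processing_time` (seconds) and `error` field names are taken from the review, not a verified schema:

```python
from statistics import mean


def stage_metrics(tasks):
    """Aggregate recent task records into mean processing time (ms) and success rate."""
    recent = tasks[-100:]
    # Convert seconds to milliseconds, tolerating records without the field
    timed = [t.get("processing_time", 0) * 1000 for t in tasks[-50:]]
    return {
        "avg_processing_time_ms": mean(timed) if timed else 0,
        # Count a task as successful only when it carries no error marker
        "success_rate": sum(1 for t in recent if not t.get("error")) / max(len(recent), 1),
    }
```

A task dict carrying an `error` key now drags the success rate down instead of counting as a success merely because the record exists.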
```python
"queue_size": queue.qsize() if queue else 0,
"active_tasks": len([tid for tid, tinfo in self.processing_tasks.items()
                     if stage in tid and not self.task_manager.get_task_info(tinfo.get(stage, "")).completed_at]),
"avg_processing_time_ms": 30000,  # Placeholder - can be calculated from task manager history
```
Fix active tasks calculation logic error.
Line 490-491: The logic for checking active tasks is incorrect - it's trying to iterate over processing_tasks items but treating them as if the key contains the stage name, which doesn't match the data structure where stages are nested within client tasks.
```diff
-                "active_tasks": len([tid for tid, tinfo in self.processing_tasks.items()
-                                     if stage in tid and not self.task_manager.get_task_info(tinfo.get(stage, "")).completed_at]),
+                "active_tasks": sum(
+                    1
+                    for client_tasks in self.processing_tasks.values()
+                    if stage in client_tasks
+                    and (info := self.task_manager.get_task_info(client_tasks[stage])) is not None
+                    and not info.completed_at
+                ),
```

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/processors.py around lines 489 to
492, the active tasks count is wrong because it assumes processing_tasks keys
contain the stage name; instead iterate the processing_tasks values (each
client/task info), extract the stage-specific task id from that nested structure
(e.g. tinfo.get(stage) or similar), skip if missing, then look up that stage
task via self.task_manager.get_task_info(...) and count it as active only when
that lookup exists and its completed_at is falsy; update the comprehension/loop
accordingly and guard against missing task_info to avoid attribute errors.
Summary by CodeRabbit
New Features
Refactor