
Processor view #112

Closed
AnkushMalaker wants to merge 7 commits into main from processor-view

Conversation

@AnkushMalaker (Collaborator) commented Sep 19, 2025

Summary by CodeRabbit

  • New Features

    • New admin-only Processes page for real-time monitoring: system health cards, pipeline visualization, searchable/sortable active tasks, paginated processing history, and per-client detail modal.
    • Auto-refresh and manual refresh controls; new navigation item “Processes” under Admin.
  • Refactor

    • System page simplified by removing Processor Status and Active Clients; monitoring consolidated into the new Processes page.

- Updated the `CLAUDE.md` documentation to reflect changes in ASR service command.
- Introduced a new method `load_audio_file_as_chunk` in `audio_utils.py` for loading audio files into the Wyoming AudioChunk format.
- Enhanced `ProcessorManager` to include client type detection and improved cleanup of processing tasks.
- Updated `conversation_controller.py` to queue transcription and memory processing jobs with better error handling.
- Refactored the `Upload` component in the web UI to support a three-phase upload process with improved status management and polling for processing tasks.
- Added new API methods for asynchronous file uploads and job status retrieval.
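The `load_audio_file_as_chunk` helper mentioned above isn't shown in this thread. As a rough sketch of what loading a WAV file into an AudioChunk-style structure might look like — the `AudioChunk` dataclass here is a stand-in for the Wyoming type, and the function body is an assumption, not the PR's actual code:

```python
import wave
from dataclasses import dataclass


@dataclass
class AudioChunk:
    """Stand-in for wyoming.audio.AudioChunk: raw PCM plus its format."""
    audio: bytes
    rate: int
    width: int
    channels: int


def load_audio_file_as_chunk(path: str) -> AudioChunk:
    """Read an entire WAV file and wrap its PCM payload in an AudioChunk."""
    with wave.open(path, "rb") as wav:
        frames = wav.readframes(wav.getnframes())
        return AudioChunk(
            audio=frames,
            rate=wav.getframerate(),
            width=wav.getsampwidth(),
            channels=wav.getnchannels(),
        )
```

The real helper in `audio_utils.py` may resample or chunk the audio rather than return the whole file at once; this only illustrates the format-wrapping idea.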
coderabbitai bot (Contributor) commented Sep 19, 2025

Important

Review skipped

Auto reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

Adds processor monitoring across backend and web UI: new admin API endpoints (overview, history, client detail), ProcessorManager analytics methods (pipeline stats, history, queue health), routes wiring, a new Processes page with health cards, pipeline view, active tasks, history list, client detail modal, and API client methods. System page drops processor/clients blocks.
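Concretely, the overview endpoint returns a payload of roughly this shape — the keys mirror the controller code quoted later in this review, while the values here are purely illustrative:

```python
# Illustrative GET /processor/overview response; field names follow the
# controller code in system_controller.py, values are made up for the example.
overview = {
    "pipeline_stats": {
        "transcription": {"success_rate": 0.97, "throughput_per_minute": 12},
    },
    "system_health": {
        "total_active_clients": 3,
        "total_processing_tasks": 5,
        "task_manager_healthy": True,
        "error_rate": 0.02,
        "uptime_hours": 41.5,
    },
    "queue_health": {"transcription_queue": "ok"},
    "recent_activity": [],  # last 5 history entries
}
```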

Changes

Cohort / File(s) Summary
Backend controllers & routes
backends/advanced/src/advanced_omi_backend/controllers/system_controller.py, backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py
Adds async endpoints: processor overview, paginated history, and per-client processing detail; wires admin-only GET routes with validation.
ProcessorManager analytics
backends/advanced/src/advanced_omi_backend/processors.py
Introduces get_pipeline_statistics, get_processing_history, and get_queue_health_status without altering processing logic.
Web UI API client
backends/advanced/webui/src/services/api.ts
Adds systemApi methods: getProcessorOverview, getProcessorHistory(page, perPage), getClientProcessingDetail(clientId).
Navigation & routing
backends/advanced/webui/src/App.tsx, backends/advanced/webui/src/components/layout/Layout.tsx
Adds /processes route and admin-only nav item with Activity icon.
Processes page & widgets (new)
backends/advanced/webui/src/pages/Processes.tsx, backends/advanced/webui/src/components/processes/ProcessPipelineView.tsx, .../SystemHealthCards.tsx, .../ActiveTasksTable.tsx, .../ProcessingHistory.tsx, .../ClientDetailModal.tsx
Implements Processes dashboard: health cards, pipeline view, active tasks table, history list, and client detail modal with refresh/auto-refresh.
System page cleanup
backends/advanced/webui/src/pages/System.tsx
Removes Processor Status and Active Clients fetching, state, and UI; keeps other sections intact.
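The history endpoint's page/per_page pagination over an in-memory list reduces to a simple offset slice plus ceiling division for the page count. A minimal sketch of that logic (not the PR's code):

```python
def paginate(items: list, page: int = 1, per_page: int = 50) -> dict:
    """Slice a full result list into one page plus pagination metadata."""
    offset = (page - 1) * per_page
    total = len(items)
    return {
        "history": items[offset:offset + per_page],
        "pagination": {
            "page": page,
            "per_page": per_page,
            "total": total,
            # Ceiling division: number of pages needed to hold all items.
            "total_pages": (total + per_page - 1) // per_page,
        },
    }
```

Note that slicing past the end of the list simply yields an empty page rather than raising, which is the behavior the review comments below assume.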

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Admin
  participant WebUI as Web UI (Processes)
  participant API as Backend Router (/processor/…)
  participant Ctrl as System Controller
  participant PM as ProcessorManager
  participant TM as TaskManager
  participant CM as ClientManager

  Admin->>WebUI: Open /processes (admin)
  WebUI->>API: GET /processor/overview
  API->>Ctrl: get_processor_overview()
  Ctrl->>PM: get_pipeline_statistics()
  Ctrl->>PM: get_queue_health_status()
  Ctrl->>TM: health/status
  Ctrl-->>WebUI: JSON {pipeline_stats, system_health, queue_health, recent_activity}

  WebUI->>API: GET /processor/history?page=&per_page=
  API->>Ctrl: get_processor_history(page, per_page)
  Ctrl->>PM: get_processing_history(limit=1000)
  Ctrl-->>WebUI: JSON {history, pagination}

  Admin->>WebUI: View client details
  WebUI->>API: GET /processor/clients/{client_id}
  API->>Ctrl: get_client_processing_detail(client_id)
  Ctrl->>PM: processing_status for client
  Ctrl->>TM: client_tasks
  Ctrl->>CM: get client info
  Ctrl-->>WebUI: JSON {client_info, processing_status, active_tasks}
sequenceDiagram
  autonumber
  actor Admin
  participant Page as Processes Page
  participant Widgets as Dashboard Widgets

  Admin->>Page: Load page
  loop Auto-refresh (every 5s when enabled)
    Page->>Page: fetch overview + history
    Page->>Widgets: update SystemHealthCards, ProcessPipelineView, ActiveTasksTable, ProcessingHistory
  end
  Admin->>Widgets: Select client in ActiveTasksTable
  Widgets->>Page: set selectedClientId
  Page->>Widgets: Open ClientDetailModal
  Widgets->>Page: Refresh/Close interactions

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • Better audio processing #111 — Also modifies ProcessorManager around cropping/memory flows; related to this PR’s new ProcessorManager analytics surface.
  • Openmemory mcp #69 — Adjusts memory handling in ProcessorManager; overlaps with the class extended here for stats/history.

Poem

A rabbit peers at queues in line,
Counts the ticks of processing time.
Pipelines hum, the dashboards glow,
Clients whisper where their tasks now go.
With gentle paws I press “Refresh” —
History scrolls, the lights stay fresh.
Hop, inspect, and off we go! 🐇✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning — Docstring coverage is 59.09%, below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve coverage.

✅ Passed checks (2 passed)

  • Description Check ✅ Passed — Check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check ✅ Passed — The title "Processor view" is concise and directly reflects the primary change: adding a processor/process monitoring UI with supporting backend endpoints, routes, and APIs. It clearly communicates the main intent and is suitable for a teammate scanning PR history.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@AnkushMalaker (Collaborator, Author) commented:

@coderabbitai review

coderabbitai bot (Contributor) commented Sep 19, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 3

🧹 Nitpick comments (19)
backends/advanced/webui/src/components/layout/Layout.tsx (1)

2-2: Verify icon imports.

The Activity icon is imported and used in the navigation items array, which is fine; however, verify that all other imported icons are actually used.

Also applies to: 19-19

backends/advanced/webui/src/pages/System.tsx (1)

147-149: Remove unused formatDate function.

The formatDate function is declared but never used in the component.

-  const formatDate = (dateString: string) => {
-    return new Date(dateString).toLocaleString()
-  }
backends/advanced/webui/src/components/processes/SystemHealthCards.tsx (1)

33-35: Consider extracting error rate threshold to a constant.

The error rate threshold of 0.1 (10%) is hardcoded in multiple places. Consider extracting it to a named constant for better maintainability.

+const ERROR_RATE_THRESHOLD = 0.1 // 10% error rate threshold
+
 export default function SystemHealthCards({ data }: SystemHealthCardsProps) {
   const cards = [
     // ...
     {
       title: 'Error Rate',
       value: `${(data.error_rate * 100).toFixed(1)}%`,
-      icon: data.error_rate > 0.1 ? AlertTriangle : CheckCircle,
-      color: data.error_rate > 0.1 ? 'red' : 'green',
+      icon: data.error_rate > ERROR_RATE_THRESHOLD ? AlertTriangle : CheckCircle,
+      color: data.error_rate > ERROR_RATE_THRESHOLD ? 'red' : 'green',
       description: 'Recent processing error rate'
     },

Also update line 123:

-                      data.error_rate > 0.1 ? 'bg-red-500' : 'bg-green-500'
+                      data.error_rate > ERROR_RATE_THRESHOLD ? 'bg-red-500' : 'bg-green-500'
backends/advanced/webui/src/pages/Processes.tsx (4)

2-2: Remove unused imports.

Several icons are imported but never used in the component.

-import { Activity, RefreshCw, Users, Clock, BarChart3 } from 'lucide-react'
+import { Activity, RefreshCw } from 'lucide-react'

48-67: Remove unused interface.

The ClientProcessingDetail interface is defined but never used.

-interface ClientProcessingDetail {
-  client_id: string
-  client_info: {
-    user_id: string
-    user_email: string
-    current_audio_uuid?: string
-    conversation_start_time?: string
-    sample_rate?: number
-  }
-  processing_status: any
-  active_tasks: Array<{
-    task_id: string
-    task_name: string
-    task_type: string
-    created_at: string
-    completed_at?: string
-    error?: string
-    cancelled: boolean
-  }>
-}

96-105: Guard the auto-refresh effect for non-admin users.

The returned cleanup already clears the interval on unmount and on dependency changes, but the effect should bail out early when the user is not an admin, matching the isAdmin dependency.

   useEffect(() => {
+    if (!isAdmin) return
     if (!autoRefresh) return
 
     const interval = setInterval(() => {
       loadProcessorOverview()
     }, 5000) // Refresh every 5 seconds
 
     return () => clearInterval(interval)
   }, [autoRefresh, isAdmin])

79-94: Consider adding more specific error handling.

The error handling could differentiate between network errors and authorization issues for better user feedback.

     } catch (err: any) {
-      setError(err.message || 'Failed to load processor overview')
+      if (err.response?.status === 403) {
+        setError('Insufficient permissions to view processor data')
+      } else if (err.code === 'ECONNABORTED') {
+        setError('Request timeout - processor data may be loading')
+      } else {
+        setError(err.message || 'Failed to load processor overview')
+      }
     } finally {
backends/advanced/webui/src/components/processes/ProcessPipelineView.tsx (2)

192-192: Fix: Potential division by zero when pipeline has no stages.

The calculation assumes pipelineStats has at least one stage, but dividing by Object.keys(pipelineStats).length could result in division by zero if the object is empty.

Apply this fix to handle empty pipeline stats:

-              {Math.round(Object.values(pipelineStats).reduce((sum, stage) => sum + stage.success_rate, 0) / Object.keys(pipelineStats).length * 100)}%
+              {Object.keys(pipelineStats).length > 0 
+                ? Math.round(Object.values(pipelineStats).reduce((sum, stage) => sum + stage.success_rate, 0) / Object.keys(pipelineStats).length * 100) 
+                : 0}%

198-198: Consider formatting throughput values for better readability.

Large throughput values would benefit from formatting (e.g., "1.2K/min" instead of "1200").

Consider adding a formatting helper:

+const formatThroughput = (value: number) => {
+  if (value >= 1000) return `${(value / 1000).toFixed(1)}K`
+  return value.toString()
+}

 // In the JSX:
-              {Object.values(pipelineStats).reduce((sum, stage) => sum + stage.throughput_per_minute, 0)}
+              {formatThroughput(Object.values(pipelineStats).reduce((sum, stage) => sum + stage.throughput_per_minute, 0))}
backends/advanced/webui/src/components/processes/ProcessingHistory.tsx (1)

38-42: Consider using a more specific error type.

Using any for error catching reduces type safety. Consider defining a specific error type or using Error.

-    } catch (err: any) {
-      setError(err.message || 'Failed to load processing history')
+    } catch (err) {
+      const message = err instanceof Error ? err.message : 'Failed to load processing history'
+      setError(message)
backends/advanced/src/advanced_omi_backend/processors.py (2)

466-466: Remove unused loop variable.

Static analysis correctly identified that client_id is not used within the loop body.

-        for client_id, state in self.processing_state.items():
+        for state in self.processing_state.values():

522-524: Follow logging best practices for exception handling.

Static analysis correctly suggests using logging.exception for better error tracking.

         except Exception as e:
-            logger.error(f"Error getting processing history: {e}")
+            logger.exception("Error getting processing history")
             return []
backends/advanced/webui/src/components/processes/ActiveTasksTable.tsx (2)

44-48: Handle API errors more gracefully.

Using any type and generic error handling reduces type safety and debugging capability.

-    } catch (err: any) {
-      setError(err.message || 'Failed to load active tasks')
+    } catch (err) {
+      console.error('Failed to load active tasks:', err)
+      const message = err instanceof Error ? err.message : 'Failed to load active tasks'
+      setError(message)

121-125: Incorrect sort comparison logic.

The sort comparison always returns 1 or -1 but never 0 for equal values, which can cause unstable sorting.

     if (sortDirection === 'asc') {
-      return aValue > bValue ? 1 : -1
+      return aValue > bValue ? 1 : aValue < bValue ? -1 : 0
     } else {
-      return aValue < bValue ? 1 : -1
+      return aValue < bValue ? 1 : aValue > bValue ? -1 : 0
     }
backends/advanced/webui/src/components/processes/ClientDetailModal.tsx (3)

50-54: Use consistent error handling pattern.

Similar to other components, avoid using any type for errors.

-    } catch (err: any) {
-      setError(err.message || 'Failed to load client details')
+    } catch (err) {
+      const message = err instanceof Error ? err.message : 'Failed to load client details'
+      setError(message)

57-59: Missing cleanup for useEffect.

The effect should handle cleanup to prevent setting state on unmounted components.

   useEffect(() => {
+    let isMounted = true
+    
+    const load = async () => {
+      if (isMounted) {
+        await loadClientDetail()
+      }
+    }
+    
-    loadClientDetail()
+    load()
+    
+    return () => {
+      isMounted = false
+    }
   }, [clientId])

61-63: Consider memoizing formatTime function.

This function is recreated on every render but doesn't depend on any state or props.

import { useCallback } from 'react'

// Inside component:
const formatTime = useCallback((timestamp: string) => {
  return new Date(timestamp).toLocaleString()
}, [])
backends/advanced/src/advanced_omi_backend/controllers/system_controller.py (2)

1142-1156: Guard missing memory service and log with stack traces

  • If get_memory_service() returns None, this will raise on await; return 503 instead.
  • Use logger.exception to retain stack traces.
  • Prefer explicit conversion flag for exceptions.
 async def delete_all_user_memories(user: User):
     """Delete all memories for the current user."""
     try:
         from advanced_omi_backend.memory import get_memory_service
 
-        memory_service = get_memory_service()
+        memory_service = get_memory_service()
+        if memory_service is None:
+            return JSONResponse(
+                status_code=503,
+                content={"error": "Memory service unavailable"},
+            )
 
         # Delete all memories for the user
-        deleted_count = await memory_service.delete_all_user_memories(user.user_id)
+        deleted_count = await memory_service.delete_all_user_memories(user.user_id)
 
         logger.info(f"Deleted {deleted_count} memories for user {user.user_id}")
 
         return {
             "message": f"Successfully deleted {deleted_count} memories",
             "deleted_count": deleted_count,
             "user_id": user.user_id,
             "status": "success"
         }
 
-    except Exception as e:
-        logger.error(f"Error deleting all memories for user {user.user_id}: {e}")
+    except Exception as e:
+        logger.exception("Error deleting all memories for user %s", user.user_id)
         return JSONResponse(
-            status_code=500, content={"error": f"Failed to delete memories: {str(e)}"}
+            status_code=500, content={"error": f"Failed to delete memories: {e!s}"}
         )

1200-1229: Validate page/per_page and improve exception logging

  • Add FastAPI constraints to prevent negative or zero values and unbounded page sizes.
  • Keep logging with stack traces and explicit exception conversion.
-from fastapi import BackgroundTasks, File, Query, UploadFile
+from fastapi import BackgroundTasks, File, Query, UploadFile
@@
-async def get_processor_history(page: int = 1, per_page: int = 50):
+async def get_processor_history(
+    page: int = Query(1, ge=1),
+    per_page: int = Query(50, ge=1, le=200),
+):
     """Get paginated processing history."""
     try:
         processor_manager = get_processor_manager()
 
         # Calculate offset
         offset = (page - 1) * per_page
 
         # Get full history and paginate
         full_history = processor_manager.get_processing_history(limit=1000)  # Get more for pagination
         total_items = len(full_history)
 
         # Paginate
         paginated_history = full_history[offset:offset + per_page]
 
         return {
             "history": paginated_history,
             "pagination": {
                 "page": page,
                 "per_page": per_page,
                 "total": total_items,
                 "total_pages": (total_items + per_page - 1) // per_page
             }
         }
     except Exception as e:
-        logger.error(f"Error getting processor history: {e}")
+        logger.exception("Error getting processor history")
         return JSONResponse(
-            status_code=500, content={"error": f"Failed to get processor history: {str(e)}"}
+            status_code=500, content={"error": f"Failed to get processor history: {e!s}"}
         )
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 641b7a6 and 4daf3cd.

📒 Files selected for processing (13)
  • backends/advanced/src/advanced_omi_backend/controllers/system_controller.py (1 hunks)
  • backends/advanced/src/advanced_omi_backend/processors.py (1 hunks)
  • backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py (1 hunks)
  • backends/advanced/webui/src/App.tsx (2 hunks)
  • backends/advanced/webui/src/components/layout/Layout.tsx (2 hunks)
  • backends/advanced/webui/src/components/processes/ActiveTasksTable.tsx (1 hunks)
  • backends/advanced/webui/src/components/processes/ClientDetailModal.tsx (1 hunks)
  • backends/advanced/webui/src/components/processes/ProcessPipelineView.tsx (1 hunks)
  • backends/advanced/webui/src/components/processes/ProcessingHistory.tsx (1 hunks)
  • backends/advanced/webui/src/components/processes/SystemHealthCards.tsx (1 hunks)
  • backends/advanced/webui/src/pages/Processes.tsx (1 hunks)
  • backends/advanced/webui/src/pages/System.tsx (2 hunks)
  • backends/advanced/webui/src/services/api.ts (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Integration Tests
backends/advanced/webui/src/pages/Processes.tsx

[error] 2-2: TypeScript: 'Users' is declared but its value is never read.


[error] 2-2: TypeScript: 'Clock' is declared but its value is never read.


[error] 2-2: TypeScript: 'BarChart3' is declared but its value is never read.


[error] 48-48: TypeScript: 'ClientProcessingDetail' is declared but its value is never used.

backends/advanced/webui/src/pages/System.tsx

[error] 147-147: TypeScript: 'formatDate' is declared but its value is never read.

🪛 Ruff (0.13.0)
backends/advanced/src/advanced_omi_backend/controllers/system_controller.py

1150-1155: Consider moving this statement to an else block

(TRY300)


1157-1157: Do not catch blind exception: Exception

(BLE001)


1158-1158: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


1160-1160: Use explicit conversion flag

Replace with conversion flag

(RUF010)


1193-1193: Consider moving this statement to an else block

(TRY300)


1194-1194: Do not catch blind exception: Exception

(BLE001)


1195-1195: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


1197-1197: Use explicit conversion flag

Replace with conversion flag

(RUF010)


1224-1224: Do not catch blind exception: Exception

(BLE001)


1225-1225: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


1227-1227: Use explicit conversion flag

Replace with conversion flag

(RUF010)


1279-1279: Consider moving this statement to an else block

(TRY300)


1280-1280: Do not catch blind exception: Exception

(BLE001)


1281-1281: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


1283-1283: Use explicit conversion flag

Replace with conversion flag

(RUF010)

backends/advanced/src/advanced_omi_backend/processors.py

466-466: Loop control variable client_id not used within loop body

Rename unused client_id to _client_id

(B007)


522-522: Do not catch blind exception: Exception

(BLE001)


523-523: Use logging.exception instead of logging.error

Replace with exception

(TRY400)
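The TRY400 fixes suggested above swap `logger.error` for `logger.exception`; the difference is that inside an except block, `exception()` records the full traceback automatically (it is equivalent to `error(..., exc_info=True)`). A small self-contained illustration of the pattern the review recommends:

```python
import logging

logger = logging.getLogger("processors_demo")
logger.setLevel(logging.ERROR)


def get_history_or_empty(loader):
    """Call loader(); on failure, log with traceback and return []."""
    try:
        return loader()
    except Exception:
        # logger.exception logs at ERROR level and attaches the active
        # traceback, so the log shows *where* the failure happened.
        logger.exception("Error getting processing history")
        return []
```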

backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py

172-172: Unused function argument: current_user

(ARG001)


172-172: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


180-180: Unused function argument: current_user

(ARG001)


180-180: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


188-188: Unused function argument: current_user

(ARG001)


188-188: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)

🔇 Additional comments (12)
backends/advanced/webui/src/components/layout/Layout.tsx (1)

17-21: LGTM! Navigation item properly gated by admin access.

The new Processes navigation item is correctly placed within the admin-only section and follows the existing pattern.

backends/advanced/webui/src/pages/System.tsx (1)

61-65: LGTM! Clean removal of processor status fetching.

The removal of processor status and active clients from the System page aligns well with the new dedicated Processes page for monitoring.

backends/advanced/webui/src/components/processes/SystemHealthCards.tsx (1)

106-115: LGTM! Clean conditional rendering for task manager health.

The health indicator for the Processing Tasks card is well-implemented with clear visual feedback.

backends/advanced/webui/src/App.tsx (1)

72-76: LGTM! Route properly wrapped in error boundary.

The new /processes route follows the established pattern with PageErrorBoundary wrapping.

backends/advanced/webui/src/services/api.ts (2)

136-140: LGTM! Well-structured API endpoints with sensible defaults.

The new processor monitoring endpoints are properly integrated into the systemApi object with appropriate default pagination values.


137-138: Resolved — frontend perPage default aligns with backend (default 50, max 100).
Backend defines per_page: Query(50, ge=1, le=100) in backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py:179; no change required.

backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py (2)

172-174: Consistent admin-only endpoint implementation.

The three new processor monitoring endpoints properly enforce admin access via current_superuser dependency and follow the existing patterns in the codebase.

Also applies to: 180-183, 188-191


178-179: LGTM! Proper input validation for pagination.

The pagination parameters include appropriate constraints with ge=1 for page number and ge=1, le=100 for items per page.
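FastAPI enforces those bounds declaratively via `Query(ge=1)` and `Query(ge=1, le=100)`; in plain Python the equivalent check is a few comparisons. A sketch of the same validation outside FastAPI (illustrative, not code from the PR):

```python
def validate_pagination(page: int, per_page: int) -> tuple[int, int]:
    """Reject values the way Query(ge=1) / Query(ge=1, le=100) would."""
    if page < 1:
        raise ValueError("page must be >= 1")
    if not 1 <= per_page <= 100:
        raise ValueError("per_page must be between 1 and 100")
    return page, per_page
```

In the actual routes, FastAPI returns a 422 response instead of raising a `ValueError`, but the accepted ranges are the same.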

backends/advanced/webui/src/pages/Processes.tsx (1)

144-152: LGTM! Clean auto-refresh toggle implementation.

The auto-refresh toggle with its checkbox control provides good user control over data updates.

backends/advanced/webui/src/components/processes/ProcessingHistory.tsx (1)

52-55: Verify useEffect deps for initialData to avoid missed/extra fetches.
rg failed with "unrecognized file type: tsx" — unable to verify usage. Inspect backends/advanced/webui/src/components/processes/ProcessingHistory.tsx (lines 52–55).
If initialData can change, replace the empty deps with [initialData.length, loadHistory], e.g.:
useEffect(() => {
if (initialData.length === 0) loadHistory(1)
}, [initialData.length, loadHistory])
If initialData is stable on mount, keep [] but document that assumption.

backends/advanced/src/advanced_omi_backend/processors.py (1)

505-505: Incorrect — TaskManager already provides completed_tasks; no defensive getattr needed.

TaskManager declares self.completed_tasks (backends/advanced/src/advanced_omi_backend/task_manager.py:35) and processors set self.task_manager = get_task_manager() (backends/advanced/src/advanced_omi_backend/processors.py:119), so the access at processors.py:505 is valid.

Likely an incorrect or invalid review comment.

backends/advanced/src/advanced_omi_backend/controllers/system_controller.py (1)

1230-1284: Tighten null‑guards & normalize task fields in get_client_processing_detail

  • Verified: TaskInfo dataclass and BackgroundTaskManager.get_tasks_for_client(...) exist; ClientState exposes user_email, current_audio_uuid, conversation_start_time, sample_rate.
  • Apply the proposed safe changes from the diff: default processing_status to {}, guard task_manager before calling get_tasks_for_client (or keep if get_task_manager can be None), ensure client_tasks is a list, use getattr(task,"metadata",{}) when reading metadata, prefer stable task identifiers (task.id / task.task_id / fallback to task.name), convert timestamps with a resilient _to_iso helper (int/float/datetime/None), and use logger.exception + {e!s} in the 500 response.
  • Outstanding: quick scan did not locate a definition of get_task_manager — confirm whether it can ever return None; if it always returns a manager instance the explicit task_manager None-guard can be removed.

Comment on lines +1164 to +1199
async def get_processor_overview():
"""Get comprehensive processor overview with pipeline stats."""
try:
processor_manager = get_processor_manager()
task_manager = get_task_manager()

# Get pipeline statistics
pipeline_stats = processor_manager.get_pipeline_statistics()

# Get system health metrics
task_health = task_manager.get_health_status()
queue_health = processor_manager.get_queue_health_status()

# Get recent activity
recent_activity = processor_manager.get_processing_history(limit=10)

overview = {
"pipeline_stats": pipeline_stats,
"system_health": {
"total_active_clients": len(processor_manager.active_file_sinks),
"total_processing_tasks": len(processor_manager.processing_tasks),
"task_manager_healthy": task_health.get("healthy", False),
"error_rate": task_health.get("recent_errors", 0) / max(task_health.get("completed_tasks", 1), 1),
"uptime_hours": time.time() / 3600 # Placeholder
},
"queue_health": queue_health,
"recent_activity": recent_activity[:5] # Last 5 activities
}

return overview
except Exception as e:
logger.error(f"Error getting processor overview: {e}")
return JSONResponse(
status_code=500, content={"error": f"Failed to get processor overview: {str(e)}"}
)


⚠️ Potential issue

Harden overview when task_manager is absent; fix uptime and minor inefficiencies

  • get_task_manager() may be None; calling get_health_status unguarded will 500.
  • Compute uptime from a real start_time if available; avoid epoch/3600 placeholder.
  • Ask processor_manager for only 5 activities instead of fetching 10 then slicing.
  • Use logger.exception and explicit conversion flag.
 async def get_processor_overview():
     """Get comprehensive processor overview with pipeline stats."""
     try:
         processor_manager = get_processor_manager()
-        task_manager = get_task_manager()
+        task_manager = get_task_manager()
 
         # Get pipeline statistics
         pipeline_stats = processor_manager.get_pipeline_statistics()
 
         # Get system health metrics
-        task_health = task_manager.get_health_status()
-        queue_health = processor_manager.get_queue_health_status()
+        task_health = task_manager.get_health_status() if task_manager else {}
+        queue_health = processor_manager.get_queue_health_status()
 
         # Get recent activity
-        recent_activity = processor_manager.get_processing_history(limit=10)
+        recent_activity = processor_manager.get_processing_history(limit=5)
 
+        # Derive safe error rate
+        recent_errors = task_health.get("recent_errors", 0) or 0
+        completed_tasks = task_health.get("completed_tasks", 0) or 0
+        error_rate = (recent_errors / completed_tasks) if completed_tasks > 0 else 0.0
+
+        # Uptime: prefer manager's start_time if provided
+        start_time = getattr(processor_manager, "start_time", None)
+        uptime_hours = ((time.time() - start_time) / 3600) if start_time else None
+
         overview = {
             "pipeline_stats": pipeline_stats,
             "system_health": {
                 "total_active_clients": len(processor_manager.active_file_sinks),
                 "total_processing_tasks": len(processor_manager.processing_tasks),
-                "task_manager_healthy": task_health.get("healthy", False),
-                "error_rate": task_health.get("recent_errors", 0) / max(task_health.get("completed_tasks", 1), 1),
-                "uptime_hours": time.time() / 3600  # Placeholder
+                "task_manager_healthy": task_health.get("healthy", False) if task_manager else False,
+                "error_rate": error_rate,
+                "uptime_hours": uptime_hours
             },
             "queue_health": queue_health,
-            "recent_activity": recent_activity[:5]  # Last 5 activities
+            "recent_activity": recent_activity  # Last 5 activities
         }
 
         return overview
     except Exception as e:
-        logger.error(f"Error getting processor overview: {e}")
+        logger.exception("Error getting processor overview")
         return JSONResponse(
-            status_code=500, content={"error": f"Failed to get processor overview: {str(e)}"}
+            status_code=500, content={"error": f"Failed to get processor overview: {e!s}"}
         )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async def get_processor_overview():
    """Get comprehensive processor overview with pipeline stats."""
    try:
        processor_manager = get_processor_manager()
        task_manager = get_task_manager()

        # Get pipeline statistics
        pipeline_stats = processor_manager.get_pipeline_statistics()

        # Get system health metrics
        task_health = task_manager.get_health_status()
        queue_health = processor_manager.get_queue_health_status()

        # Get recent activity
        recent_activity = processor_manager.get_processing_history(limit=10)

        overview = {
            "pipeline_stats": pipeline_stats,
            "system_health": {
                "total_active_clients": len(processor_manager.active_file_sinks),
                "total_processing_tasks": len(processor_manager.processing_tasks),
                "task_manager_healthy": task_health.get("healthy", False),
                "error_rate": task_health.get("recent_errors", 0) / max(task_health.get("completed_tasks", 1), 1),
                "uptime_hours": time.time() / 3600  # Placeholder
            },
            "queue_health": queue_health,
            "recent_activity": recent_activity[:5]  # Last 5 activities
        }

        return overview
    except Exception as e:
        logger.error(f"Error getting processor overview: {e}")
        return JSONResponse(
            status_code=500, content={"error": f"Failed to get processor overview: {str(e)}"}
        )

async def get_processor_overview():
    """Get comprehensive processor overview with pipeline stats."""
    try:
        processor_manager = get_processor_manager()
        task_manager = get_task_manager()

        # Get pipeline statistics
        pipeline_stats = processor_manager.get_pipeline_statistics()

        # Get system health metrics
        task_health = task_manager.get_health_status() if task_manager else {}
        queue_health = processor_manager.get_queue_health_status()

        # Get recent activity
        recent_activity = processor_manager.get_processing_history(limit=5)

        # Derive safe error rate
        recent_errors = task_health.get("recent_errors", 0) or 0
        completed_tasks = task_health.get("completed_tasks", 0) or 0
        error_rate = (recent_errors / completed_tasks) if completed_tasks > 0 else 0.0

        # Uptime: prefer manager's start_time if provided
        start_time = getattr(processor_manager, "start_time", None)
        uptime_hours = ((time.time() - start_time) / 3600) if start_time else None

        overview = {
            "pipeline_stats": pipeline_stats,
            "system_health": {
                "total_active_clients": len(processor_manager.active_file_sinks),
                "total_processing_tasks": len(processor_manager.processing_tasks),
                "task_manager_healthy": task_health.get("healthy", False) if task_manager else False,
                "error_rate": error_rate,
                "uptime_hours": uptime_hours
            },
            "queue_health": queue_health,
            "recent_activity": recent_activity  # Last 5 activities
        }

        return overview
    except Exception as e:
        logger.exception("Error getting processor overview")
        return JSONResponse(
            status_code=500, content={"error": f"Failed to get processor overview: {e!s}"}
        )
🧰 Tools
🪛 Ruff (0.13.0)

1193-1193: Consider moving this statement to an else block

(TRY300)


1194-1194: Do not catch blind exception: Exception

(BLE001)


1195-1195: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


1197-1197: Use explicit conversion flag

Replace with conversion flag

(RUF010)

🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/controllers/system_controller.py
around lines 1164-1199, harden get_processor_overview by guarding against a
missing task_manager (only call task_manager.get_health_status() if task_manager
is not None, otherwise use safe defaults), compute uptime_hours from a real
start_time when available (e.g., use (time.time() -
processor_manager.start_time)/3600 if processor_manager.start_time exists,
otherwise fall back to None), request only 5 recent activities from
processor_manager.get_processing_history(limit=5) instead of fetching 10 then
slicing, compute error_rate with a safe denominator to avoid division by zero,
and replace logger.error with logger.exception while returning the JSONResponse
with an explicit string conversion for the error (e.g., content={"error":
str(e)}).

Comment on lines +478 to +479
"avg_processing_time_ms": mean([t["duration"] * 1000 for t in audio_tasks[-50:]]) if audio_tasks else 0,
"success_rate": len([t for t in audio_tasks[-100:] if t]) / max(len(audio_tasks[-100:]), 1),

⚠️ Potential issue

Fix incorrect calculation of average processing time.

The mean calculation uses raw duration values but the metadata stores processing_time (not duration). Also, the success rate calculation doesn't validate if tasks actually succeeded.

-            "avg_processing_time_ms": mean([t["duration"] * 1000 for t in audio_tasks[-50:]]) if audio_tasks else 0,
-            "success_rate": len([t for t in audio_tasks[-100:] if t]) / max(len(audio_tasks[-100:]), 1),
+            "avg_processing_time_ms": mean([t.get("processing_time", 0) * 1000 for t in audio_tasks[-50:]]) if audio_tasks else 0,
+            "success_rate": len([t for t in audio_tasks[-100:] if t and not t.get("error")]) / max(len(audio_tasks[-100:]), 1),
📝 Committable suggestion


Suggested change
            "avg_processing_time_ms": mean([t["duration"] * 1000 for t in audio_tasks[-50:]]) if audio_tasks else 0,
            "success_rate": len([t for t in audio_tasks[-100:] if t]) / max(len(audio_tasks[-100:]), 1),

            "avg_processing_time_ms": mean([t.get("processing_time", 0) * 1000 for t in audio_tasks[-50:]]) if audio_tasks else 0,
            "success_rate": len([t for t in audio_tasks[-100:] if t and not t.get("error")]) / max(len(audio_tasks[-100:]), 1),
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/processors.py around lines
478-479, the avg_processing_time_ms calculation is using a nonexistent
"duration" field and should use the stored "processing_time" (converted to
milliseconds), and the success_rate should explicitly count tasks that indicate
success (e.g., t.get("success") is truthy or t.get("status") == "success")
rather than treating any task object as a success; update the mean calculation
to use t["processing_time"] * 1000 (guarding for missing values) and compute
success_rate as count_of_successful_tasks / max(total_tasks, 1) over the same
slice.
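
As a minimal sketch of the corrected metrics, assuming each history record stores `processing_time` in seconds and sets an `error` field on failure (field names taken from the review, not verified against the backend):

```python
from statistics import mean

def summarize_audio_tasks(audio_tasks):
    """Windowed metrics over task history records (list of dicts)."""
    recent = audio_tasks[-50:]    # window for latency
    window = audio_tasks[-100:]   # window for success rate
    avg_ms = mean(t.get("processing_time", 0) * 1000 for t in recent) if recent else 0
    # A task counts as successful only if it exists and carries no error marker
    successes = sum(1 for t in window if t and not t.get("error"))
    return {
        "avg_processing_time_ms": avg_ms,
        "success_rate": successes / max(len(window), 1),
    }
```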

Comment on lines +489 to +492
"queue_size": queue.qsize() if queue else 0,
"active_tasks": len([tid for tid, tinfo in self.processing_tasks.items()
if stage in tid and not self.task_manager.get_task_info(tinfo.get(stage, "")).completed_at]),
"avg_processing_time_ms": 30000, # Placeholder - can be calculated from task manager history

⚠️ Potential issue

Fix active tasks calculation logic error.

Lines 490-491: the active-task check is incorrect: it iterates over processing_tasks items as if each key contained the stage name, but that doesn't match the data structure, where stages are nested within each client's task map.

-                "active_tasks": len([tid for tid, tinfo in self.processing_tasks.items()
-                                   if stage in tid and not self.task_manager.get_task_info(tinfo.get(stage, "")).completed_at]),
+                "active_tasks": sum(
+                    1 for client_tasks in self.processing_tasks.values()
+                    if stage in client_tasks
+                    and (info := self.task_manager.get_task_info(client_tasks.get(stage, ""))) is not None
+                    and not info.completed_at
+                ),

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/processors.py around lines 489 to
492, the active tasks count is wrong because it assumes processing_tasks keys
contain the stage name; instead iterate the processing_tasks values (each
client/task info), extract the stage-specific task id from that nested structure
(e.g. tinfo.get(stage) or similar), skip if missing, then look up that stage
task via self.task_manager.get_task_info(...) and count it as active only when
that lookup exists and its completed_at is falsy; update the comprehension/loop
accordingly and guard against missing task_info to avoid attribute errors.
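
A standalone sketch of the corrected counting logic; the nested `processing_tasks` shape (`client_id -> {stage: task_id}`) and the `get_task_info()` lookup are assumptions based on the review's description, not the backend's verified API:

```python
def count_active_stage_tasks(processing_tasks: dict, task_manager, stage: str) -> int:
    """Count clients whose task for `stage` exists and has not completed."""
    active = 0
    for client_tasks in processing_tasks.values():
        task_id = client_tasks.get(stage)
        if not task_id:
            continue  # this client has no task for the requested stage
        info = task_manager.get_task_info(task_id)
        # Guard against missing task_info before touching completed_at
        if info is not None and not getattr(info, "completed_at", None):
            active += 1
    return active
```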

@AnkushMalaker AnkushMalaker deleted the processor-view branch December 18, 2025 19:39