
Reprocess fix #116

Closed
AnkushMalaker wants to merge 8 commits into main from reprocess-fix

Conversation

@AnkushMalaker
Collaborator

@AnkushMalaker AnkushMalaker commented Sep 22, 2025

Summary by CodeRabbit

  • New Features
    • Added an admin-only Processes dashboard with pipeline health, active tasks, processing history, and per-client detail views.
    • New API endpoints for processor overview, history (paginated), and client processing detail (admin-only).
  • Improvements
    • Revamped Upload flow with clearer phases and unified task-based progress polling.
    • Reprocessing now queues work reliably with clearer status reporting.
    • Enhanced processing metrics, queue health, and automatic cleanup for more stable operation.
  • Documentation
    • Simplified core docs and added detailed guides: API Reference, Distributed Deployment, Memory Providers, Speaker Recognition, Versioned Processing, and Wyoming Protocol.

- Updated the `CLAUDE.md` documentation to reflect changes in ASR service command.
- Introduced a new method `load_audio_file_as_chunk` in `audio_utils.py` for loading audio files into the Wyoming AudioChunk format.
- Enhanced `ProcessorManager` to include client type detection and improved cleanup of processing tasks.
- Updated `conversation_controller.py` to queue transcription and memory processing jobs with better error handling.
- Refactored the `Upload` component in the web UI to support a three-phase upload process with improved status management and polling for processing tasks.
- Added new API methods for asynchronous file uploads and job status retrieval.
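The new `load_audio_file_as_chunk` helper described above can be sketched as follows. This is a minimal sketch, not the actual implementation: a plain dataclass stands in for Wyoming's `AudioChunk` (the real class lives in the `wyoming` package), and the validation shown (existence check, 16 kHz expectation) is assumed from the review comments below.

```python
import wave
from dataclasses import dataclass
from pathlib import Path


@dataclass
class AudioChunk:
    """Stand-in for wyoming.audio.AudioChunk (hypothetical field layout)."""
    audio: bytes
    rate: int
    width: int
    channels: int


def load_audio_file_as_chunk(audio_path: Path) -> AudioChunk:
    """Load a 16 kHz WAV file into a single AudioChunk."""
    if not audio_path.exists():
        raise FileNotFoundError(f"Audio file not found: {audio_path}")
    try:
        with wave.open(str(audio_path), "rb") as wf:
            rate = wf.getframerate()
            if rate != 16000:
                raise ValueError(f"Expected 16 kHz sample rate, got {rate} Hz")
            frames = wf.readframes(wf.getnframes())
            return AudioChunk(
                audio=frames,
                rate=rate,
                width=wf.getsampwidth(),
                channels=wf.getnchannels(),
            )
    except wave.Error as e:
        # Chain the cause so the original wave error is preserved.
        raise ValueError(f"Invalid audio file format: {e}") from e
```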
@coderabbitai
Contributor

coderabbitai bot commented Sep 22, 2025

Walkthrough

Shifts documentation to external references; removes transcript coordinator; centralizes conversation creation via ConversationManager; adds queue-based reprocessing; enhances ProcessorManager with cleanup, metrics, and health; adds processor overview/history/detail endpoints; revamps upload flow and adds a Processes dashboard in the web UI; introduces audio file loader and a diarization utility script.

Changes

Cohort / File(s) Summary of changes
Documentation Restructure
CLAUDE.md, docs/api-reference.md, docs/distributed-deployment.md, docs/memory-providers.md, docs/speaker-recognition.md, docs/versioned-processing.md, docs/wyoming-protocol.md
Major rewrite of CLAUDE.md to link to new dedicated docs; added multiple focused docs covering API reference, distributed deployment, memory providers, speaker recognition, versioned processing, and Wyoming protocol.
Audio Utilities
backends/advanced/src/advanced_omi_backend/audio_utils.py
Added load_audio_file_as_chunk(Path) -> AudioChunk with validation; removed client state update side-effect in process_audio_chunk.
Client State Simplification
backends/advanced/src/advanced_omi_backend/client.py, backends/advanced/src/advanced_omi_backend/main.py
Removed auto new-conversation logic and timeout config; adjusted cleanup to target processor tasks; dropped transcript coordinator import and related health field.
Reprocessing via Queues
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
Replaced placeholder reprocessing with queue-based flow using env-driven providers; returns QUEUED; added error handling and logging.
Processor Insights & Cleanup
backends/advanced/src/advanced_omi_backend/processors.py
Added status enrichment (user_id, client_type), stale/completed cleanup routines, queue health, pipeline statistics; integrated cleanup into audio loop.
System Controller & Routes
backends/advanced/src/advanced_omi_backend/controllers/system_controller.py, backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py
Made per-file persistent client handling for uploads; added processor overview/history/client-detail endpoints; aligned cleanup and logging; added admin-only routes for new endpoints.
Conversation Management
backends/advanced/src/advanced_omi_backend/conversation_manager.py, backends/advanced/src/advanced_omi_backend/transcription.py
Centralized conversation creation/processing with title/summary generation; transcription now delegates to conversation manager; removed prior title/summary helpers and transcript coordination signals.
Transcript Coordinator Removal
backends/advanced/src/advanced_omi_backend/transcript_coordinator.py
Deleted the module, its class, exception, singleton, and all signaling APIs.
Web UI: Processes Dashboard
backends/advanced/webui/src/App.tsx, .../components/layout/Layout.tsx, .../pages/Processes.tsx, .../components/processes/*
Added admin-only Processes page with system health cards, pipeline view, active tasks table, history list, and client detail modal; integrated route and nav item.
Web UI: System Page Update
backends/advanced/webui/src/pages/System.tsx
Removed processor/active-client panels; simplified to services, provider details, diarization settings layout, and debug metrics.
Web UI: Upload Flow Revamp
backends/advanced/webui/src/pages/Upload.tsx
Implemented phase-based upload and unified polling via processor tasks; session persistence; enhanced per-file status UI.
API Client Updates
backends/advanced/webui/src/services/api.ts
Added processor overview/history/client-detail endpoints; async file upload with progress; job status helpers; noted removal of upload session API.
Extras: Diarization Utility
extras/speaker-recognition/sortformer.py
New script to test SortFormer diarization with enrollment, embedding extraction, mapping, and labeled segment output.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor User
  participant WebUI as Web UI (Processes)
  participant SysAPI as System Routes
  participant SysCtl as System Controller
  participant ProcMgr as ProcessorManager

  User->>WebUI: Open /processes (admin)
  WebUI->>SysAPI: GET /processor/overview
  SysAPI->>SysCtl: get_processor_overview()
  SysCtl->>ProcMgr: get_pipeline_statistics(), get_queue_health_status(), get_processing_status()
  ProcMgr-->>SysCtl: Stats, health, statuses
  SysCtl-->>SysAPI: Overview payload
  SysAPI-->>WebUI: Overview JSON
  WebUI->>SysAPI: (optional) GET /processor/history?page=&per_page=
  SysAPI->>SysCtl: get_processor_history()
  SysCtl-->>SysAPI: History page
  SysAPI-->>WebUI: History JSON
  WebUI->>SysAPI: (optional) GET /processor/clients/{client_id}
  SysAPI->>SysCtl: get_client_processing_detail(client_id)
  SysCtl-->>SysAPI: Client detail
  SysAPI-->>WebUI: Detail JSON
sequenceDiagram
  autonumber
  actor Admin
  participant API as Conversation Controller
  participant AU as Audio Utils
  participant Proc as ProcessorManager
  participant Queues as Queues

  Admin->>API: POST /conversations/{id}/reprocess/transcript
  API->>AU: load_audio_file_as_chunk(path)
  AU-->>API: AudioChunk
  API->>Proc: queue TranscriptionItem
  Proc->>Queues: enqueue transcription
  Proc-->>API: ack
  API-->>Admin: { status: "QUEUED" }

  Admin->>API: POST /conversations/{id}/reprocess/memory
  API->>Proc: queue MemoryProcessingItem
  Proc->>Queues: enqueue memory
  Proc-->>API: ack
  API-->>Admin: { status: "QUEUED" }
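The queue-and-acknowledge reprocessing flow diagrammed above can be sketched in plain asyncio. `ProcessorManager` and `TranscriptionItem` here are minimal hypothetical stand-ins for the backend classes; the only point illustrated is that the endpoint enqueues work and returns a QUEUED status immediately rather than processing inline.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class TranscriptionItem:
    # Hypothetical work-item shape; the real item carries more fields.
    conversation_id: str
    audio: bytes


class ProcessorManager:
    """Minimal stand-in: the real ProcessorManager owns several queues."""

    def __init__(self) -> None:
        self.transcription_queue: asyncio.Queue = asyncio.Queue()

    async def queue_transcription(self, item: TranscriptionItem) -> None:
        await self.transcription_queue.put(item)


async def reprocess_transcript(
    manager: ProcessorManager, conversation_id: str, audio: bytes
) -> dict:
    """Enqueue the work and acknowledge immediately with a QUEUED status."""
    try:
        await manager.queue_transcription(TranscriptionItem(conversation_id, audio))
        return {"status": "QUEUED", "conversation_id": conversation_id}
    except Exception:
        return {"status": "ERROR", "conversation_id": conversation_id}
```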
sequenceDiagram
  autonumber
  participant Trans as Transcription
  participant CM as ConversationManager
  participant Repo as Repositories

  Trans->>CM: create_conversation_with_processing(audio_uuid, transcript, analysis, segments)
  CM->>Repo: create conversation + transcript version
  CM->>CM: generate title & summary (LLM)
  CM->>Repo: update conversation with title/summary
  CM-->>Trans: conversation_id
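The create-then-enrich sequence above (create the record, generate title and summary via LLM, update the record) can be sketched like this. `FakeRepo` and the `generate` callable are hypothetical stand-ins for the repository and LLM provider; the prompts are illustrative only.

```python
class FakeRepo:
    """In-memory stand-in for the conversation repository (hypothetical API)."""

    def __init__(self) -> None:
        self.rows: dict = {}

    async def create_conversation(self, audio_uuid: str, transcript: str) -> str:
        cid = f"conv-{len(self.rows) + 1}"
        self.rows[cid] = {"audio_uuid": audio_uuid, "transcript": transcript}
        return cid

    async def update_conversation(self, cid: str, **fields) -> None:
        self.rows[cid].update(fields)


async def create_conversation_with_processing(
    audio_uuid: str, transcript: str, repo: FakeRepo, generate
) -> str:
    """Create the record first, then enrich it with an LLM title and summary."""
    cid = await repo.create_conversation(audio_uuid, transcript)
    title = await generate(f"Concise title (max 40 chars) for:\n{transcript}")
    summary = await generate(f"One-sentence summary of:\n{transcript}")
    await repo.update_conversation(cid, title=title, summary=summary)
    return cid
```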

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

  • Reprocessing WIP #96 — Implements versioned processing and reprocessing flows touching controllers/routers/db, aligning with this PR’s versioned workflows.
  • Better audio processing #111 — Refactors audio-processing pipeline and client/processor management, overlapping with ProcessorManager and upload handling changes here.
  • Openmemory mcp #69 — Introduces OpenMemory MCP provider and memory configuration, directly related to memory provider/env-driven changes.

Poem

In queues we trust, I thump the floor,
New pipes hum soft—then metrics roar.
Titles bloom from whispered threads,
Cleanup sweeps the stale to sheds.
Dashboards glow, a burrow bright—
I nibble logs by dashboard light.
Hop, queue, process—good night! 🐇✨

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title "Reprocess fix" directly reflects the primary intent of the changeset, which centers on fixing and refactoring reprocessing flows (queue-based transcript and memory reprocessing, audio-file reprocessing support, and related controller/processor updates), and is concise and relevant to the PR objectives.
Docstring Coverage ✅ Passed Docstring coverage is 82.86% which is sufficient. The required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch reprocess-fix

Tip

👮 Agentic pre-merge checks are now available in preview!

Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.

  • Built-in checks – Quickly apply ready-made checks to enforce title conventions, require pull request descriptions that follow templates, validate linked issues for compliance, and more.
  • Custom agentic checks – Define your own rules using CodeRabbit’s advanced agentic capabilities to enforce organization-specific policies and workflows. For example, you can instruct CodeRabbit’s agent to verify that API documentation is updated whenever API schema files are modified in a PR. Note: Up to 5 custom checks are currently allowed during the preview period. Pricing for this feature will be announced in a few weeks.

Please see the documentation for more information.

Example:

reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).

Please share your feedback with us on this Discord post.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@AnkushMalaker
Collaborator Author

@coderabbitai review

@coderabbitai
Contributor

coderabbitai bot commented Sep 24, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 9

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (5)
backends/advanced/src/advanced_omi_backend/controllers/system_controller.py (2)

236-241: Bug: raising JSONResponse will crash; return it instead.

JSONResponse isn’t an Exception. raise JSONResponse(...) will throw a TypeError.

Apply this diff and revise user‑facing text:

-                        audio_logger.warning(
-                            f"File {file.filename} has sample rate {sample_rate}Hz, expected 16kHz."
-                        )
-                        raise JSONResponse(status_code=400, content={"error": f"File {file.filename} has sample rate {sample_rate}Hz, expected 16kHz. I'll implement this at some point sorry"})
+                        audio_logger.warning(
+                            "Unsupported sample rate: %s (expected 16000 Hz)", sample_rate
+                        )
+                        return JSONResponse(
+                            status_code=400,
+                            content={"error": f"Unsupported sample rate {sample_rate} Hz; please upload 16 kHz WAV."},
+                        )

93-111: Remove triple‑quoted “commented code” block.

A stray string literal has no effect and is confusing. Delete or move back to real comments.

-        """This is a hack to update it the DB INCASE a process failed
-        if transcription_stage.get("status") == "started" and not transcription_stage.get("completed", False):
-            # Check if transcription is actually complete by checking the database
-            try:
-                chunk = await chunks_col.find_one({"client_id": client_id})
-                if chunk and chunk.get("transcript") and len(chunk.get("transcript", [])) > 0:
-                    # Transcription is complete! Update the processor state
-                    processor_manager.track_processing_stage(
-                        client_id,
-                        "transcription",
-                        "completed",
-                        {"audio_uuid": chunk.get("audio_uuid"), "segments": len(chunk.get("transcript", []))}
-                    )
-                    logger.info(f"Detected transcription completion for client {client_id} ({len(chunk.get('transcript', []))} segments)")
-                    # Get updated status
-                    processing_status = processor_manager.get_processing_status(client_id)
-            except Exception as e:
-                logger.debug(f"Error checking transcription completion: {e}")
-        """
+        # (Removed legacy DB backfill hack for transcription completion)
backends/advanced/src/advanced_omi_backend/memory/memory_service.py (1)

450-459: Propagate whether LLM/actions processing actually ran

Return a boolean indicating the update flow executed successfully (even if actions were NONE).

Apply:

-    ) -> List[str]:
+    ) -> Tuple[List[str], bool]:
@@
-        created_ids: List[str] = []
+        created_ids: List[str] = []
+        processed_ok = False
@@
-            actions_obj = await self.llm_provider.propose_memory_actions(
+            actions_obj = await self.llm_provider.propose_memory_actions(
                 retrieved_old_memory=retrieved_old_memory,
                 new_facts=memories_text,
                 custom_prompt=None,
             )
             memory_logger.info(f"📝 UpdateMemory LLM returned: {type(actions_obj)} - {actions_obj}")
+            processed_ok = True
         except Exception as e_actions:
             memory_logger.error(f"LLM propose_memory_actions failed: {e_actions}")
             actions_obj = {}
@@
-        return created_ids
+        return created_ids, processed_ok

And update the caller as shown in the previous comment.

Also applies to: 507-533

backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (2)

285-287: Cropped file path mismatch leads to undeleted files

Cropping writes to /app/data/audio_chunks but deletion checks only /app/audio_chunks. Leaks files.

Unify paths or check both locations on delete:

-                # Construct full path to audio file
-                full_audio_path = Path("/app/audio_chunks") / audio_path
-                if full_audio_path.exists():
-                    full_audio_path.unlink()
+                # Construct candidate paths to audio file
+                candidate_paths = [
+                    Path("/app/audio_chunks") / audio_path,
+                    Path("/app/data/audio_chunks") / audio_path,
+                ]
+                for p in candidate_paths:
+                    if p.exists():
+                        p.unlink()
+                        deleted_files.append(str(p))
+                        logger.info(f"Deleted audio file: {p}")
+                        break
@@
-                # Construct full path to cropped audio file
-                full_cropped_path = Path("/app/audio_chunks") / cropped_audio_path
-                if full_cropped_path.exists():
-                    full_cropped_path.unlink()
-                    deleted_files.append(str(full_cropped_path))
-                    logger.info(f"Deleted cropped audio file: {full_cropped_path}")
+                # Construct candidate paths to cropped audio file
+                candidate_paths = [
+                    Path("/app/audio_chunks") / cropped_audio_path,
+                    Path("/app/data/audio_chunks") / cropped_audio_path,
+                ]
+                for p in candidate_paths:
+                    if p.exists():
+                        p.unlink()
+                        deleted_files.append(str(p))
+                        logger.info(f"Deleted cropped audio file: {p}")
+                        break

Alternatively, centralize the base dir in config.

Also applies to: 492-507


78-78: Wrong user attribute in log

Use user.user_id (consistent across file), not user.id.

-        logger.info(f"Manually closed conversation for client {client_id} by user {user.id}")
+        logger.info(f"Manually closed conversation for client {client_id} by user {user.user_id}")
🧹 Nitpick comments (55)
backends/advanced/src/advanced_omi_backend/audio_utils.py (2)

147-151: Prefer exception chaining and stack-aware logging.

Use logger.exception() and preserve the cause with raise ... from e.

Apply this diff:

-    except FileNotFoundError:
-        logger.error(f"Audio file not found: {audio_path}")
-        raise
-    except Exception as e:
-        logger.error(f"Error loading audio file {audio_path}: {e}")
-        raise ValueError(f"Invalid audio file format: {e}")
+    except FileNotFoundError:
+        logger.exception(f"Audio file not found: {audio_path}")
+        raise
+    except Exception as e:
+        logger.exception(f"Error loading audio file {audio_path}")
+        raise ValueError(f"Invalid audio file format: {e}") from e

91-104: Make this sync (no awaits) or use to_thread for file I/O.

The function is async but performs only blocking I/O; either make it def or wrap file/WAV work in await asyncio.to_thread(...) when called.
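The `asyncio.to_thread` variant suggested here could look like the following sketch (function names are illustrative, not the backend's):

```python
import asyncio
import wave
from pathlib import Path


def _read_wav_frames(path: Path) -> bytes:
    # Plain blocking I/O, safe to run in a worker thread.
    with wave.open(str(path), "rb") as wf:
        return wf.readframes(wf.getnframes())


async def load_frames(path: Path) -> bytes:
    # asyncio.to_thread keeps the event loop responsive while the file is read.
    return await asyncio.to_thread(_read_wav_frames, path)
```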

docs/wyoming-protocol.md (2)

7-10: Add language hint to fenced code block.

Fixes MD040 lint and improves rendering.

Apply this diff:

-```
+```text
 {JSON_HEADER}\n
 <binary_payload>

12-13: Clarify timestamp units to avoid mismatches.

Explicitly state that timestamps are milliseconds since the Unix epoch.

Apply this diff:

 ## Supported Events
+
+Note: All timestamps are expressed in milliseconds since Unix epoch.
backends/advanced/src/advanced_omi_backend/main.py (1)

273-280: Use logger.exception for stack and avoid blind catch if possible.

Keeps tracebacks and simplifies triage.

Apply this diff:

-        except Exception as processor_cleanup_error:
-            logger.error(f"Error cleaning up processor tasks for {client_id}: {processor_cleanup_error}")
+        except Exception as processor_cleanup_error:
+            logger.exception(f"Error cleaning up processor tasks for {client_id}")
backends/advanced/webui/src/components/processes/SystemHealthCards.tsx (2)

19-20: Format large numbers for readability.

Use locale-aware formatting for counts.

Apply this diff:

-      value: data.total_active_clients,
+      value: data.total_active_clients.toLocaleString(),
...
-      value: data.total_processing_tasks,
+      value: data.total_processing_tasks.toLocaleString(),

Also applies to: 26-27


106-114: Add accessible label for health status dot.

Improve a11y with an aria label.

Apply this diff:

-              <div className={`w-2 h-2 rounded-full mr-2 ${
+              <div
+                role="img"
+                aria-label={`Task Manager is ${data.task_manager_healthy ? 'healthy' : 'unhealthy'}`}
+                className={`w-2 h-2 rounded-full mr-2 ${
                   data.task_manager_healthy ? 'bg-green-500' : 'bg-red-500'
-                }`} />
+                }`}
+              />
extras/speaker-recognition/sortformer.py (8)

1-1: Remove shebang or make the file executable.

Either chmod +x this script or drop the shebang to satisfy EXE001.

Apply this diff to remove it:

-#!/usr/bin/env python3

22-31: Avoid blind except and unused variable; log the failure.

Catching Exception without logging hides errors. Also e was unused.

Apply this diff:

-    except Exception as e:
-        return 0.0
+    except Exception as err:
+        print(f"ERROR reading WAV duration for {file_path}: {err}")
+        return 0.0

109-110: Prefix unused var to silence lint.

sr isn’t used; prefer _.

-    full_wav, sr = load_audio_16k_mono(audio_file)
+    full_wav, _ = load_audio_16k_mono(audio_file)

151-154: Replace bare excepts with explicit Exception and log.

Bare except swallows real errors and complicates debugging.

-            except:
-                pass
+            except Exception as err:
+                print(f"WARN: failed to remove temp file {temp_path}: {err}")
-    try:
-        os.rmdir(temp_dir)
-    except:
-        pass
+    try:
+        os.rmdir(temp_dir)
+    except OSError as err:
+        print(f"NOTE: temp dir cleanup skipped: {err}")

Also applies to: 166-169


173-176: Use a non‑zero default similarity threshold.

A 0.3–0.4 cosine threshold avoids over‑eager mappings in noisy cases.

-def map_speakers_to_enrollment(track_embeddings: Dict[int, torch.Tensor], 
-                              enrollment: Dict[str, torch.Tensor],
-                              similarity_threshold: float = 0.0) -> Dict[int, str]:
+def map_speakers_to_enrollment(track_embeddings: Dict[int, torch.Tensor],
+                              enrollment: Dict[str, torch.Tensor],
+                              similarity_threshold: float = 0.3) -> Dict[int, str]:
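A pure-Python sketch of this threshold-gated mapping (the real script uses torch tensors; plain float lists stand in here, and the fallback label is an assumption):

```python
import math
from typing import Dict, List, Sequence


def _cosine(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na > 0 and nb > 0 else 0.0


def map_speakers_to_enrollment(
    track_embeddings: Dict[int, List[float]],
    enrollment: Dict[str, List[float]],
    similarity_threshold: float = 0.3,
) -> Dict[int, str]:
    """Assign each diarized track to the closest enrolled speaker above the threshold."""
    mapping: Dict[int, str] = {}
    for track_id, emb in track_embeddings.items():
        best_name, best_sim = None, similarity_threshold
        for name, ref in enrollment.items():
            sim = _cosine(emb, ref)
            if sim > best_sim:
                best_name, best_sim = name, sim
        # Tracks below the threshold keep a generic label instead of a wrong match.
        mapping[track_id] = best_name if best_name else f"speaker_{track_id}"
    return mapping
```

With a 0.3 threshold, a noisy track whose best cosine similarity is below the cutoff stays unmapped rather than being forced onto the nearest enrolled speaker.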

117-121: Prefer TemporaryDirectory for segment artifacts.

Avoids manual cleanup and racey rmdir by using tempfile.TemporaryDirectory().

Example sketch:

import tempfile

with tempfile.TemporaryDirectory(prefix="segments_") as temp_dir:
    # write per‑segment wavs inside temp_dir
    # no manual rm needed

Also applies to: 140-155, 165-169


253-262: Rename unused loop variable to underscore.

Silences B007 while keeping the dependency check loop behavior.

-    for category, files in test_files.items():
+    for _category, files in test_files.items():

335-335: Remove extraneous f‑prefix.

No placeholders present.

-    print(f"\nSpeaker summary:")
+    print("\nSpeaker summary:")
CLAUDE.md (2)

282-304: Add explicit note on reprocessing identifiers.

Docs should state that reprocessing and version management use conversation_id (not audio_uuid) to match backend behavior.


306-314: Caution on destructive reset commands.

Consider flagging these as “development only” and requiring explicit confirmation to avoid accidental data loss.

backends/advanced/src/advanced_omi_backend/controllers/system_controller.py (7)

257-271: Pass the created client_state or drop its creation.

You create client_state but pass client_state=None to process_audio_chunk. Either pass it through or stop creating it to reduce overhead.

-                            client_state=None,  # No client state needed for file upload
+                            client_state=client_state,

If process_audio_chunk doesn’t use it, remove the create_client_state call for upload path.


390-402: Avoid duplicate conversation closes.

You close via close_client_audio, may trigger processing; then call close_current_conversation() twice (once after wait, once again after loop). This can double‑trigger downstream work.

  • Keep a single, well‑timed close_current_conversation() (e.g., only when transcription completes and memory is pending).

Also applies to: 738-741


545-546: Remove unused symbols.

processed_files here and uuid import aren’t used.

-        # Process files one by one
-        processed_files = []
+        # Process files one by one
-    import uuid

Also applies to: 533-533


404-409: Use logger.exception in except blocks.

Includes stack traces and avoids stringifying exceptions.

Examples:

-            except Exception as e:
-                audio_logger.error(f"Error processing file {file.filename}: {e}")
+            except Exception:
+                audio_logger.exception("Error processing file %s", file.filename)
-    except Exception as e:
-        audio_logger.error(f"Error in process_audio_files: {e}")
+    except Exception:
+        audio_logger.exception("Error in process_audio_files")
-    except Exception as e:
-        error_msg = f"Job processing failed: {str(e)}"
-        audio_logger.error(f"💥 [Job {job_id}] {error_msg}")
+    except Exception:
+        audio_logger.exception("💥 [Job %s] Job processing failed", job_id)
+        error_msg = "Job processing failed"
-    except Exception as e:
-        logger.error(f"Error deleting all memories for user {user.user_id}: {e}")
+    except Exception:
+        logger.exception("Error deleting all memories for user %s", user.user_id)
-    except Exception as e:
-        logger.error(f"Error getting client processing detail for {client_id}: {e}")
+    except Exception:
+        logger.exception("Error getting client processing detail for %s", client_id)

Also applies to: 428-431, 767-771, 1174-1178, 1297-1301


1187-1205: Don’t report epoch as uptime.

uptime_hours = time.time()/3600 is epoch hours, not process uptime. Track a PROCESS_START_TIME at module import and compute (time.time() - PROCESS_START_TIME)/3600, or omit until available.
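A minimal sketch of the suggested fix, using a monotonic clock captured at module import (names are illustrative):

```python
import time

# Captured once at module import; the monotonic clock is immune to
# wall-clock adjustments, so the delta is true process uptime.
PROCESS_START_TIME = time.monotonic()


def get_uptime_hours() -> float:
    return (time.monotonic() - PROCESS_START_TIME) / 3600.0
```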


606-651: Align sample-rate handling between sync and async paths.

The sync path rejects non‑16k WAVs; the async path accepts any rate. Consider resampling or consistent validation here, too.


163-165: Unused parameter: auto_generate_client.

Not referenced in the implementation. Remove or implement its behavior to avoid confusion.

backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py (1)

171-175: Silence ARG001 by using underscore for injected deps.

FastAPI dependency injection requires the parameter; rename to _ to satisfy the linter without changing behavior.

-async def get_processor_overview_route(current_user: User = Depends(current_superuser)):
+async def get_processor_overview_route(_: User = Depends(current_superuser)):
-async def get_processor_history_route(
+async def get_processor_history_route(
     page: int = Query(1, ge=1, description="Page number"),
     per_page: int = Query(50, ge=1, le=100, description="Items per page"),
-    current_user: User = Depends(current_superuser)
+    _: User = Depends(current_superuser)
 ):
-async def get_client_processing_detail_route(
-    client_id: str,
-    current_user: User = Depends(current_superuser)
-):
+async def get_client_processing_detail_route(
+    client_id: str,
+    _: User = Depends(current_superuser)
+):

Note: B008 for Depends in defaults is expected in FastAPI; consider ignoring it in Ruff config for router modules.

Also applies to: 177-184, 186-192

backends/advanced/src/advanced_omi_backend/conversation_manager.py (2)

30-38: Prefix unused parameter to avoid ARG002.

transcript_data isn’t used in create_conversation.

-    async def create_conversation(self, audio_uuid: str, transcript_data: dict, speech_analysis: dict, chunk_repo):
+    async def create_conversation(self, audio_uuid: str, _transcript_data: dict, speech_analysis: dict, chunk_repo):

174-182: Prompt formatting: add newlines for clarity.

Concatenation currently jams the input and rules together.

-            prompt = f"Generate a concise, descriptive title (max 40 characters) for {context}:"\
-                + f"{conversation_text}"\
-                + "Rules:\n"\
+            prompt = (
+                f"Generate a concise, descriptive title (max 40 characters) for {context}:\n\n"
+                f"{conversation_text}\n\n"
+                "Rules:\n"
                 + "- Maximum 40 characters\n"
                 + f"{include_speakers_instruction}\n"
                 + "- Capture the main topic\n"
                 + "- Be specific and informative\n"
-                + "Title:"
+                + "Title:"
+            )
backends/advanced/src/advanced_omi_backend/transcription.py (2)

213-217: Use logger.exception; avoid str(e).

Include traceback for failed transcription.

-                # Transcription failed
-                logger.error(f"Transcript failed for {self._current_audio_uuid}: {str(e)}")
+                # Transcription failed
+                logger.exception("Transcript failed for %s", self._current_audio_uuid)

473-476: Tone down the edge‑case log.

Replace the humorous message with a neutral, actionable message to keep logs professional.

-                logger.warning(f"🚨 EDGE CASE: Speech detected but no segments processed for {self._current_audio_uuid}. Developer felt this edge case can never happen. Developer wants to sleep. 😴")
+                logger.warning("EDGE CASE: Speech detected but no segments processed for %s. Investigate diarization or text normalization paths.", self._current_audio_uuid)
backends/advanced/webui/src/components/layout/Layout.tsx (1)

76-80: Improve active link detection and accessibility

Current equality check won’t highlight nested routes; also add aria-current for a11y.

Apply:

-                      className={`flex items-center space-x-3 px-3 py-2 rounded-md text-sm font-medium transition-colors ${
-                        location.pathname === path
+                      className={`flex items-center space-x-3 px-3 py-2 rounded-md text-sm font-medium transition-colors ${
+                        location.pathname === path || location.pathname.startsWith(path + '/')
                           ? 'bg-blue-100 text-blue-900 dark:bg-blue-900 dark:text-blue-100'
                           : 'text-gray-700 dark:text-gray-300 hover:bg-gray-100 dark:hover:bg-gray-700'
                       }`}
+                      aria-current={location.pathname === path || location.pathname.startsWith(path + '/') ? 'page' : undefined}

Also applies to: 75-85

backends/advanced/src/advanced_omi_backend/memory/memory_service.py (1)

563-563: Avoid logging full action payloads at info level

actions_list may include user content; prefer debug.

-            memory_logger.info(f"📋 Normalized to {len(actions_list)} actions: {actions_list}")
+            memory_logger.debug(f"📋 Normalized to {len(actions_list)} actions: {actions_list}")
backends/advanced/webui/src/App.tsx (1)

13-13: Optional: Gate route by admin at routing level

The page self-gates, but you can also add an admin-only guard to prevent rendering for non-admin deep links.

If ProtectedRoute supports roles, wrap with an admin check; otherwise, keep as-is.

docs/api-reference.md (1)

56-63: Avoid instructing users to read secrets from .env in docs

Suggest neutral phrasing to obtain local credentials without endorsing copying real secrets; remove “Read tool” references (internal).

-# Use the Read tool to view the .env file and identify credentials
-# Look for:
+# In local development, ensure ADMIN_EMAIL and ADMIN_PASSWORD are set (e.g., in a .env file not committed to VCS).
+# Example:
 # ADMIN_EMAIL=admin@example.com
 # ADMIN_PASSWORD=your-password-here
-**Important**: Always read the .env file first using the Read tool rather than using shell commands like `grep` or `cut`. This ensures you see the exact values and can copy them accurately.
+Note: Handle your .env file carefully. Do not commit real credentials. Use environment variables or a local .env file for development.

Also applies to: 89-89

backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (2)

639-643: Prefer logger.exception and avoid broad catches

Use logger.exception to include stack traces. Consider narrowing the exception where possible.

-        except Exception as e:
-            logger.error(f"Error queuing transcript reprocessing: {e}")
+        except Exception:
+            logger.exception("Error queuing transcript reprocessing")
             return JSONResponse(
-                status_code=500, content={"error": f"Failed to queue reprocessing: {str(e)}"}
+                status_code=500, content={"error": "Failed to queue reprocessing"}
             )
-        except Exception as e:
-            logger.error(f"Error queuing memory reprocessing: {e}")
+        except Exception:
+            logger.exception("Error queuing memory reprocessing")
             return JSONResponse(
-                status_code=500, content={"error": f"Failed to queue memory reprocessing: {str(e)}"}
+                status_code=500, content={"error": "Failed to queue memory reprocessing"}
             )

Also applies to: 752-756
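For illustration, the practical difference between the two calls in a self-contained snippet (the logger name is arbitrary):

```python
import io
import logging

stream = io.StringIO()
logger = logging.getLogger("reprocess-demo")
logger.addHandler(logging.StreamHandler(stream))
logger.setLevel(logging.ERROR)

try:
    raise ConnectionError("queue unavailable")
except ConnectionError:
    # No traceback attached to the record:
    logger.error("Error queuing transcript reprocessing")
    # Same message, but the active exception's traceback is appended:
    logger.exception("Error queuing transcript reprocessing")

output = stream.getvalue()
```

Only the `logger.exception` record carries the stack trace, which is what makes the failure debuggable without re-raising.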


620-636: Version creation occurs before queuing; no rollback on queue failure

If queueing fails, a new version and processing run remain created. Consider marking run/version as failed or rolling back.

  • Option A: Create version with status QUEUED and update to FAILED on queue error.
  • Option B: On failure, delete the created version and run records.
    I can help wire a small transactional helper around these two operations.

Also applies to: 707-716

backends/advanced/webui/src/components/processes/ProcessingHistory.tsx (2)

69-77: Normalize status mapping to handle uppercase values

Backends often use uppercase (QUEUED/RUNNING/COMPLETED/FAILED). Normalize to lower-case before switch.

-  const getStatusIcon = (status: string) => {
-    switch (status) {
+  const getStatusIcon = (status: string) => {
+    const s = status?.toLowerCase?.() || status
+    switch (s) {
@@
-  const getStatusColor = (status: string) => {
-    switch (status) {
+  const getStatusColor = (status: string) => {
+    const s = status?.toLowerCase?.() || status
+    switch (s) {

Also applies to: 80-88


39-41: Surface API error details when available

Include server-provided message if present.

-    } catch (err: any) {
-      setError(err.message || 'Failed to load processing history')
+    } catch (err: any) {
+      const apiMsg = err?.response?.data?.error || err?.response?.data?.message
+      setError(apiMsg || err.message || 'Failed to load processing history')
backends/advanced/webui/src/pages/System.tsx (1)

61-65: Minor: Promise.allSettled usage can be simplified

You already catch getMetrics; using allSettled plus per-call catch is redundant. Optional cleanup only.

backends/advanced/webui/src/pages/Processes.tsx (1)

76-85: Avoid overlapping refresh calls

If a request takes >5s, interval can stack calls. Guard on loading or serialize.

-    const interval = setInterval(() => {
-      loadProcessorOverview()
+    const interval = setInterval(() => {
+      if (!loading) loadProcessorOverview()
     }, 5000)

Also consider an AbortController to cancel in-flight requests on unmount.

Also applies to: 134-142

docs/speaker-recognition.md (1)

15-22: Add brief auth, rate‑limit, and key‑handling notes.

  • Document required roles/permissions for these endpoints.
  • Note recommended rate limits for live inference.
  • Clarify Deepgram key sourcing (env var, not stored client-side).

Also applies to: 68-73

backends/advanced/webui/src/components/processes/ClientDetailModal.tsx (2)

44-55: Avoid setState after unmount (race on fast close/refresh).

Guard state updates when the modal unmounts to prevent React warnings.

Apply this diff:

-import { useState, useEffect } from 'react'
+import { useState, useEffect, useRef } from 'react'
@@
 export default function ClientDetailModal({ clientId, onClose }: ClientDetailModalProps) {
   const [clientDetail, setClientDetail] = useState<ClientProcessingDetail | null>(null)
   const [loading, setLoading] = useState(false)
   const [error, setError] = useState<string | null>(null)
+  const isMountedRef = useRef(true)
@@
   const loadClientDetail = async () => {
     try {
-      setLoading(true)
-      setError(null)
+      if (!isMountedRef.current) return
+      setLoading(true)
+      setError(null)
       const response = await systemApi.getClientProcessingDetail(clientId)
-      setClientDetail(response.data)
+      if (!isMountedRef.current) return
+      setClientDetail(response.data)
     } catch (err: any) {
-      setError(err.message || 'Failed to load client details')
+      if (!isMountedRef.current) return
+      const message = err?.response?.data?.detail || err?.message || 'Failed to load client details'
+      setError(message)
     } finally {
-      setLoading(false)
+      if (!isMountedRef.current) return
+      setLoading(false)
     }
   }
@@
   useEffect(() => {
     loadClientDetail()
     }, [clientId])
+
+  useEffect(() => {
+    return () => { isMountedRef.current = false }
+  }, [])

5-13: Expose conversation_id alongside audio UUID to align with reprocessing flows.

Per our team learning, reprocessing/versions center on conversation_id. Surface it in client_info (if available) for operators.

Apply this diff:

 interface ClientProcessingDetail {
   client_id: string
   client_info: {
     user_id: string
     user_email: string
     current_audio_uuid?: string
+    conversation_id?: string
     conversation_start_time?: string
     sample_rate?: number
   }
@@
                 <div>
                   <label className="text-sm font-medium text-gray-600 dark:text-gray-400">Current Audio UUID</label>
                   <p className="text-gray-900 dark:text-gray-100">
                     {clientDetail.client_info.current_audio_uuid ? (
                       <code className="text-xs bg-gray-200 dark:bg-gray-600 px-2 py-1 rounded">
                         {clientDetail.client_info.current_audio_uuid}
                       </code>
                     ) : (
                       'None'
                     )}
                   </p>
                 </div>
+                <div>
+                  <label className="text-sm font-medium text-gray-600 dark:text-gray-400">Conversation ID</label>
+                  <p className="text-gray-900 dark:text-gray-100">
+                    {clientDetail.client_info.conversation_id ? (
+                      <code className="text-xs bg-gray-200 dark:bg-gray-600 px-2 py-1 rounded">
+                        {clientDetail.client_info.conversation_id}
+                      </code>
+                    ) : (
+                      'N/A'
+                    )}
+                  </p>
+                </div>

Please confirm the backend includes conversation_id in this payload; if not, we can add it to the API and wire through.

Also applies to: 146-176

backends/advanced/webui/src/components/processes/ProcessPipelineView.tsx (2)

191-199: Compute average success rate with weighting to reflect load.

A simple mean can mislead when stages have very different throughput. Weight by throughput_per_minute (or active_tasks) for a truer picture.

Apply this diff:

-            <div className="text-2xl font-bold text-gray-900 dark:text-gray-100">
-              {Math.round(Object.values(pipelineStats).reduce((sum, stage) => sum + stage.success_rate, 0) / Object.keys(pipelineStats).length * 100)}%
-            </div>
+            <div className="text-2xl font-bold text-gray-900 dark:text-gray-100">
+              {(() => {
+                const entries = Object.values(pipelineStats)
+                const totalW = entries.reduce((w, s) => w + s.throughput_per_minute, 0) || 1
+                const weighted = entries.reduce((sum, s) => sum + s.success_rate * s.throughput_per_minute, 0)
+                return Math.round((weighted / totalW) * 100)
+              })()}%
+            </div>

18-19: Tighten queueHealth typing to prevent key typos.

Bind queueHealth keys to the stage keys so invalid strings don’t sneak in.

Apply this diff:

-interface ProcessPipelineViewProps {
+type StageKey = 'audio' | 'transcription' | 'memory' | 'cropping'
+interface ProcessPipelineViewProps {
   pipelineStats: {
-    audio: PipelineStageStats
-    transcription: PipelineStageStats
-    memory: PipelineStageStats
-    cropping: PipelineStageStats
+    audio: PipelineStageStats
+    transcription: PipelineStageStats
+    memory: PipelineStageStats
+    cropping: PipelineStageStats
   }
-  queueHealth: Record<string, string>
+  queueHealth: Record<StageKey, string>
 }
@@
-  const stages = [
+  const stages: Array<{ name: string; icon: any; key: StageKey; color: string; description: string }> = [
@@
-          const health = queueHealth[stage.key] || 'idle'
+          const health = queueHealth[stage.key] || 'idle'

Also applies to: 21-29, 95-103

backends/advanced/webui/src/components/processes/ActiveTasksTable.tsx (2)

121-126: Stabilize string sorting (localeCompare) and handle equality.

Prevents inconsistent ordering and treats equal values neutrally.

Apply this diff:

   if (sortDirection === 'asc') {
-    return aValue > bValue ? 1 : -1
+    if (typeof aValue === 'string' && typeof bValue === 'string') return aValue.localeCompare(bValue)
+    return aValue === bValue ? 0 : aValue > bValue ? 1 : -1
   } else {
-    return aValue < bValue ? 1 : -1
+    if (typeof aValue === 'string' && typeof bValue === 'string') return bValue.localeCompare(aValue)
+    return aValue === bValue ? 0 : aValue < bValue ? 1 : -1
   }

30-49: Extract richer error details (Axios response.data.detail).

Improves operator feedback when backend provides structured errors.

Apply this diff:

   } catch (err: any) {
-    setError(err.message || 'Failed to load active tasks')
+    const message = err?.response?.data?.detail || err?.message || 'Failed to load active tasks'
+    setError(message)
   } finally {
backends/advanced/webui/src/pages/Upload.tsx (2)

398-404: Avoid document.querySelector for file input; use a ref.

Reduces coupling and avoids wrong element selection if inputs multiply.

Apply this diff:

-import React, { useState, useCallback, useEffect } from 'react'
+import React, { useState, useCallback, useEffect, useRef } from 'react'
@@
 export default function Upload() {
+  const fileInputRef = useRef<HTMLInputElement>(null)
@@
         <input
           type="file"
           multiple
           accept="audio/*,.wav,.mp3,.m4a,.flac"
           onChange={(e) => handleFileSelect(e.target.files)}
+          ref={fileInputRef}
           className="absolute inset-0 w-full h-full opacity-0 cursor-pointer"
         />
@@
         <button
-          onClick={() => (document.querySelector('input[type="file"]') as HTMLInputElement)?.click()}
+          onClick={() => fileInputRef.current?.click()}
           className="px-4 py-2 bg-blue-600 text-white rounded-lg hover:bg-blue-700 transition-colors"
         >

Also applies to: 390-396, 1-1


276-291: Multiple job IDs (multi‑file) are collapsed to the first.

If the server returns multiple jobs, only jobs[0] is used. Consider persisting all job_ids to track each file accurately.

Is uploadApi.uploadAudioFilesAsync guaranteed to return a single job_id for multi-file uploads? If not, we should store all job_ids and adjust polling accordingly.

backends/advanced/webui/src/services/api.ts (3)

144-154: Upload with progress: OK; consider DRYing config

This duplicates the multipart+progress config used below. Extracting a small helper keeps both sync/async upload in lockstep (headers, timeout, progress calc).

Example helper (outside this range):

function buildMultipartConfig(onUploadProgress?: (p: number) => void, timeout = 300000) {
  return {
    headers: { 'Content-Type': 'multipart/form-data' },
    timeout,
    onUploadProgress: (e: ProgressEvent) => {
      if (onUploadProgress && e.total) onUploadProgress(Math.round((e.loaded * 100) / e.total))
    }
  }
}

Then:

-    api.post('/api/process-audio-files', files, {
-      headers: { 'Content-Type': 'multipart/form-data' },
-      timeout: 300000, // 5 minutes
-      onUploadProgress: (progressEvent) => {
-        if (onProgress && progressEvent.total) {
-          const progress = Math.round((progressEvent.loaded * 100) / progressEvent.total)
-          onProgress(progress)
-        }
-      }
-    }),
+    api.post('/api/process-audio-files', files, buildMultipartConfig(onProgress, 300000)),

169-175: Batch job status: OK; consider single-call batch API if available

This Promise.all approach is fine. If the backend adds a batch status endpoint later, prefer it to reduce N requests.


156-168: Async upload — endpoint confirmed; centralize multipart config

  • Confirmed: POST /api/process-audio-files-async exists and the backend returns job_id + status_url (/api/process-audio-files/jobs/{job_id}) (see backends/advanced/upload_files.py and backends/advanced/src/advanced_omi_backend/controllers/system_controller.py).
  • I couldn't find a buildMultipartConfig helper; factor out the multipart/form-data request config and reuse it in backends/advanced/webui/src/services/api.ts (uploadAudioFiles / uploadAudioFilesAsync).
backends/advanced/src/advanced_omi_backend/processors.py (4)

432-449: Additions to status payload: good; remove unused import and consider a helper for client type

The added user_id and client_type fields are useful. Minor nits:

  • The import re is unused; remove to keep imports clean.
  • Consider centralizing client-type detection (upload/websocket) in a small helper for reuse/testing.
-        import re
         client_type = "upload" if ("-upload-" in client_id or client_id.startswith("reprocess-")) else "websocket"
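For reference, the inline expression as a reusable, testable helper (the name `detect_client_type` is illustrative, not from the codebase):

```python
def detect_client_type(client_id: str) -> str:
    """Classify a client as upload-style or websocket by its ID convention."""
    if "-upload-" in client_id or client_id.startswith("reprocess-"):
        return "upload"
    return "websocket"


kinds = [
    detect_client_type("user1-upload-abc123"),
    detect_client_type("reprocess-conv-42"),
    detect_client_type("user1-device-abc123"),
]
```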

584-599: Upload cleanup helper: OK; prefer logger.exception on failure

Swap to exception-level logging to capture stacktrace without passing exc_info manually.

-        except Exception as e:
-            logger.error(f"❌ Error cleaning up upload client state for {client_id}: {e}", exc_info=True)
+        except Exception as e:
+            logger.exception(f"❌ Error cleaning up upload client state for {client_id}: {e}")

700-722: Queue health heuristic: OK as a first pass

Straightforward thresholds; you may want to make them configurable later.
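If made configurable, an env-driven dataclass would keep the heuristic in one place. A sketch; `QUEUE_WARN_DEPTH`/`QUEUE_CRITICAL_DEPTH` and the status names are assumptions, not existing settings:

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class QueueHealthThresholds:
    warn_depth: int = 10      # queued tasks before reporting "busy"
    critical_depth: int = 50  # queued tasks before reporting "overloaded"

    @classmethod
    def from_env(cls) -> "QueueHealthThresholds":
        return cls(
            warn_depth=int(os.getenv("QUEUE_WARN_DEPTH", cls.warn_depth)),
            critical_depth=int(os.getenv("QUEUE_CRITICAL_DEPTH", cls.critical_depth)),
        )


def classify_queue(depth: int, t: QueueHealthThresholds) -> str:
    if depth >= t.critical_depth:
        return "overloaded"
    if depth >= t.warn_depth:
        return "busy"
    return "healthy" if depth > 0 else "idle"


thresholds = QueueHealthThresholds.from_env()
statuses = [classify_queue(d, thresholds) for d in (0, 3, 12, 60)]
```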


991-1006: Cleanup tick: good idea; prefer logger.exception in failure path

The periodic cleanup is a good addition. Use exception-level logging for the catch to preserve stack traces.

-                    try:
-                        self._cleanup_completed_entries()
-                    except Exception as cleanup_error:
-                        audio_logger.error(f"Error during processing entry cleanup: {cleanup_error}")
+                    try:
+                        self._cleanup_completed_entries()
+                    except Exception as cleanup_error:
+                        audio_logger.exception(f"Error during processing entry cleanup: {cleanup_error}")
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 641b7a6 and 515fb81.

📒 Files selected for processing (30)
  • CLAUDE.md (6 hunks)
  • backends/advanced/src/advanced_omi_backend/audio_utils.py (2 hunks)
  • backends/advanced/src/advanced_omi_backend/client.py (1 hunks)
  • backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (5 hunks)
  • backends/advanced/src/advanced_omi_backend/controllers/system_controller.py (5 hunks)
  • backends/advanced/src/advanced_omi_backend/conversation_manager.py (3 hunks)
  • backends/advanced/src/advanced_omi_backend/main.py (3 hunks)
  • backends/advanced/src/advanced_omi_backend/memory/memory_service.py (2 hunks)
  • backends/advanced/src/advanced_omi_backend/processors.py (4 hunks)
  • backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py (1 hunks)
  • backends/advanced/src/advanced_omi_backend/transcript_coordinator.py (0 hunks)
  • backends/advanced/src/advanced_omi_backend/transcription.py (7 hunks)
  • backends/advanced/webui/src/App.tsx (2 hunks)
  • backends/advanced/webui/src/components/layout/Layout.tsx (2 hunks)
  • backends/advanced/webui/src/components/processes/ActiveTasksTable.tsx (1 hunks)
  • backends/advanced/webui/src/components/processes/ClientDetailModal.tsx (1 hunks)
  • backends/advanced/webui/src/components/processes/ProcessPipelineView.tsx (1 hunks)
  • backends/advanced/webui/src/components/processes/ProcessingHistory.tsx (1 hunks)
  • backends/advanced/webui/src/components/processes/SystemHealthCards.tsx (1 hunks)
  • backends/advanced/webui/src/pages/Processes.tsx (1 hunks)
  • backends/advanced/webui/src/pages/System.tsx (6 hunks)
  • backends/advanced/webui/src/pages/Upload.tsx (8 hunks)
  • backends/advanced/webui/src/services/api.ts (3 hunks)
  • docs/api-reference.md (1 hunks)
  • docs/distributed-deployment.md (1 hunks)
  • docs/memory-providers.md (1 hunks)
  • docs/speaker-recognition.md (1 hunks)
  • docs/versioned-processing.md (1 hunks)
  • docs/wyoming-protocol.md (1 hunks)
  • extras/speaker-recognition/sortformer.py (1 hunks)
💤 Files with no reviewable changes (1)
  • backends/advanced/src/advanced_omi_backend/transcript_coordinator.py
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: CR
PR: AnkushMalaker/friend-lite#0
File: CLAUDE.md:0-0
Timestamp: 2025-09-18T01:42:08.562Z
Learning: Applies to backends/advanced/src/**/*.py : Use conversation_id (not audio_uuid) for all reprocessing operations and version management
📚 Learning: 2025-09-18T01:42:08.562Z
Learnt from: CR
PR: AnkushMalaker/friend-lite#0
File: CLAUDE.md:0-0
Timestamp: 2025-09-18T01:42:08.562Z
Learning: Applies to backends/advanced/src/**/*.py : Use conversation_id (not audio_uuid) for all reprocessing operations and version management

Applied to files:

  • backends/advanced/src/advanced_omi_backend/client.py
  • backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
  • backends/advanced/src/advanced_omi_backend/transcription.py
  • backends/advanced/src/advanced_omi_backend/conversation_manager.py
🪛 Ruff (0.13.1)
backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py

172-172: Unused function argument: current_user

(ARG001)


172-172: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


180-180: Unused function argument: current_user

(ARG001)


180-180: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


188-188: Unused function argument: current_user

(ARG001)


188-188: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)

backends/advanced/src/advanced_omi_backend/processors.py

522-522: Store a reference to the return value of asyncio.create_task

(RUF006)


523-523: Do not catch blind exception: Exception

(BLE001)


524-524: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


548-548: Loop control variable stage_name not used within loop body

Rename unused stage_name to _stage_name

(B007)


563-563: Store a reference to the return value of asyncio.create_task

(RUF006)


564-564: Do not catch blind exception: Exception

(BLE001)


565-565: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


572-572: Do not catch blind exception: Exception

(BLE001)


573-573: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


581-581: Do not catch blind exception: Exception

(BLE001)


582-582: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


640-640: Loop control variable client_id not used within loop body

Rename unused client_id to _client_id

(B007)


696-696: Do not catch blind exception: Exception

(BLE001)


697-697: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


1004-1004: Do not catch blind exception: Exception

(BLE001)


1005-1005: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

extras/speaker-recognition/sortformer.py

1-1: Shebang is present but file is not executable

(EXE001)


30-30: Do not catch blind exception: Exception

(BLE001)


30-30: Local variable e is assigned to but never used

Remove assignment to unused variable e

(F841)


61-61: Do not catch blind exception: Exception

(BLE001)


109-109: Unpacked variable sr is never used

Prefix it with an underscore or any other dummy variable pattern

(RUF059)


153-153: Do not use bare except

(E722)


153-154: try-except-pass detected, consider logging the exception

(S110)


168-168: Do not use bare except

(E722)


168-169: try-except-pass detected, consider logging the exception

(S110)


253-253: Loop control variable category not used within loop body

Rename unused category to _category

(B007)


275-275: Do not catch blind exception: Exception

(BLE001)


296-296: Do not catch blind exception: Exception

(BLE001)


335-335: f-string without any placeholders

Remove extraneous f prefix

(F541)

backends/advanced/src/advanced_omi_backend/audio_utils.py

124-124: Abstract raise to an inner function

(TRY301)


124-124: Avoid specifying long messages outside the exception class

(TRY003)


128-128: Abstract raise to an inner function

(TRY301)


128-128: Avoid specifying long messages outside the exception class

(TRY003)


130-130: Abstract raise to an inner function

(TRY301)


130-130: Avoid specifying long messages outside the exception class

(TRY003)


132-132: Abstract raise to an inner function

(TRY301)


132-132: Avoid specifying long messages outside the exception class

(TRY003)


147-147: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


149-149: Do not catch blind exception: Exception

(BLE001)


150-150: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


151-151: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


151-151: Avoid specifying long messages outside the exception class

(TRY003)

backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py

639-639: Do not catch blind exception: Exception

(BLE001)


640-640: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


642-642: Use explicit conversion flag

Replace with conversion flag

(RUF010)


752-752: Do not catch blind exception: Exception

(BLE001)


753-753: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


755-755: Use explicit conversion flag

Replace with conversion flag

(RUF010)

backends/advanced/src/advanced_omi_backend/main.py

278-278: Do not catch blind exception: Exception

(BLE001)


279-279: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

backends/advanced/src/advanced_omi_backend/transcription.py

216-216: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


216-216: Use explicit conversion flag

Replace with conversion flag

(RUF010)

backends/advanced/src/advanced_omi_backend/controllers/system_controller.py

545-545: Local variable processed_files is assigned to but never used

Remove assignment to unused variable processed_files

(F841)


755-755: Do not catch blind exception: Exception

(BLE001)


756-758: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


767-767: Do not catch blind exception: Exception

(BLE001)


768-768: Use explicit conversion flag

Replace with conversion flag

(RUF010)


769-769: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


1167-1172: Consider moving this statement to an else block

(TRY300)


1174-1174: Do not catch blind exception: Exception

(BLE001)


1175-1175: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


1177-1177: Use explicit conversion flag

Replace with conversion flag

(RUF010)


1210-1210: Consider moving this statement to an else block

(TRY300)


1211-1211: Do not catch blind exception: Exception

(BLE001)


1212-1212: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


1214-1214: Use explicit conversion flag

Replace with conversion flag

(RUF010)


1241-1241: Do not catch blind exception: Exception

(BLE001)


1242-1242: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


1244-1244: Use explicit conversion flag

Replace with conversion flag

(RUF010)


1296-1296: Consider moving this statement to an else block

(TRY300)


1297-1297: Do not catch blind exception: Exception

(BLE001)


1298-1298: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


1300-1300: Use explicit conversion flag

Replace with conversion flag

(RUF010)

backends/advanced/src/advanced_omi_backend/conversation_manager.py

30-30: Unused method argument: transcript_data

(ARG002)


77-77: Consider moving this statement to an else block

(TRY300)


186-186: Do not catch blind exception: Exception

(BLE001)


241-241: Do not catch blind exception: Exception

(BLE001)


347-347: Consider moving this statement to an else block

(TRY300)

🪛 markdownlint-cli2 (0.18.1)
docs/wyoming-protocol.md

7-7: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

🔇 Additional comments (22)
docs/memory-providers.md (1)

1-182: Docs LGTM.

Clear provider comparison and setup steps align with the new processing architecture.

backends/advanced/src/advanced_omi_backend/client.py (2)

96-96: Docstring tweak LGTM.

Matches the new centralized processing flow.


121-131: No change required — reprocessing uses conversation_id; close_conversation correctly accepts audio_uuid.
Reprocessing endpoints use conversation_id; controllers map conversation_id → audio_uuid for file access and ConversationManager.close_conversation(audio_uuid=...) is the intended API.

docs/distributed-deployment.md (1)

1-178: Docs LGTM.

Good, actionable distributed setup with Tailscale and service splits.

docs/versioned-processing.md (1)

1-166: Versioning design doc LGTM — verified.

Reprocessing endpoints and conversation_id usage are present and consistent across the codebase (routers/modules/conversation_routes.py, controllers/conversation_controller.py, database.py, processors.py, transcription.py, conversation_manager.py).

backends/advanced/src/advanced_omi_backend/audio_utils.py (1)

9-12: Resolved — numpy is declared in the project dependencies

Found numpy entries in pyproject/requirements and Dockerfiles (e.g. "numpy>=1.26", "numpy>=1.26,<2.0", "numpy>=1.21.0"); imports exist under backends/advanced/src — no change required.

CLAUDE.md (1)

105-106: Nit: confirm service name parakeet-asr.

If the compose service is named differently (e.g., parakeet), this command will fail. Verify docker-compose service name alignment.

backends/advanced/src/advanced_omi_backend/conversation_manager.py (1)

305-345: LGTM: conversation_id-first flow aligns with versioned processing.

Creation + transcript version activation + speaker-aware title/summary fits the “use conversation_id for reprocessing/version mgmt” guideline.

backends/advanced/src/advanced_omi_backend/transcription.py (1)

460-468: LGTM: centralize conversation lifecycle via ConversationManager.

Using conversation_id for subsequent processing (e.g., memory) is the right direction.

backends/advanced/webui/src/components/layout/Layout.tsx (1)

19-21: Admin nav item for Processes: LGTM

Matches new route and admin gating. Icon choice fits.

backends/advanced/webui/src/App.tsx (1)

72-76: Processes route integration: LGTM

New admin page is properly routed within protected layout and error boundary.

backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (1)

234-241: Consistency: Reprocessing IDs should be conversation_id per guideline

This endpoint takes audio_uuid. If cropping reprocess participates in versioning or audit, consider accepting conversation_id and resolving audio_uuid internally.

Would you like me to draft a companion handler reprocess_audio_cropping_by_conversation(conversation_id) to align with the reprocessing model?

backends/advanced/webui/src/components/processes/ProcessingHistory.tsx (1)

21-28: Component behavior: LGTM

Initial data + refreshTrigger flow makes sense; pagination wired correctly.

Also applies to: 45-56

backends/advanced/webui/src/pages/System.tsx (2)

218-255: Service status block: LGTM

Clear, full-width rendering with provider/status hints improves UX.


480-507: Debug metrics section: LGTM

Good optional rendering and clear KPIs.

backends/advanced/webui/src/pages/Processes.tsx (1)

152-174: Processes dashboard composition: LGTM

Health, pipeline, active tasks, and history integrate cleanly; refreshTrigger wiring is consistent.

docs/speaker-recognition.md (1)

27-35: Verified: referenced paths, endpoints, and dependency exist.

  • extras/speaker-recognition/src/simple_speaker_recognition/utils/analysis.py exists
  • web UI at extras/speaker-recognition/webui/ exists
  • /speakers/analysis route implemented in extras/speaker-recognition/src/simple_speaker_recognition/api/routers/speakers.py
  • umap-learn is listed in extras/speaker-recognition/pyproject.toml as "umap-learn>=0.5.3" (a minimum-version constraint, not an exact pin)

backends/advanced/webui/src/services/api.ts (4)

134-134: LGTM: memory config reload API

No issues spotted with the reload endpoint wiring.


229-230: No functional changes here

Nothing to flag.


231-231: Good doc note on upload session API removal

Clear guidance for consumers.


135-141: Verified — backend routes & pagination names match

Backend defines /processor/overview, /processor/history (Query params: page, per_page) and /processor/clients/{client_id} — see backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py and backends/advanced/src/advanced_omi_backend/controllers/system_controller.py which returns pagination with page/per_page.

backends/advanced/src/advanced_omi_backend/processors.py (1)

600-621: Centralized removal: LGTM

Clear, cohesive cleanup for both tracking dicts.

Comment on lines +116 to +125
# Convert to mono if stereo (same logic as system_controller.py)
if channels == 2:
    if sample_width == 2:
        audio_array = np.frombuffer(audio_data, dtype=np.int16)
        audio_array = audio_array.reshape(-1, 2)
        audio_data = np.mean(audio_array, axis=1, dtype=np.int16).tobytes()
        channels = 1
    else:
        raise ValueError(f"Unsupported sample width for stereo: {sample_width}")


⚠️ Potential issue

Fix stereo→mono conversion: int16 accumulation overflows, corrupting PCM.

np.mean(..., dtype=np.int16) computes the sum in an int16 accumulator, so loud stereo pairs overflow before the division (e.g., 30000 + 30000 wraps negative) and the averaged samples come out corrupted. Accumulate/average in a wider integer type, clip, and cast back to int16 before .tobytes().

Apply this diff:

-            # Convert to mono if stereo (same logic as system_controller.py)
+            # Convert to mono if stereo (same logic as system_controller.py)
             if channels == 2:
                 if sample_width == 2:
-                    audio_array = np.frombuffer(audio_data, dtype=np.int16)
-                    audio_array = audio_array.reshape(-1, 2)
-                    audio_data = np.mean(audio_array, axis=1, dtype=np.int16).tobytes()
-                    channels = 1
+                    audio_array = np.frombuffer(audio_data, dtype=np.int16).reshape(-1, 2)
+                    # Average L/R safely in wider int, then clip back to int16
+                    mono = (audio_array.astype(np.int32).sum(axis=1) // 2)
+                    mono = np.clip(mono, -32768, 32767).astype(np.int16)
+                    audio_data = mono.tobytes()
+                    channels = 1
                 else:
                     raise ValueError(f"Unsupported sample width for stereo: {sample_width}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

Before:

            # Convert to mono if stereo (same logic as system_controller.py)
            if channels == 2:
                if sample_width == 2:
                    audio_array = np.frombuffer(audio_data, dtype=np.int16)
                    audio_array = audio_array.reshape(-1, 2)
                    audio_data = np.mean(audio_array, axis=1, dtype=np.int16).tobytes()
                    channels = 1
                else:
                    raise ValueError(f"Unsupported sample width for stereo: {sample_width}")

After:

            # Convert to mono if stereo (same logic as system_controller.py)
            if channels == 2:
                if sample_width == 2:
                    audio_array = np.frombuffer(audio_data, dtype=np.int16).reshape(-1, 2)
                    # Average L/R safely in wider int, then clip back to int16
                    mono = (audio_array.astype(np.int32).sum(axis=1) // 2)
                    mono = np.clip(mono, -32768, 32767).astype(np.int16)
                    audio_data = mono.tobytes()
                    channels = 1
                else:
                    raise ValueError(f"Unsupported sample width for stereo: {sample_width}")
🧰 Tools
🪛 Ruff (0.13.1)

124-124: Abstract raise to an inner function

(TRY301)


124-124: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/audio_utils.py around lines 116 to
125, the stereo→mono conversion uses np.mean with dtype=np.int16 then
.tobytes(), which accumulates the sum in int16 and can overflow, corrupting PCM; replace it by loading
samples as int16, reshape to (-1,2), compute integer-safe mean by summing with a
wider integer dtype (e.g., int32), dividing by 2 with proper rounding, clipping
to int16 range, casting back to np.int16, and then call .tobytes(); keep
channels=1 after conversion and raise the same error for unsupported widths.
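The overflow at issue can be reproduced in isolation. A standalone sketch with synthetic samples (not the repository's code):

```python
import numpy as np

# Two stereo int16 frames; the first pair's sum exceeds the int16 ceiling.
stereo = np.array([[30000, 30000], [-30000, 100]], dtype=np.int16)

# Overflow-prone: np.mean with dtype=np.int16 accumulates the sum in int16,
# so 30000 + 30000 wraps before the divide.
bad = np.mean(stereo, axis=1, dtype=np.int16)

# Safe: widen to int32, halve, clip back into the int16 range.
mono = stereo.astype(np.int32).sum(axis=1) // 2
mono = np.clip(mono, -32768, 32767).astype(np.int16)

print(bad.tolist())   # first sample has wrapped negative
print(mono.tolist())  # [30000, -14950]
```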

Comment on lines +739 to +745
memory_item = MemoryProcessingItem(
client_id=f"reprocess-{conversation_id}",
user_id=str(user.user_id),
user_email=user_obj.email,
conversation_id=conversation_id
)


⚠️ Potential issue

🧩 Analysis chain

Bug: transcript_version_id is not passed to memory reprocess job

Queue item lacks transcript_version_id, so the worker may process the wrong transcript version.

Apply:

             memory_item = MemoryProcessingItem(
                 client_id=f"reprocess-{conversation_id}",
                 user_id=str(user.user_id),
                 user_email=user_obj.email,
-                conversation_id=conversation_id
+                conversation_id=conversation_id,
+                transcript_version_id=transcript_version_id
             )

🏁 Script executed:

#!/bin/bash
# Verify MemoryProcessingItem supports transcript_version_id, and where it is consumed.
rg -nP --type=py -C3 '\bclass\s+MemoryProcessingItem\b|@dataclass'
rg -nP --type=py -C3 '\bMemoryProcessingItem\('
rg -nP --type=py -C5 '\bdef\s+queue_memory\b'

Length of output: 9376


🏁 Script executed:

#!/bin/bash
set -euo pipefail

proc_file="backends/advanced/src/advanced_omi_backend/processors.py"
trans_file="backends/advanced/src/advanced_omi_backend/transcription.py"
conv_file="backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py"

echo "=== processors.py: MemoryProcessingItem area ==="
line=$(rg -nP --line-number 'class\s+MemoryProcessingItem' "$proc_file" | head -n1 | cut -d: -f1 || true)
if [ -z "$line" ]; then
  echo "MemoryProcessingItem class not found in $proc_file"
else
  start=$(( line - 10 ))
  if [ $start -lt 1 ]; then start=1; fi
  end=$(( line + 60 ))
  sed -n "${start},${end}p" "$proc_file" || true
fi

echo
echo "=== processors.py: queue_memory function ==="
line_q=$(rg -nP --line-number 'def\s+queue_memory\s*\(' "$proc_file" | head -n1 | cut -d: -f1 || true)
if [ -z "$line_q" ]; then
  echo "queue_memory not found in $proc_file"
else
  start=$(( line_q - 8 ))
  if [ $start -lt 1 ]; then start=1; fi
  end=$(( line_q + 80 ))
  sed -n "${start},${end}p" "$proc_file" || true
fi

echo
echo "=== transcription.py: MemoryProcessingItem instantiation area ==="
line_t=$(rg -nP --line-number 'MemoryProcessingItem\(' "$trans_file" | head -n1 | cut -d: -f1 || true)
if [ -z "$line_t" ]; then
  echo "MemoryProcessingItem instantiation not found in $trans_file"
else
  start=$(( line_t - 8 ))
  if [ $start -lt 1 ]; then start=1; fi
  end=$(( line_t + 40 ))
  sed -n "${start},${end}p" "$trans_file" || true
fi

echo
echo "=== conversation_controller.py: reprocess area ==="
sed -n '720,760p' "$conv_file" || true

Length of output: 10658


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== search for transcript_version_id ==="
rg -n --hidden --no-ignore --type=py 'transcript_version_id' || true

echo
echo "=== search for transcript_version (broader) ==="
rg -n --hidden --no-ignore --type=py 'transcript_version' || true

echo
echo "=== search for attribute access .transcript_version_id ==="
rg -n --hidden --no-ignore --type=py '\.transcript_version_id\b' || true

echo
echo "=== search for MemoryProcessingItem instantiations ==="
rg -n --hidden --no-ignore --type=py 'MemoryProcessingItem\(' || true

echo
echo "=== search for memory queue consumer / processor ==="
rg -n --hidden --no-ignore --type=py 'memory_queue' || true
rg -n --hidden --no-ignore --type=py 'memory_processor' || true
rg -n --hidden --no-ignore --type=py 'process_memory' || true
rg -n --hidden --no-ignore --type=py 'def\s+.*memory.*\(' -n --pcre2 || true

Length of output: 23917


🏁 Script executed:

#!/bin/bash
set -euo pipefail

proc="backends/advanced/src/advanced_omi_backend/processors.py"
mem_service="backends/advanced/src/advanced_omi_backend/memory/memory_service.py"

echo "=== processors.py: _memory_processor and _process_memory_item ==="
rg -n --line-number 'async def _memory_processor\(|async def _process_memory_item\(|async def _should_process_memory\(' "$proc" || true
start_line=$(rg -n --line-number 'async def _memory_processor\(' "$proc" | head -n1 | cut -d: -f1 || true)
if [ -n "$start_line" ]; then
  sed -n "$((start_line-6)),$((start_line+120))p" "$proc"
fi

echo
echo "=== processors.py: _process_memory_item exact area ==="
line_proc=$(rg -n --line-number 'async def _process_memory_item\(' "$proc" | head -n1 | cut -d: -f1 || true)
if [ -n "$line_proc" ]; then
  sed -n "$((line_proc-4)),$((line_proc+200))p" "$proc"
fi

echo
echo "=== memory_service.py: functions referencing transcript_version_id ==="
rg -n --line-number 'transcript_version_id' "$mem_service" || true
rg -n --line-number 'def\s+_process_memory_updates\(' "$mem_service" || true
line_ms=$(rg -n --line-number 'def\s+_process_memory_updates\(' "$mem_service" | head -n1 | cut -d: -f1 || true)
if [ -n "$line_ms" ]; then
  sed -n "$((line_ms-6)),$((line_ms+120))p" "$mem_service"
fi

echo
echo "=== memory_service.py: top area ==="
sed -n '1,240p' "$mem_service" || true

Length of output: 30988


🏁 Script executed:

#!/bin/bash
set -euo pipefail

cc="backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py"
db="backends/advanced/src/advanced_omi_backend/database.py"

echo "=== conversation_controller.reprocess_memory ==="
rg -n --line-number 'async def reprocess_memory\(' "$cc" || true
start=$(rg -n --line-number 'async def reprocess_memory\(' "$cc" | head -n1 | cut -d: -f1 || true)
if [ -n "$start" ]; then
  sed -n "$((start-4)),$((start+240))p" "$cc" || true
fi

echo
echo "=== database.create_transcript_version ==="
rg -n --line-number 'def\s+create_transcript_version\(' "$db" || true
startdb=$(rg -n --line-number 'def\s+create_transcript_version\(' "$db" | head -n1 | cut -d: -f1 || true)
if [ -n "$startdb" ]; then
  sed -n "$((startdb-4)),$((startdb+180))p" "$db" || true
fi

echo
echo "=== search for ConversationsRepository class/definition ==="
rg -n --hidden --no-ignore --type=py 'class\s+ConversationsRepository\b' || true
rg -n --hidden --no-ignore --type=py 'def\s+get_conversation\(' || true
rg -n --hidden --no-ignore --type=py 'class\s+Conversation' || true

Length of output: 16938


Critical: pass transcript_version_id through the memory queue and use it in the worker

Memory reprocess currently creates a memory_version tied to a specific transcript_version_id but queues a MemoryProcessingItem without that id; the memory worker reads conversation["transcript"] (the active transcript) and can therefore process the wrong version.

  • Add transcript_version_id: Optional[str] = None to MemoryProcessingItem in backends/advanced/src/advanced_omi_backend/processors.py (import Optional).
  • Update _process_memory_item in the same file to prefer item.transcript_version_id: lookup the matching entry in conversation["transcript_versions"] and use its "segments" to build the transcript; fall back to conversation["transcript"] if not found.
  • Pass transcript_version_id when creating the item in backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (and optionally include it when queuing from transcription.py for consistency).
  • Verify end-to-end: queued item contains the version id and the memory worker extracts the correct segments for that version.
🤖 Prompt for AI Agents
In
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
around lines 739-745, the MemoryProcessingItem is queued without the
transcript_version_id causing the memory worker to possibly reprocess the wrong
transcript; add transcript_version_id to the queued item by passing the correct
version id used to create the memory_version. Also update
backends/advanced/src/advanced_omi_backend/processors.py to: (1) add
"transcript_version_id: Optional[str] = None" to the MemoryProcessingItem
dataclass (import Optional), and (2) modify _process_memory_item to prefer
item.transcript_version_id — locate the matching entry in
conversation["transcript_versions"] and use its "segments" to build the
transcript, falling back to conversation["transcript"] if no match; optionally
propagate transcript_version_id when enqueueing from transcription.py for
consistency and verify end-to-end that the queued item contains the id and the
worker extracts the correct segments.
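The shape of the proposed change can be sketched as follows; field and helper names mirror the review's suggestion but are otherwise hypothetical:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class MemoryProcessingItem:
    client_id: str
    user_id: str
    user_email: str
    conversation_id: str
    transcript_version_id: Optional[str] = None  # new optional field

def select_segments(conversation: dict, item: MemoryProcessingItem) -> list:
    """Prefer the queued version's segments; fall back to the active transcript."""
    if item.transcript_version_id:
        for version in conversation.get("transcript_versions", []):
            if version.get("id") == item.transcript_version_id:
                return version.get("segments", [])
    return conversation.get("transcript", [])
```

With this in place, a reprocess request that created a specific transcript version queues that version id, and the worker no longer silently reads the active transcript.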

Comment on lines +179 to 186
update_processing_successful = False
if allow_update and fact_memories_text:
memory_logger.info(f"🔍 Allowing update for {source_id}")
created_ids = await self._process_memory_updates(
fact_memories_text, embeddings, user_id, client_id, source_id, user_email
)
update_processing_successful = True
else:

⚠️ Potential issue

Update-path success flag can mask failures

update_processing_successful is set True unconditionally after _process_memory_updates, so upstream sees success even if LLM/action proposal failed and no actions were actually processed.

Refactor to return an explicit “processed_ok” from _process_memory_updates:

-            update_processing_successful = False
+            update_processing_successful = False
             if allow_update and fact_memories_text:
                 memory_logger.info(f"🔍 Allowing update for {source_id}")
-                created_ids = await self._process_memory_updates(
-                    fact_memories_text, embeddings, user_id, client_id, source_id, user_email
-                )
-                update_processing_successful = True
+                created_ids, update_processing_successful = await self._process_memory_updates(
+                    fact_memories_text, embeddings, user_id, client_id, source_id, user_email
+                )

And adjust success branch:

-            elif update_processing_successful:
+            elif update_processing_successful:
                 memory_logger.info(f"✅ Memory update processing completed for {source_id} - LLM decided no changes needed")
                 return True, []

Then change _process_memory_updates to return (created_ids, processed_ok) (see next comment).

Also applies to: 202-211

🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/memory/memory_service.py around
lines 179-186 (and similarly update the branch at 202-211), the code
unconditionally sets update_processing_successful = True after calling
_process_memory_updates, which can mask failures; change the call to capture a
returned processed_ok flag (e.g. created_ids, processed_ok = await
self._process_memory_updates(...)) and set update_processing_successful =
processed_ok (or handle both values appropriately) and update the success branch
to check processed_ok; also modify _process_memory_updates to return a tuple
(created_ids, processed_ok) so the caller can reliably know whether updates were
actually processed.
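A minimal sketch of the tuple-return refactor; the bodies are stand-ins, and only the flag plumbing mirrors the suggestion:

```python
import asyncio

async def _process_memory_updates(facts: list[str]) -> tuple[list[str], bool]:
    """Stand-in: return created ids plus an explicit processed_ok flag."""
    if not facts:
        return [], False  # nothing was actually processed
    created_ids = [f"mem-{i}" for i in range(len(facts))]
    return created_ids, True

async def handle_update(facts: list[str]) -> bool:
    created_ids, processed_ok = await _process_memory_updates(facts)
    # The flag now reflects what happened instead of being set True unconditionally.
    update_processing_successful = processed_ok
    return update_processing_successful

print(asyncio.run(handle_update(["fact one"])))  # True
```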

Comment on lines +461 to +491
def _is_stale(self, client_id: str, max_idle_minutes: int = 30) -> bool:
"""Check if a processing entry is stale (no activity for specified time).

Args:
client_id: Client ID to check
max_idle_minutes: Maximum idle time in minutes before considering stale

Returns:
True if the entry is stale and should be cleaned up
"""
import time

max_idle_seconds = max_idle_minutes * 60
current_time = time.time()

# Check processing_state timestamps
if client_id in self.processing_state:
client_state = self.processing_state[client_id]
# Find the most recent timestamp across all stages
latest_timestamp = 0
for stage_info in client_state.values():
if isinstance(stage_info, dict) and "timestamp" in stage_info:
latest_timestamp = max(latest_timestamp, stage_info["timestamp"])

if latest_timestamp > 0:
idle_time = current_time - latest_timestamp
return idle_time > max_idle_seconds

# If no processing_state or no valid timestamps, consider it stale
return True


⚠️ Potential issue

Stale detection can aggressively delete fresh entries without timestamps

If a client has processing_tasks but no timestamps in processing_state yet, this returns True (stale) and may purge active entries. Guard against that.

Apply this to avoid premature cleanup when tasks are still tracked:

-        # If no processing_state or no valid timestamps, consider it stale
-        return True
+        # If no valid timestamps, consider non-stale when tasks are still tracked; otherwise stale
+        if client_id in self.processing_tasks and self.processing_tasks.get(client_id):
+            return False
+        return True
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

Before:

    def _is_stale(self, client_id: str, max_idle_minutes: int = 30) -> bool:
        """Check if a processing entry is stale (no activity for specified time).

        Args:
            client_id: Client ID to check
            max_idle_minutes: Maximum idle time in minutes before considering stale

        Returns:
            True if the entry is stale and should be cleaned up
        """
        import time

        max_idle_seconds = max_idle_minutes * 60
        current_time = time.time()

        # Check processing_state timestamps
        if client_id in self.processing_state:
            client_state = self.processing_state[client_id]
            # Find the most recent timestamp across all stages
            latest_timestamp = 0
            for stage_info in client_state.values():
                if isinstance(stage_info, dict) and "timestamp" in stage_info:
                    latest_timestamp = max(latest_timestamp, stage_info["timestamp"])

            if latest_timestamp > 0:
                idle_time = current_time - latest_timestamp
                return idle_time > max_idle_seconds

        # If no processing_state or no valid timestamps, consider it stale
        return True

After:

    def _is_stale(self, client_id: str, max_idle_minutes: int = 30) -> bool:
        """Check if a processing entry is stale (no activity for specified time).

        Args:
            client_id: Client ID to check
            max_idle_minutes: Maximum idle time in minutes before considering stale

        Returns:
            True if the entry is stale and should be cleaned up
        """
        import time

        max_idle_seconds = max_idle_minutes * 60
        current_time = time.time()

        # Check processing_state timestamps
        if client_id in self.processing_state:
            client_state = self.processing_state[client_id]
            # Find the most recent timestamp across all stages
            latest_timestamp = 0
            for stage_info in client_state.values():
                if isinstance(stage_info, dict) and "timestamp" in stage_info:
                    latest_timestamp = max(latest_timestamp, stage_info["timestamp"])

            if latest_timestamp > 0:
                idle_time = current_time - latest_timestamp
                return idle_time > max_idle_seconds

        # If no valid timestamps, consider non-stale when tasks are still tracked; otherwise stale
        if client_id in self.processing_tasks and self.processing_tasks.get(client_id):
            return False
        return True

Comment on lines +492 to +583
def _cleanup_completed_entries(self):
"""Clean up completed and stale processing entries independently of client lifecycle.

This method is called from existing processor timeout handlers to maintain
clean processing state without affecting active client sessions.
"""
import time

clients_to_remove = []
current_time = time.time()

for client_id in list(self.processing_state.keys()):
try:
status = self.get_processing_status(client_id)

# Clean up if processing is complete OR if upload client is done (even with failed stages)
client_type = status.get("client_type", "websocket")

if status.get("status") == "complete":
if client_type == "upload":
# Upload clients: Clean up immediately when processing completes
clients_to_remove.append((client_id, "completed_upload"))
logger.info(f"Marking completed upload client for immediate cleanup: {client_id}")

# Also trigger client state cleanup for upload clients
try:
from advanced_omi_backend.main import cleanup_client_state
import asyncio

# Schedule client cleanup
asyncio.create_task(self._cleanup_upload_client_state(client_id))
except Exception as cleanup_error:
logger.error(f"Error scheduling upload client cleanup for {client_id}: {cleanup_error}")
else:
# WebSocket clients: Wait for grace period before cleanup
completion_grace_period = 300 # 5 minutes

# Check if all stages have been complete for grace period
all_stages_old_enough = True
for stage_info in status.get("stages", {}).values():
if "timestamp" in stage_info:
stage_age = current_time - stage_info["timestamp"]
if stage_age < completion_grace_period:
all_stages_old_enough = False
break

if all_stages_old_enough:
clients_to_remove.append((client_id, "completed_websocket"))
logger.info(f"Marking completed WebSocket client for cleanup: {client_id}")

elif client_type == "upload" and status.get("status") == "processing":
# Upload clients: Also clean up if they're done processing (even with failed stages)
# Check if all stages are either completed or have failed (i.e., no longer actively processing)
stages = status.get("stages", {})
all_stages_done = True

for stage_name, stage_info in stages.items():
if not stage_info.get("completed", False) and stage_info.get("status") not in ["failed", "completed"]:
all_stages_done = False
break

if all_stages_done:
clients_to_remove.append((client_id, "finished_upload"))
logger.info(f"Marking finished upload client for cleanup: {client_id} (some stages may have failed)")

# Also trigger client state cleanup for upload clients
try:
from advanced_omi_backend.main import cleanup_client_state
import asyncio

# Schedule client cleanup
asyncio.create_task(self._cleanup_upload_client_state(client_id))
except Exception as cleanup_error:
logger.error(f"Error scheduling upload client cleanup for {client_id}: {cleanup_error}")

# Clean up if stale (no activity for 30+ minutes)
elif self._is_stale(client_id, max_idle_minutes=30):
clients_to_remove.append((client_id, "stale"))
logger.info(f"Marking stale processing entry for cleanup: {client_id}")

except Exception as e:
logger.error(f"Error checking processing status for {client_id}: {e}")
# If we can't check status, consider it for cleanup
clients_to_remove.append((client_id, "error"))

# Remove the identified entries
for client_id, reason in clients_to_remove:
try:
self._remove_processing_entry(client_id, reason)
except Exception as e:
logger.error(f"Error removing processing entry for {client_id}: {e}")


⚠️ Potential issue

WebSocket grace-period cleanup logic is flawed and can clean up immediately; also track spawned tasks and use logger.exception

  • If stages come from task-tracking, they lack "timestamp", making all_stages_old_enough stay True and causing immediate cleanup of WebSocket clients (grace period bypassed).
  • Store a reference to create_task (RUF006) and optionally track it.
  • Prefer logger.exception over error(..., exc_info=True). Remove unused import in this block.

Apply these fixes:

-                        # Also trigger client state cleanup for upload clients
-                        try:
-                            from advanced_omi_backend.main import cleanup_client_state
-                            import asyncio
-
-                            # Schedule client cleanup
-                            asyncio.create_task(self._cleanup_upload_client_state(client_id))
-                        except Exception as cleanup_error:
-                            logger.error(f"Error scheduling upload client cleanup for {client_id}: {cleanup_error}")
+                        # Also trigger client state cleanup for upload clients
+                        try:
+                            # Schedule client cleanup and keep a reference
+                            cleanup_task = asyncio.create_task(self._cleanup_upload_client_state(client_id))
+                            self.task_manager.track_task(
+                                cleanup_task, f"cleanup_upload_{client_id}", {"type": "maintenance"}
+                            )
+                        except Exception as cleanup_error:
+                            logger.exception(f"Error scheduling upload client cleanup for {client_id}: {cleanup_error}")
...
-                        # Check if all stages have been complete for grace period
-                        all_stages_old_enough = True
-                        for stage_info in status.get("stages", {}).values():
-                            if "timestamp" in stage_info:
-                                stage_age = current_time - stage_info["timestamp"]
-                                if stage_age < completion_grace_period:
-                                    all_stages_old_enough = False
-                                    break
+                        # Check if all stages have been complete for at least the grace period.
+                        # Prefer task-tracked completion timestamps; fallback to state timestamps.
+                        all_stages_old_enough = True
+                        for stage_info in status.get("stages", {}).values():
+                            completed_at = stage_info.get("completed_at")
+                            ts = stage_info.get("timestamp")
+                            if completed_at is None and ts is None:
+                                # Without timing info, do not clean up yet
+                                all_stages_old_enough = False
+                                break
+                            ref_time = completed_at if isinstance(completed_at, (int, float)) else ts
+                            stage_age = current_time - ref_time
+                            if stage_age < completion_grace_period:
+                                all_stages_old_enough = False
+                                break
...
-                    for stage_name, stage_info in stages.items():
+                    for _stage_name, stage_info in stages.items():
                         if not stage_info.get("completed", False) and stage_info.get("status") not in ["failed", "completed"]:
                             all_stages_done = False
                             break
...
-                        # Also trigger client state cleanup for upload clients
-                        try:
-                            from advanced_omi_backend.main import cleanup_client_state
-                            import asyncio
-
-                            # Schedule client cleanup
-                            asyncio.create_task(self._cleanup_upload_client_state(client_id))
-                        except Exception as cleanup_error:
-                            logger.error(f"Error scheduling upload client cleanup for {client_id}: {cleanup_error}")
+                        # Also trigger client state cleanup for upload clients
+                        try:
+                            cleanup_task = asyncio.create_task(self._cleanup_upload_client_state(client_id))
+                            self.task_manager.track_task(
+                                cleanup_task, f"cleanup_upload_{client_id}", {"type": "maintenance"}
+                            )
+                        except Exception as cleanup_error:
+                            logger.exception(f"Error scheduling upload client cleanup for {client_id}: {cleanup_error}")
...
-            except Exception as e:
-                logger.error(f"Error checking processing status for {client_id}: {e}")
+            except Exception as e:
+                logger.exception(f"Error checking processing status for {client_id}: {e}")
...
-            try:
-                self._remove_processing_entry(client_id, reason)
-            except Exception as e:
-                logger.error(f"Error removing processing entry for {client_id}: {e}")
+            try:
+                self._remove_processing_entry(client_id, reason)
+            except Exception as e:
+                logger.exception(f"Error removing processing entry for {client_id}: {e}")

Additionally, consider keeping a set attribute like self._bg_tasks: set[asyncio.Task] to retain and clean up background tasks. I can provide that wiring if you want it now.

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff (0.13.1)

522-522: Store a reference to the return value of asyncio.create_task

(RUF006)


523-523: Do not catch blind exception: Exception

(BLE001)


524-524: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


548-548: Loop control variable stage_name not used within loop body

Rename unused stage_name to _stage_name

(B007)


563-563: Store a reference to the return value of asyncio.create_task

(RUF006)


564-564: Do not catch blind exception: Exception

(BLE001)


565-565: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


572-572: Do not catch blind exception: Exception

(BLE001)


573-573: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


581-581: Do not catch blind exception: Exception

(BLE001)


582-582: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

🤖 Prompt for AI Agents
backends/advanced/src/advanced_omi_backend/processors.py lines 492-583: the
WebSocket grace-period logic treats stages without a "timestamp" as old and can
immediately clean up clients, spawned asyncio tasks are not tracked, and
exception logging should use logger.exception; to fix, require an explicit
timestamp for age checks (i.e., if a stage lacks "timestamp" treat it as not old
enough so all_stages_old_enough becomes False), remove the unused import of
cleanup_client_state in the upload-cleanup blocks, capture the created
asyncio.Task (task = asyncio.create_task(...)) and add it to a background-task
set attribute (e.g., self._bg_tasks, creating it if missing) so tasks are
retained and can be managed, and replace logger.error(... {error}) calls in
these try/except handlers with logger.exception(...) to include traceback.

Comment on lines +628 to +672
def get_pipeline_statistics(self) -> dict[str, Any]:
"""Calculate pipeline performance metrics for each processing stage."""
import time
from statistics import mean

current_time = time.time()

# Calculate stats for each queue
pipeline_stats = {}

# Audio Queue Stats
audio_tasks = []
for client_id, state in self.processing_state.items():
audio_stage = state.get("audio", {})
if audio_stage.get("status") == "completed":
audio_tasks.append({
"duration": audio_stage.get("metadata", {}).get("processing_time", 1.0),
"timestamp": audio_stage.get("timestamp", current_time)
})

pipeline_stats["audio"] = {
"queue_size": self.audio_queue.qsize(),
"active_tasks": sum(1 for state in self.processing_state.values()
if state.get("audio", {}).get("status") == "started"),
"avg_processing_time_ms": mean([t["duration"] * 1000 for t in audio_tasks[-50:]]) if audio_tasks else 0,
"success_rate": len([t for t in audio_tasks[-100:] if t]) / max(len(audio_tasks[-100:]), 1),
"throughput_per_minute": len([t for t in audio_tasks if current_time - t["timestamp"] < 60])
}

# Similar calculations for other stages
for stage in ["transcription", "memory", "cropping"]:
queue_attr = f"{stage}_queue"
queue = getattr(self, queue_attr, None)

pipeline_stats[stage] = {
"queue_size": queue.qsize() if queue else 0,
"active_tasks": len([tid for tid, tinfo in self.processing_tasks.items()
if stage in tid and not self.task_manager.get_task_info(tinfo.get(stage, "")).completed_at]),
"avg_processing_time_ms": 30000, # Placeholder - can be calculated from task manager history
"success_rate": 0.95, # Placeholder - can be calculated from completed tasks
"throughput_per_minute": 5 # Placeholder
}

return pipeline_stats


⚠️ Potential issue

Active task counting is incorrect and can raise on None; fix stage membership and None checks

  • The comprehension checks stage in tid where tid is actually a client_id string, not the stages dict.
  • get_task_info(...) can return None, causing attribute access on None.

Apply:

-            pipeline_stats[stage] = {
-                "queue_size": queue.qsize() if queue else 0,
-                "active_tasks": len([tid for tid, tinfo in self.processing_tasks.items()
-                                   if stage in tid and not self.task_manager.get_task_info(tinfo.get(stage, "")).completed_at]),
-                "avg_processing_time_ms": 30000,  # Placeholder - can be calculated from task manager history
-                "success_rate": 0.95,  # Placeholder - can be calculated from completed tasks
-                "throughput_per_minute": 5  # Placeholder
-            }
+            pipeline_stats[stage] = {
+                "queue_size": queue.qsize() if queue else 0,
+                "active_tasks": sum(
+                    1
+                    for _cid, stages_dict in self.processing_tasks.items()
+                    if stages_dict.get(stage)
+                    and (lambda ti: ti is not None and ti.completed_at is None)(
+                        self.task_manager.get_task_info(stages_dict[stage])
+                    )
+                ),
+                "avg_processing_time_ms": 30000,  # Placeholder - can be calculated from task manager history
+                "success_rate": 0.95,  # Placeholder - can be calculated from completed tasks
+                "throughput_per_minute": 5,  # Placeholder
+            }

Optional: the audio success_rate currently always evaluates to 1.0 (dict truthiness). If you want, I can wire it up to actual success flags or task history.
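The dict-truthiness issue can be reproduced in isolation. Every entry in `audio_tasks` is a non-empty dict, so `if t` is always true and the ratio collapses to 1.0 regardless of outcomes. The `success` flag below is a hypothetical field the current code does not record, shown only to illustrate one possible fix:

```python
# Each completed-audio entry is a non-empty dict, so truthiness filtering
# keeps every element and the "success rate" is unconditionally 1.0.
audio_tasks = [
    {"duration": 1.2, "success": True},
    {"duration": 0.8, "success": False},
    {"duration": 2.0, "success": True},
]

# Current behaviour: `if t` is true for every dict, so this is always 1.0.
broken_rate = len([t for t in audio_tasks[-100:] if t]) / max(len(audio_tasks[-100:]), 1)

# Sketch of a fix: count an explicit success flag instead of truthiness.
fixed_rate = sum(1 for t in audio_tasks[-100:] if t.get("success")) / max(len(audio_tasks[-100:]), 1)

print(broken_rate)  # 1.0
print(fixed_rate)   # 0.6666666666666666
```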

🧰 Tools
🪛 Ruff (0.13.1)

640-640: Loop control variable client_id not used within loop body

Rename unused client_id to _client_id

(B007)

🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/processors.py around lines
628-672, the active task counting iterates over task IDs and checks "stage in
tid" (tid is a client_id string) and then calls .completed_at on a possibly None
task info; change the active_tasks calculation to iterate over
self.processing_tasks.values() (tinfo objects), check the stage by inspecting
tinfo.get(stage) or tinfo.get(stage, {}).get("status") == "started", and when
using self.task_manager.get_task_info(...) guard against None before accessing
.completed_at (e.g., retrieve info into a variable and check if info is not None
and not info.completed_at). Also ensure queue attribute access uses getattr
safely (already present) and keep avg/success/throughput placeholders or wire
them to real task history if desired.
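The None-safe count suggested above can also be written without the immediately-invoked lambda. This is a standalone sketch under the assumptions the comment describes: `processing_tasks` maps client IDs to per-stage task-ID dicts, and `get_task_info` may return `None` for unknown IDs (`TaskInfo` and `StubTaskManager` are stand-ins, not the real classes):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class TaskInfo:
    completed_at: Optional[float] = None

class StubTaskManager:
    """Stand-in for the real task manager; returns None for unknown task IDs."""
    def __init__(self, infos: dict[str, TaskInfo]):
        self._infos = infos

    def get_task_info(self, task_id: str) -> Optional[TaskInfo]:
        return self._infos.get(task_id)

def count_active(processing_tasks: dict, task_manager: StubTaskManager, stage: str) -> int:
    """Count clients whose task for `stage` exists and has not completed."""
    active = 0
    for stages_dict in processing_tasks.values():
        task_id = stages_dict.get(stage)
        if not task_id:
            continue
        info = task_manager.get_task_info(task_id)
        # Guard against None before touching completed_at.
        if info is not None and info.completed_at is None:
            active += 1
    return active

manager = StubTaskManager({
    "t1": TaskInfo(completed_at=None),   # still running
    "t2": TaskInfo(completed_at=123.0),  # finished
    # "t3" intentionally missing -> get_task_info returns None
})
tasks = {
    "client-a": {"transcription": "t1"},
    "client-b": {"transcription": "t2", "memory": "t1"},
    "client-c": {"transcription": "t3"},
}
print(count_active(tasks, manager, "transcription"))  # 1
```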

Comment on lines +129 to +139
// Resume session on page load
useEffect(() => {
const session = getStoredSession()
if (session) {
setProcessingPhase('active')
setIsPolling(true)
// Use unified polling without session dependency
pollProcessingStatus()
}
}, [])


⚠️ Potential issue

Resume flow doesn’t populate jobStatus; Phase 3 UI won’t render on reload.

On reload, you set active polling but never fetch legacy job status, so Phase 3 content (which requires jobStatus) stays hidden.

Apply this diff:

 // Resume session on page load
 useEffect(() => {
   const session = getStoredSession()
   if (session) {
     setProcessingPhase('active')
     setIsPolling(true)
-    // Use unified polling without session dependency
-    pollProcessingStatus()
+    // Prefer job polling to populate UI; fallback to unified polling
+    pollJobStatus(session.job_id).catch(() => {
+      pollProcessingStatus()
+    })
   }
 }, [])
🤖 Prompt for AI Agents
In backends/advanced/webui/src/pages/Upload.tsx around lines 129 to 139, the
resume-on-load path starts polling but never retrieves or sets the legacy
jobStatus, so Phase 3 UI never renders; update the useEffect to fetch the
current job status for the stored session (call the existing legacy job status
fetcher or API function), await its result and set jobStatus state (and any
related flags) before enabling polling or setting processingPhase to 'active',
and handle errors (set jobStatus to null and log) so the UI can correctly render
after reload.

Comment on lines +158 to +171
// Filter for upload clients (identified by client_id pattern ending with 3-digit numbers like "-001", "-002")
const uploadTasks: ProcessingTask[] = Object.entries(allTasks)
.filter(([clientId]) => {
// Upload clients have pattern like: "abc123-upload-001", "abc123-upload-002"
return /.*-upload-\d{3}$/.test(clientId)
})
.map(([clientId, taskData]: [string, any]) => ({
client_id: clientId,
user_id: taskData?.user_id || 'Unknown',
status: taskData?.status || 'processing',
stages: taskData?.stages || {}
}))
.filter(task => Object.keys(task.stages).length > 0) // Only show clients with active processing


⚠️ Potential issue

Brittle client_id regex for task filtering; prefer job_id correlation.

Relying on “-upload-###” is fragile. Use the persisted job_id (top-level or in stage metadata) when available.

Apply this diff:

-      // Filter for upload clients (identified by client_id pattern ending with 3-digit numbers like "-001", "-002")
-      const uploadTasks: ProcessingTask[] = Object.entries(allTasks)
-        .filter(([clientId]) => {
-          // Upload clients have pattern like: "abc123-upload-001", "abc123-upload-002"
-          return /.*-upload-\d{3}$/.test(clientId)
-        })
+      // Prefer filtering by persisted job_id; fallback to client_id pattern as last resort
+      const session = getStoredSession()
+      const uploadTasks: ProcessingTask[] = Object.entries(allTasks)
+        .filter(([clientId, taskData]: [string, any]) => {
+          if (session) {
+            const matchTop = taskData?.job_id && taskData.job_id === session.job_id
+            const matchStage = Object.values(taskData?.stages || {}).some((s: any) => s?.metadata?.job_id === session.job_id)
+            if (matchTop || matchStage) return true
+          }
+          return /.*-upload-\d{3}$/.test(clientId)
+        })
         .map(([clientId, taskData]: [string, any]) => ({
           client_id: clientId,
           user_id: taskData?.user_id || 'Unknown',
           status: taskData?.status || 'processing',
           stages: taskData?.stages || {}
         }))
         .filter(task => Object.keys(task.stages).length > 0) // Only show clients with active processing
🤖 Prompt for AI Agents
In backends/advanced/webui/src/pages/Upload.tsx around lines 158 to 171, the
current filter uses a brittle client_id regex to identify upload tasks; instead,
detect upload/processing tasks by checking for a persisted job_id on the task
(e.g., taskData.job_id) or, if absent, by scanning stage metadata for a job_id
field, and only then map those entries into ProcessingTask objects; update the
filter chain to (1) accept entries that have taskData.job_id OR at least one
stage with a job_id, (2) fall back to a looser client_id check only if no job_id
is present, and (3) preserve existing mapping and the final filter that ensures
stages exist.

Comment on lines +174 to +201
const allComplete = uploadTasks.length > 0 && uploadTasks.every(task => task.status === 'complete')
const noActiveTasks = uploadTasks.length === 0 && processingPhase === 'active'

if (allComplete || noActiveTasks) {
setIsPolling(false)
setProcessingPhase('completed')
clearStoredSession()

setFiles(prevFiles =>
prevFiles.map(f => ({
...f,
status: 'success'
}))
)
} else if (uploadTasks.some(task => Object.values(task.stages).some(stage => stage.error))) {
// Check for any errors in processing stages
const hasErrors = uploadTasks.some(task =>
Object.values(task.stages).some(stage => stage.error)
)

if (hasErrors) {
setFiles(prevFiles =>
prevFiles.map(f => ({
...f,
status: 'error',
error: 'Processing failed'
}))
)

⚠️ Potential issue

“No active tasks” → auto‑success can produce false positives. Validate completion.

Confirm completion via legacy job status (if present) before marking success.

Apply this diff:

-      if (allComplete || noActiveTasks) {
-        setIsPolling(false)
-        setProcessingPhase('completed')
-        clearStoredSession()
-
-        setFiles(prevFiles =>
-          prevFiles.map(f => ({
-            ...f,
-            status: 'success'
-          }))
-        )
+      if (allComplete || noActiveTasks) {
+        const session = getStoredSession()
+        if (session) {
+          try {
+            const resp = await uploadApi.getJobStatus(session.job_id)
+            const status: JobStatus = resp.data
+            setJobStatus(status)
+            if (status.status === 'completed') {
+              setIsPolling(false)
+              setProcessingPhase('completed')
+              clearStoredSession()
+              setFiles(prevFiles => prevFiles.map(f => ({ ...f, status: 'success' })))
+              return
+            }
+            if (status.status === 'failed') {
+              setIsPolling(false)
+              setProcessingPhase('completed')
+              clearStoredSession()
+              setFiles(prevFiles => prevFiles.map(f => ({ ...f, status: 'error', error: 'Processing failed' })))
+              return
+            }
+          } catch {
+            // fallback to optimistic completion
+          }
+        }
+        setIsPolling(false)
+        setProcessingPhase('completed')
+        clearStoredSession()
+        setFiles(prevFiles => prevFiles.map(f => ({ ...f, status: 'success' })))
       } else if (uploadTasks.some(task => Object.values(task.stages).some(stage => stage.error))) {

@AnkushMalaker AnkushMalaker deleted the reprocess-fix branch December 18, 2025 19:39