
Added queue management page and linked reprocess transcription #117

Closed
thestumonkey wants to merge 3 commits into SimpleOpenSoftware:main from thestumonkey:queue

Conversation

Contributor

@thestumonkey thestumonkey commented Sep 22, 2025

Added version selection from the conversation page, version deletion, and the queue management page

Summary by CodeRabbit

  • New Features

    • Queue Management dashboard: monitor jobs, view stats/health, filter/paginate, retry/cancel, and flush (admin).
    • Asynchronous reprocessing for transcripts/memories with queued status and job details.
    • Conversation versioning UI: view/switch transcript and memory versions; reprocess from header.
    • Configurable audio storage paths via environment variables.
  • Improvements

    • Conversations now include version info for better visibility in the UI.
  • API

    • New queue endpoints for jobs, stats, health, and admin flush operations.
  • UI

    • Added “Queue Management” to the admin navigation.

Contributor

coderabbitai bot commented Sep 22, 2025

Important

Review skipped

Auto reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

Adds environment-configurable audio storage defaults; introduces a MongoDB-backed simple queue with worker, routing, and UI for monitoring; shifts transcript/memory reprocessing to asynchronous jobs; extends conversation controller with version management and richer payloads; updates repositories with status/version update methods; wires queue startup/shutdown; adds admin navigation and Queue page.

Changes

  • Config: Audio storage defaults
    backends/advanced/src/advanced_omi_backend/config.py
    Adds DEFAULT_AUDIO_STORAGE_SETTINGS and get_audio_storage_settings() to read paths from env with fallback/derivation.
  • Conversation controller: Versioning and async reprocessing
    backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
    Enhances listing with version_info and debug fields; populates primary fields; adds delete_conversation_version; introduces _do_transcript_reprocessing; reprocess_transcript/memory now enqueue and return QUEUED; expanded logging/error details.
  • Database repositories: Status/version updates
    backends/advanced/src/advanced_omi_backend/database.py
    Adds methods to update audio chunk transcription status, update a specific transcript version, and update transcription status on conversations (timestamps, provider, errors).
  • SimpleQueue: Mongo-backed jobs + worker
    backends/advanced/src/advanced_omi_backend/simple_queue.py
    New SimpleQueue with enqueue, worker loop, job processing for reprocess_transcript, stats, pagination, and flush operations; singleton accessor and helpers.
  • App lifecycle: Queue startup/shutdown
    backends/advanced/src/advanced_omi_backend/main.py
    Optionally initializes the simple queue on startup and shuts it down on app teardown; non-blocking with logging.
  • Routing: Expose queue APIs
    backends/advanced/src/advanced_omi_backend/routers/api_router.py, backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py, backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py
    Adds queue_router export and inclusion; new endpoints: GET /queue/jobs, /stats, /health; POST /queue/flush, /flush-all with admin checks and models.
  • Web UI: App routes and admin nav
    backends/advanced/webui/src/App.tsx, backends/advanced/webui/src/components/layout/Layout.tsx
    Registers the /queue route and adds “Queue Management” to the admin nav.
  • Web UI: Conversation version UI
    backends/advanced/webui/src/components/ConversationVersionDropdown.tsx, backends/advanced/webui/src/components/ConversationVersionHeader.tsx, backends/advanced/webui/src/pages/Conversations.tsx
    New components to view/switch active transcript/memory versions and trigger reprocess; Conversation type extended with version_info; spacing tweaks.
  • Web UI: Queue dashboard
    backends/advanced/webui/src/pages/Queue.tsx
    New page to list/filter jobs, view stats/details, retry/cancel, and flush jobs with confirmations and pagination.
  • Web UI: API client
    backends/advanced/webui/src/services/api.ts
    Adds queueApi with getJobs, getStats, retryJob, cancelJob.
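Taken together, the changes above describe an enqueue-then-poll design: endpoints enqueue a job document, and a background worker drains QUEUED jobs and records outcomes. A minimal in-memory analogue of that pattern (illustrative names only; the actual SimpleQueue persists jobs in MongoDB and runs a long-lived worker loop):

```python
import asyncio
import itertools

QUEUED, PROCESSING, COMPLETED, FAILED = "queued", "processing", "completed", "failed"

class InMemoryQueue:
    """Toy analogue of a Mongo-backed job queue: enqueue jobs, a worker drains them."""

    def __init__(self):
        self.jobs = {}                 # job_id -> job document
        self._ids = itertools.count(1)

    def enqueue(self, job_type, data):
        job_id = f"job-{next(self._ids)}"
        self.jobs[job_id] = {"job_id": job_id, "type": job_type,
                             "data": data, "status": QUEUED, "result": None}
        return job_id

    async def work_once(self, handlers):
        """Fetch the next QUEUED job, run its handler, record the outcome."""
        job = next((j for j in self.jobs.values() if j["status"] == QUEUED), None)
        if job is None:
            return None
        job["status"] = PROCESSING
        try:
            job["result"] = await handlers[job["type"]](job["data"])
            job["status"] = COMPLETED
        except Exception as e:
            job["status"] = FAILED
            job["last_error"] = str(e)
        return job

async def demo():
    q = InMemoryQueue()

    async def reprocess_transcript(data):
        # Stand-in for the real transcription pipeline.
        return {"conversation_id": data["conversation_id"], "transcript": "hello"}

    job_id = q.enqueue("reprocess_transcript", {"conversation_id": "c1"})
    await q.work_once({"reprocess_transcript": reprocess_transcript})
    return q.jobs[job_id]
```

In the real backend the API returns the job_id with a QUEUED status immediately (202), and the UI polls /api/queue/jobs for progress.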

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant UI as Web UI
  participant API as FastAPI /queue
  participant CTRL as conversation_controller
  participant Q as SimpleQueue
  participant W as Queue Worker
  participant DB as MongoDB

  UI->>API: POST /api/conversations/{id}/reprocess-transcript
  API->>CTRL: reprocess_transcript(...)
  CTRL->>Q: enqueue_job(type=reprocess_transcript, data={conversation_id,...})
  Q-->>CTRL: job_id
  CTRL-->>UI: 202 QUEUED { job_id, metadata }

  rect rgba(230,245,255,0.5)
    note over W,Q: Background processing
    W->>Q: fetch next QUEUED
    Q-->>W: job
    W->>CTRL: _do_transcript_reprocessing(job)
    CTRL->>DB: update_transcription_status(... PROCESSING ...)
    CTRL->>DB: load audio, run transcription, diarization
    CTRL->>DB: update_transcript_version(..., status=COMPLETED)
    CTRL-->>W: result payload
    W->>DB: mark job COMPLETED with result
  end
sequenceDiagram
  autonumber
  participant AdminUI as Admin UI
  participant API as FastAPI /queue
  participant Q as SimpleQueue
  participant DB as MongoDB

  AdminUI->>API: GET /api/queue/jobs?filters
  API->>Q: get_jobs(limit, offset, filters)
  Q->>DB: query simple_jobs
  DB-->>Q: jobs + total
  Q-->>API: payload
  API-->>AdminUI: jobs page

  AdminUI->>API: POST /api/queue/flush { older_than_hours, statuses }
  API->>Q: flush_inactive_jobs(...)
  Q->>DB: delete matching jobs
  Q-->>API: counts
  API-->>AdminUI: result

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • Reprocessing WIP #96 — Parallel introduction of reprocessing/versioning with a simple queue, controller updates, database status/version methods, and queue router/UI.

Poem

A rabbit queues with quiet cheer,
Jobs hop along, the path is clear.
Transcripts grow, with versions new,
A dropdown shows which one to view.
The worker hums, the logs all gleam—
Flush, retry, and chase the dream. 🐇📜🗂️

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title accurately highlights the primary change—adding a queue management page and linking transcript reprocessing into the queue—so it reflects the main intent of the changeset even though smaller additions (e.g., delete-version and UI version components) aren’t listed.
Docstring Coverage ✅ Passed Docstring coverage is 85.37% which is sufficient. The required threshold is 80.00%.


@AnkushMalaker
Collaborator

@coderabbitai review

Contributor

coderabbitai bot commented Sep 22, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 13

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backends/advanced/src/advanced_omi_backend/database.py (1)

20-23: Wrong Motor API: get_default_database does not take a name.

This will raise at runtime. Use get_database("friend-lite") or index access.

-mongo_client = AsyncIOMotorClient(MONGODB_URI)
-db = mongo_client.get_default_database("friend-lite")
+mongo_client = AsyncIOMotorClient(MONGODB_URI)
+db = mongo_client.get_database("friend-lite")
🧹 Nitpick comments (24)
backends/advanced/src/advanced_omi_backend/config.py (2)

40-45: Avoid duplicating default paths in constants

DEFAULT_AUDIO_STORAGE_SETTINGS hardcodes both base and chunks paths while get_audio_storage_settings derives chunks from base. Single-source the default (derive chunks from base) to prevent drift.
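A sketch of the single-sourcing idea, with the env mapping passed in for testability (the function name, default path, and env parameter are illustrative, not the repository's actual API):

```python
import os
from pathlib import Path

# Single source of truth: only the base path has a stored default;
# the chunks path is always derived from it unless explicitly overridden.
DEFAULT_AUDIO_BASE_PATH = "./audio"  # illustrative default

def audio_storage_settings(env=os.environ):
    base = Path(env.get("AUDIO_BASE_PATH", DEFAULT_AUDIO_BASE_PATH))
    chunks = Path(env.get("AUDIO_CHUNKS_PATH", str(base / "audio_chunks")))
    return {"audio_base_path": str(base), "audio_chunks_path": str(chunks)}
```

With this shape, changing AUDIO_BASE_PATH automatically moves the chunks directory too, so the two defaults cannot drift apart.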


149-160: Normalize and join paths with pathlib

Safer across environments and avoids double slashes/trailing slash issues.

Apply this diff:

-def get_audio_storage_settings():
-    """Get audio storage settings from environment or defaults."""
-    
-    # Get base path and derive chunks path
-    audio_base_path = os.getenv("AUDIO_BASE_PATH", DEFAULT_AUDIO_STORAGE_SETTINGS["audio_base_path"])
-    audio_chunks_path = os.getenv("AUDIO_CHUNKS_PATH", f"{audio_base_path}/audio_chunks")
-    
-    return {
-        "audio_base_path": audio_base_path,
-        "audio_chunks_path": audio_chunks_path,
-    }
+def get_audio_storage_settings():
+    """Get audio storage settings from environment or defaults."""
+    base = Path(os.getenv("AUDIO_BASE_PATH", DEFAULT_AUDIO_STORAGE_SETTINGS["audio_base_path"])).resolve()
+    chunks_env = os.getenv("AUDIO_CHUNKS_PATH")
+    chunks = Path(chunks_env).resolve() if chunks_env else (base / "audio_chunks")
+    return {
+        "audio_base_path": str(base),
+        "audio_chunks_path": str(chunks),
+    }
backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py (1)

20-22: Sort all for stability and lint compliance

Ruff flags unsorted all. Sort alphabetically.

Apply this diff:

-__all__ = ["user_router", "chat_router", "client_router", "conversation_router", "memory_router", "system_router", "queue_router"]
+__all__ = [
+    "chat_router",
+    "client_router",
+    "conversation_router",
+    "memory_router",
+    "queue_router",
+    "system_router",
+    "user_router",
+]
backends/advanced/src/advanced_omi_backend/main.py (1)

326-344: Use centralized audio storage config

CHUNK_DIR is hardcoded; new config.get_audio_storage_settings() provides base/chunks. Recommend switching to it for consistency and deployability.

Example (outside this hunk):

from advanced_omi_backend.config import get_audio_storage_settings
paths = get_audio_storage_settings()
CHUNK_DIR = Path(paths["audio_chunks_path"])
CHUNK_DIR.mkdir(parents=True, exist_ok=True)

Also update app.mount("/audio", StaticFiles(directory=CHUNK_DIR), ...) accordingly.

backends/advanced/webui/src/pages/Conversations.tsx (2)

32-37: Type looks good; consider reuse

version_info shape is clear. Consider extracting a shared VersionInfo type/interface if used elsewhere.


528-606: Minor UX: large transcripts

For long transcripts, consider windowing/virtualization to keep DOM light (e.g., react-virtualized) when this becomes a hotspot.

backends/advanced/webui/src/pages/Queue.tsx (3)

130-133: Be explicit when building the request URL.

Template coercion of URLSearchParams is implicit; prefer .toString() for clarity.

-      const response = await fetch(`/api/queue/jobs?${params}`, {
+      const response = await fetch(`/api/queue/jobs?${params.toString()}`, {
         headers: { 'Authorization': `Bearer ${localStorage.getItem('token')}` }
       });

221-229: Avoid double-fetch on Apply/Clear.

Changing filters/offset already triggers the [filters, pagination.offset] effect. Calling fetchJobs() here causes an extra request.

-  const applyFilters = () => {
-    setPagination(prev => ({ ...prev, offset: 0 }));
-    fetchJobs();
-  };
+  const applyFilters = () => {
+    setPagination(prev => ({ ...prev, offset: 0 }));
+  };
@@
-  const clearFilters = () => {
-    setFilters({ status: '', job_type: '', priority: '' });
-    setPagination(prev => ({ ...prev, offset: 0 }));
-  };
+  const clearFilters = () => {
+    setFilters({ status: '', job_type: '', priority: '' });
+    setPagination(prev => ({ ...prev, offset: 0 }));
+  };

201-219: Prefer an in-app modal over confirm/alert for accessibility and consistency.

The rest of the page uses custom modals; align cancel confirmation with that.

backends/advanced/webui/src/components/ConversationVersionHeader.tsx (1)

21-35: Clear stale error before retry; surface success feedback.

Reset error before starting, so past errors don’t persist after a successful retrigger.

   const handleReprocessTranscript = async (event: React.MouseEvent) => {
     event.preventDefault();
     event.stopPropagation();

     try {
       setLoading(true);
+      setError(null);
       await conversationsApi.reprocessTranscript(conversationId);
       onVersionChange?.();
backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py (2)

52-55: Use logger.exception for tracebacks and preserve exception context when re-raising.

Improves observability and debuggability.

-    except Exception as e:
-        logger.error(f"Failed to list jobs: {e}")
+    except Exception as e:
+        logger.exception("Failed to list jobs")
         return {"error": "Failed to list jobs", "jobs": [], "pagination": {"total": 0, "limit": limit, "offset": offset, "has_more": False}}
@@
-    except Exception as e:
-        logger.error(f"Failed to get queue stats: {e}")
-        return {"queued": 0, "processing": 0, "completed": 0, "failed": 0}
+    except Exception:
+        logger.exception("Failed to get queue stats")
+        # Return consistent shape with UI expectations
+        return {"total_jobs": 0, "queued_jobs": 0, "processing_jobs": 0, "completed_jobs": 0, "failed_jobs": 0, "cancelled_jobs": 0, "retrying_jobs": 0, "timestamp": None}
@@
-    except Exception as e:
-        logger.error(f"Failed to get queue health: {e}")
+    except Exception:
+        logger.exception("Failed to get queue health")
         return {
             "status": "unhealthy",
-            "message": f"Health check failed: {str(e)}"
+            "message": "Health check failed"
         }
@@
-    except Exception as e:
-        logger.error(f"Failed to flush inactive jobs: {e}")
-        raise HTTPException(status_code=500, detail=f"Failed to flush jobs: {str(e)}")
+    except Exception as e:
+        logger.exception("Failed to flush inactive jobs")
+        raise HTTPException(status_code=500, detail="Failed to flush jobs") from e
@@
-    except Exception as e:
-        logger.error(f"Failed to flush all jobs: {e}")
-        raise HTTPException(status_code=500, detail=f"Failed to flush all jobs: {str(e)}")
+    except Exception as e:
+        logger.exception("Failed to flush all jobs")
+        raise HTTPException(status_code=500, detail="Failed to flush all jobs") from e

Also applies to: 67-70, 84-89, 118-121, 146-147
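A self-contained illustration of why the review prefers logger.exception plus raise ... from e: the former attaches the full traceback to the log record, and the latter preserves the original exception as __cause__. Logger and exception names here are demo-only:

```python
import logging

records = []

class ListHandler(logging.Handler):
    """Capture emitted records so exc_info can be inspected."""
    def emit(self, record):
        records.append(record)

logger = logging.getLogger("queue_routes_demo")
logger.addHandler(ListHandler())
logger.setLevel(logging.ERROR)

class QueueError(Exception):
    pass

def flush_jobs():
    try:
        raise ValueError("mongo unavailable")   # stand-in for a failed flush
    except Exception as e:
        # logging.exception == logging.error(..., exc_info=True): traceback preserved
        logger.exception("Failed to flush inactive jobs")
        # 'from e' keeps the original exception chained as __cause__
        raise QueueError("Failed to flush jobs") from e

try:
    flush_jobs()
except QueueError as err:
    cause = err.__cause__
```

In a FastAPI handler the same pattern would wrap the HTTPException: `raise HTTPException(status_code=500, detail="Failed to flush jobs") from e`.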


57-66: current_user is intentionally unused for auth — mark or reference for clarity.

If you keep it only for dependency enforcement, add a comment to silence linters.

-async def get_queue_stats(
-    current_user: User = Depends(current_active_user)
-):
+async def get_queue_stats(
+    current_user: User = Depends(current_active_user)  # noqa: ARG001 - used for auth dependency
+):
backends/advanced/webui/src/components/ConversationVersionDropdown.tsx (1)

109-111: Guard against missing provider/model in labels.

Avoid rendering “undefined”.

-  const formatVersionLabel = (version: TranscriptVersion | MemoryVersion, index: number) => {
-    return `v${index + 1} (${version.provider}${version.model ? ` ${version.model}` : ''})`
-  }
+  const formatVersionLabel = (version: TranscriptVersion | MemoryVersion, index: number) => {
+    const provider = (version as any).provider || 'unknown';
+    const model = (version as any).model ? ` ${(version as any).model}` : '';
+    return `v${index + 1} (${provider}${model})`;
+  }
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (4)

99-109: Gate expensive/PII‑leaning debug logs behind DEBUG

Fetching a sample doc and logging user_id types on every request is noisy and can expose PII. Guard this block with DEBUG.

-        # Debug: Check what's in the conversations collection
-        total_conversations = await conversations_col.count_documents({})
-        logger.info(f"📊 Total conversations in database: {total_conversations}")
-
-        if total_conversations > 0:
-            # Show a sample conversation to debug user_id format
-            sample = await conversations_col.find_one({})
-            if sample:
-                logger.info(f"🔍 Sample conversation user_id: '{sample.get('user_id')}' (type: {type(sample.get('user_id'))})")
-                logger.info(f"🔍 Looking for user_id: '{str(user.user_id)}' (type: {type(str(user.user_id))})")
+        if logger.isEnabledFor(logging.DEBUG):
+            total_conversations = await conversations_col.count_documents({})
+            logger.debug(f"📊 Total conversations in database: {total_conversations}")
+            if total_conversations > 0:
+                sample = await conversations_col.find_one({}, projection={"user_id": 1})
+                if sample:
+                    logger.debug(
+                        "🔍 Sample conversation user_id: %r (type: %s); looking for %r (type: %s)",
+                        sample.get("user_id"),
+                        type(sample.get("user_id")),
+                        str(user.user_id),
+                        type(str(user.user_id)),
+                    )

118-121: Avoid calling a repository “private” method from the controller

Consider exposing a public helper on ConversationsRepository instead of using _populate_primary_fields directly.


171-175: Reduce log verbosity for final grouping logs

Move to DEBUG to avoid noisy INFO logs at scale.

-        logger.info(f"✅ Returning {len(conversations)} client groups with {total_grouped} total conversations")
-        logger.info(f"📊 Client groups: {list(conversations.keys())}")
+        if logger.isEnabledFor(logging.DEBUG):
+            logger.debug(
+                "✅ Returning %d client groups with %d total conversations", 
+                len(conversations), total_grouped
+            )
+            logger.debug("📊 Client groups: %s", list(conversations.keys()))

951-979: Audio metadata read: fine; consider catching wave.Error specifically

Optional: catch wave.Error for clarity; keep broad catch for safety if desired.

backends/advanced/src/advanced_omi_backend/simple_queue.py (7)

27-32: Create indexes for performance and correctness

Add indexes on job_id (unique), status, and created_at to speed up lookups and enforce id uniqueness.

Would you like a follow-up patch to start_worker() that ensures these indexes exist at startup?


91-93: Use logging.exception for worker loop errors

Capture stack traces for easier diagnostics.

-            except Exception as e:
-                logger.error(f"Worker loop error: {e}")
+            except Exception:
+                logger.exception("Worker loop error")
                 await asyncio.sleep(5)

122-124: Use logging.exception on job failures

Ensure failures carry stack traces; also consider unsetting started_at when retrying (see below).

-        except Exception as e:
-            logger.error(f"❌ Job {job_id} failed: {e}")
+        except Exception as e:
+            logger.exception("❌ Job %s failed", job_id)

141-149: When retrying, unset started_at

Avoid stale started_at after requeue.

-                await self.jobs_collection.update_one(
-                    {"job_id": job_id},
-                    {"$set": {
-                        "status": JobStatus.QUEUED,
-                        "attempts": attempts,
-                        "last_error": str(e)
-                    }}
-                )
+                await self.jobs_collection.update_one(
+                    {"job_id": job_id},
+                    {
+                        "$set": {
+                            "status": JobStatus.QUEUED,
+                            "attempts": attempts,
+                            "last_error": str(e),
+                        },
+                        "$unset": {"started_at": ""},
+                    },
+                )

215-226: Type hints: use Optional for nullable parameters

PEP 484: annotate None-able args as Optional.

-    async def get_jobs(self, limit: int = 20, offset: int = 0, filters: Dict[str, str] = None) -> Dict[str, Any]:
+    async def get_jobs(self, limit: int = 20, offset: int = 0, filters: Optional[Dict[str, str]] = None) -> Dict[str, Any]:

264-313: Normalize statuses to strings in flush_inactive_jobs

Ensure query and stats keys are consistent whether callers pass JobStatus or str.

-        if statuses is None:
-            statuses = [JobStatus.COMPLETED, JobStatus.FAILED]
+        if statuses is None:
+            statuses = [JobStatus.COMPLETED, JobStatus.FAILED]
+        normalized_statuses = [s.value if isinstance(s, JobStatus) else s for s in statuses]
@@
-        query = {
-            "status": {"$in": statuses},
+        query = {
+            "status": {"$in": normalized_statuses},
             "created_at": {"$lt": cutoff_time}
         }
@@
-        removal_stats = {status: 0 for status in statuses}
+        removal_stats = {status: 0 for status in normalized_statuses}
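The normalization the diff above suggests can live in a small pure helper that also builds the flush query; a sketch with illustrative names (the real flush also deletes and tallies per status):

```python
from datetime import datetime, timedelta, timezone
from enum import Enum

class JobStatus(str, Enum):
    QUEUED = "queued"
    COMPLETED = "completed"
    FAILED = "failed"

def build_flush_query(older_than_hours=24, statuses=None, now=None):
    """Build a Mongo-style filter, accepting JobStatus members or raw strings."""
    if statuses is None:
        statuses = [JobStatus.COMPLETED, JobStatus.FAILED]
    # Normalize so the query and removal_stats keys are always plain strings.
    normalized = [s.value if isinstance(s, JobStatus) else s for s in statuses]
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(hours=older_than_hours)
    return {"status": {"$in": normalized}, "created_at": {"$lt": cutoff}}
```

Because JobStatus subclasses str, a raw "failed" passes through unchanged while JobStatus.FAILED is reduced to its .value, so callers can mix both forms safely.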

360-368: Type hints: Optional for nullable parameter

Same as above for flush helpers.

-async def flush_inactive_jobs(older_than_hours: int = 24, statuses: list = None) -> Dict[str, int]:
+async def flush_inactive_jobs(older_than_hours: int = 24, statuses: Optional[list] = None) -> Dict[str, int]:
@@
-async def flush_all_jobs(confirm: bool = False) -> Dict[str, int]:
+async def flush_all_jobs(confirm: bool = False) -> Dict[str, int]:
     """Convenience function to flush all jobs. USE WITH CAUTION!"""
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 641b7a6 and 51daaaf.

📒 Files selected for processing (15)
  • backends/advanced/src/advanced_omi_backend/config.py (2 hunks)
  • backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (8 hunks)
  • backends/advanced/src/advanced_omi_backend/database.py (2 hunks)
  • backends/advanced/src/advanced_omi_backend/main.py (3 hunks)
  • backends/advanced/src/advanced_omi_backend/routers/api_router.py (2 hunks)
  • backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py (2 hunks)
  • backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py (1 hunks)
  • backends/advanced/src/advanced_omi_backend/simple_queue.py (1 hunks)
  • backends/advanced/webui/src/App.tsx (2 hunks)
  • backends/advanced/webui/src/components/ConversationVersionDropdown.tsx (1 hunks)
  • backends/advanced/webui/src/components/ConversationVersionHeader.tsx (1 hunks)
  • backends/advanced/webui/src/components/layout/Layout.tsx (2 hunks)
  • backends/advanced/webui/src/pages/Conversations.tsx (5 hunks)
  • backends/advanced/webui/src/pages/Queue.tsx (1 hunks)
  • backends/advanced/webui/src/services/api.ts (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: CR
PR: AnkushMalaker/friend-lite#0
File: CLAUDE.md:0-0
Timestamp: 2025-09-18T01:42:08.551Z
Learning: Applies to backends/advanced/src/**/*.py : Use conversation_id (not audio_uuid) for all reprocessing operations and version management
📚 Learning: 2025-09-18T01:42:08.551Z
Learnt from: CR
PR: AnkushMalaker/friend-lite#0
File: CLAUDE.md:0-0
Timestamp: 2025-09-18T01:42:08.551Z
Learning: Applies to backends/advanced/src/**/*.py : Use conversation_id (not audio_uuid) for all reprocessing operations and version management

Applied to files:

  • backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
  • backends/advanced/src/advanced_omi_backend/database.py
🪛 Ruff (0.13.1)
backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py

26-26: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


50-50: Consider moving this statement to an else block

(TRY300)


52-52: Do not catch blind exception: Exception

(BLE001)


53-53: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


59-59: Unused function argument: current_user

(ARG001)


59-59: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


65-65: Consider moving this statement to an else block

(TRY300)


67-67: Do not catch blind exception: Exception

(BLE001)


68-68: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


78-82: Consider moving this statement to an else block

(TRY300)


84-84: Do not catch blind exception: Exception

(BLE001)


85-85: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


88-88: Use explicit conversion flag

Replace with conversion flag

(RUF010)


104-104: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


116-116: Consider moving this statement to an else block

(TRY300)


118-118: Do not catch blind exception: Exception

(BLE001)


119-119: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


120-120: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


120-120: Use explicit conversion flag

Replace with conversion flag

(RUF010)


126-126: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


134-137: Abstract raise to an inner function

(TRY301)


141-141: Consider moving this statement to an else block

(TRY300)


144-144: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


145-145: Do not catch blind exception: Exception

(BLE001)


146-146: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


147-147: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


147-147: Use explicit conversion flag

Replace with conversion flag

(RUF010)

backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py

108-108: Use explicit conversion flag

Replace with conversion flag

(RUF010)


176-176: Consider moving this statement to an else block

(TRY300)


697-697: Do not catch blind exception: Exception

(BLE001)


698-698: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


701-701: Use explicit conversion flag

Replace with conversion flag

(RUF010)


707-707: Unused function argument: audio_uuid

(ARG001)


745-745: Create your own exception

(TRY002)


745-745: Avoid specifying long messages outside the exception class

(TRY003)


759-759: f-string without any placeholders

Remove extraneous f prefix

(F541)


767-767: Create your own exception

(TRY002)


767-767: Avoid specifying long messages outside the exception class

(TRY003)


783-783: f-string without any placeholders

Remove extraneous f prefix

(F541)


835-835: Do not catch blind exception: Exception

(BLE001)


869-869: Create your own exception

(TRY002)


869-869: Avoid specifying long messages outside the exception class

(TRY003)


971-971: Do not catch blind exception: Exception

(BLE001)


976-976: Do not catch blind exception: Exception

(BLE001)


1011-1011: Do not catch blind exception: Exception

(BLE001)


1012-1012: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


1125-1125: Do not catch blind exception: Exception

(BLE001)


1126-1126: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

backends/advanced/src/advanced_omi_backend/main.py

332-332: Do not catch blind exception: Exception

(BLE001)


333-333: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


361-361: Do not catch blind exception: Exception

(BLE001)


362-362: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py

22-22: __all__ is not sorted

Apply an isort-style sorting to __all__

(RUF022)

backends/advanced/src/advanced_omi_backend/database.py

523-523: Redefinition of unused update_transcription_status from line 389

(F811)


524-524: PEP 484 prohibits implicit Optional

Convert to Optional[T]

(RUF013)


524-524: PEP 484 prohibits implicit Optional

Convert to Optional[T]

(RUF013)


745-745: PEP 484 prohibits implicit Optional

Convert to Optional[T]

(RUF013)


746-746: PEP 484 prohibits implicit Optional

Convert to Optional[T]

(RUF013)


747-747: PEP 484 prohibits implicit Optional

Convert to Optional[T]

(RUF013)


748-748: PEP 484 prohibits implicit Optional

Convert to Optional[T]

(RUF013)


749-749: PEP 484 prohibits implicit Optional

Convert to Optional[T]

(RUF013)

backends/advanced/src/advanced_omi_backend/simple_queue.py

91-91: Do not catch blind exception: Exception

(BLE001)


92-92: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


109-109: Abstract raise to an inner function

(TRY301)


109-109: Create your own exception

(TRY002)


109-109: Avoid specifying long messages outside the exception class

(TRY003)


122-122: Do not catch blind exception: Exception

(BLE001)


123-123: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


215-215: PEP 484 prohibits implicit Optional

Convert to Optional[T]

(RUF013)


264-264: PEP 484 prohibits implicit Optional

Convert to Optional[T]

(RUF013)


324-324: Avoid specifying long messages outside the exception class

(TRY003)


360-360: PEP 484 prohibits implicit Optional

Convert to Optional[T]

(RUF013)

🔇 Additional comments (18)
backends/advanced/webui/src/components/layout/Layout.tsx (2)

2-2: LGTM: icon import added correctly

Layers icon import is correct and unused imports aren’t introduced.


17-21: Admin-only nav: verify route-level guard exists

Adding “Queue Management” under the admin section is good. Ensure the /queue route itself is admin-gated (not just hidden in nav) to avoid unauthorized access via direct URL.

If not already enforced, add an admin guard on the route (see App.tsx comment).

backends/advanced/webui/src/App.tsx (1)

12-12: LGTM: Queue page route import

Import is consistent with new page wiring.

backends/advanced/src/advanced_omi_backend/routers/api_router.py (2)

17-17: LGTM: queue_router exported and imported

Matches modules export; keeps composition consistent.


35-35: LGTM: queue router included

Queue endpoints are now exposed under /api. Ensure the queue router itself enforces admin.

backends/advanced/src/advanced_omi_backend/main.py (1)

52-53: LGTM: simple_queue import

Import location and naming look good.

backends/advanced/webui/src/pages/Conversations.tsx (2)

4-4: LGTM: version header integration

Import of ConversationVersionHeader is correct and localized to conversations page.


355-366: Approve — version actions use conversation_id; verify delete endpoint

Reprocessing and version activation are keyed by conversation_id as expected. Confirm which identifier the backend conversation delete route expects: the UI passes audio_uuid, but an rg search for router.delete(.*/api/conversations under backends/advanced/src/advanced_omi_backend/routers returned no matches, so manual verification is required.

backends/advanced/webui/src/services/api.ts (1)

136-143: Align frontend API with backend + use axios params

  • Prefer axios params over string-concatenating URLSearchParams.
  • Verify backend supports job retry/cancel routes; AI summary only mentioned stats/health/flush.

Apply this diff:

 export const queueApi = {
-  getJobs: (params: URLSearchParams) => api.get(`/api/queue/jobs?${params}`),
+  getJobs: (params: URLSearchParams) => api.get('/api/queue/jobs', { params: Object.fromEntries(params.entries()) }),
   getStats: () => api.get('/api/queue/stats'),
   retryJob: (jobId: string, force: boolean = false) => 
     api.post(`/api/queue/jobs/${jobId}/retry`, { force }),
   cancelJob: (jobId: string) => api.delete(`/api/queue/jobs/${jobId}`),
 }

Optionally add parity endpoints if present server-side:

  • getHealth(): GET /api/queue/health
  • flush(queue?: string): POST /api/queue/flush
  • flushAll(): POST /api/queue/flush-all
backends/advanced/webui/src/components/ConversationVersionHeader.tsx (1)

37-69: LGTM on conditional rendering and UX.

Early return when versionInfo missing or single-version state is clean and matches the product flow.

backends/advanced/webui/src/components/ConversationVersionDropdown.tsx (1)

78-83: LGTM on conditional lazy-loading of version history.

Only fetching when counts > 1 keeps requests lean.

backends/advanced/src/advanced_omi_backend/database.py (1)

656-666: Datetime storage is inconsistent (mix of ISO strings and datetime objects).

Mixing types across fields complicates queries and sorting. Pick one representation (prefer native UTC datetimes in Mongo) and standardize.

Also applies to: 767-768
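As an illustration of the standardization this comment asks for, a minimal sketch (the helper name is hypothetical, not from the PR) that coerces mixed created_at representations to timezone-aware UTC datetimes before they are written, so Mongo stores a single comparable type:

```python
from datetime import datetime, timezone

def to_utc_datetime(value):
    """Coerce an ISO-8601 string or a datetime to an aware UTC datetime.

    Naive datetimes are assumed to already be in UTC (an assumption for
    this sketch; adjust if the backend stores local times).
    """
    if isinstance(value, str):
        value = datetime.fromisoformat(value)
    if not isinstance(value, datetime):
        raise TypeError(f"unsupported created_at value: {value!r}")
    if value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value.astimezone(timezone.utc)

print(to_utc_datetime("2025-09-22T10:00:00+02:00"))  # 2025-09-22 08:00:00+00:00
```

Running every write through one normalizer like this keeps $sort and range queries on created_at well-defined.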

backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (5)

135-138: LGTM on version count extraction

Counts are derived safely with defaults.


179-179: Good: include stack trace on failures

exc_info=True improves troubleshooting.


204-206: Populate from active versions before response: good

Pre-populating primary fields aligns with the UI’s expectations.

Please confirm _populate_primary_fields always returns keys used in the formatter (e.g., transcript, memories, speakers_identified) to avoid KeyErrors.


511-516: Confirm cascade semantics when deleting conversation records

Deleting the conversation alongside the chunk is intentional here; ensure no orphaned processing_runs or external references remain, or wrap in a transaction if cross-collection integrity matters.


705-870: _do_transcript_reprocessing: WAV-only assumption, duration calc bug, and minor lint fixes

  • Assumes WAV; will fail for non‑WAV inputs. Either ensure stored files are WAV or decode to PCM before wave.open.
  • Duration fallback is incorrect; use frame count instead of byte length.
  • Fix unused param and f-strings; prefer specific exceptions.
-    logger.info(f"🎤 Starting core transcript reprocessing for conversation {conversation_id}")
+    logger.info("🎤 Starting core transcript reprocessing for conversation %s (audio_uuid=%s)", conversation_id, audio_uuid)
@@
-    if not transcription_manager.provider:
-        raise Exception("No transcription provider configured")
+    if not transcription_manager.provider:
+        raise RuntimeError("No transcription provider configured")
@@
-    with wave.open(audio_path, 'rb') as wav_file:
-        sample_rate = wav_file.getframerate()
-        audio_frames = wav_file.readframes(wav_file.getnframes())
+    with wave.open(audio_path, 'rb') as wav_file:
+        sample_rate = wav_file.getframerate()
+        nframes = wav_file.getnframes()
+        audio_frames = wav_file.readframes(nframes)
+    audio_duration_seconds = (nframes / float(sample_rate)) if sample_rate else 0.0
@@
-    logger.info(f"🎤 Processing transcript directly for reprocessing...")
+    logger.info("🎤 Processing transcript directly for reprocessing...")
@@
-    if not transcript_result:
-        raise Exception("Transcription failed - no result returned")
+    if not transcript_result:
+        raise RuntimeError("Transcription failed: empty result")
@@
-    if not segments and transcript_text:
+    if not segments and transcript_text:
         segments = [{
             "text": transcript_text,
             "start": 0.0,
-            "end": len(audio_frames) / (sample_rate * 2),  # Estimate duration
+            "end": audio_duration_seconds,
             "speaker": "Speaker 0",
             "confidence": transcript_result.get("confidence", 0.9)
         }]
@@
-    logger.info(f"🎤 Transcript reprocessing completed:")
+    logger.info("🎤 Transcript reprocessing completed:")
@@
-    else:
-        raise Exception("Failed to update transcript version in database")
+    else:
+        raise RuntimeError("Failed to update transcript version in database")

Verification asks:

  • Are audio chunks guaranteed to be WAV? If not, we should decode with ffmpeg/pydub or pass file path to the provider and let it handle formats.
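To illustrate the duration fix above, a self-contained stdlib-only sketch that derives duration from frame count and sample rate rather than byte length (byte length overestimates for 16-bit or multi-channel audio); the in-memory WAV exists only to exercise the helper:

```python
import io
import wave

def wav_duration_seconds(path_or_file) -> float:
    """Duration = frame count / sample rate."""
    with wave.open(path_or_file, "rb") as wav_file:
        nframes = wav_file.getnframes()
        rate = wav_file.getframerate()
    return nframes / float(rate) if rate else 0.0

# Build a 1-second, 16 kHz, 16-bit mono WAV in memory to test against.
buf = io.BytesIO()
with wave.open(buf, "wb") as w:
    w.setnchannels(1)
    w.setsampwidth(2)      # 2 bytes per sample
    w.setframerate(16000)
    w.writeframes(b"\x00\x00" * 16000)  # 16000 frames of silence
buf.seek(0)
print(wav_duration_seconds(buf))  # 1.0
```

Note the byte-length estimate `len(frames) / (rate * 2)` in the original code only matches this for 16-bit mono; frame count is correct for any width/channel combination.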
backends/advanced/src/advanced_omi_backend/simple_queue.py (1)

151-187: LGTM: transcript handler integrates cleanly with controller

Result summarization is helpful for the UI.

Comment on lines +160 to +167
                    "debug_audio_url": f"/audio/{audio_path}" if audio_path else None,
                    # Version information for UI
                    "version_info": {
                        "transcript_count": len(transcript_versions),
                        "memory_count": len(memory_versions),
                        "active_transcript_version": conversation.get("active_transcript_version"),
                        "active_memory_version": conversation.get("active_memory_version"),
                    }

⚠️ Potential issue

Do not expose debug_audio_url to regular users

This can leak internal paths/URLs. Restrict to admins/superusers.

-                    "debug_audio_url": f"/audio/{audio_path}" if audio_path else None,
+                    "debug_audio_url": f"/audio/{audio_path}" if (audio_path and user.is_superuser) else None,

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
around lines 160 to 167, the response currently includes "debug_audio_url" for
all users which can leak internal paths; change the code to only include
debug_audio_url when the current user is an admin/superuser by checking the
request/auth context (e.g., request.user.is_admin, current_user.is_superuser, or
the project's permission helper) and omit or set it to None for non-admins,
ensure the permission check happens before constructing the dict, and add/update
a small unit/integration test to verify non-admins do not receive the
debug_audio_url while admins do.

Comment on lines +574 to +703
async def delete_conversation_version(conversation_id: str, version_type: str, version_id: str, user: User):
"""Delete a specific version (transcript or memory) from a conversation. Users can only modify their own conversations."""
try:
conversations_repo = ConversationsRepository(conversations_col)

# Get the conversation first to check ownership
conversation = await conversations_repo.get_conversation(conversation_id)
if not conversation:
return JSONResponse(
status_code=404,
content={"error": "Conversation not found"}
)

# Check if user owns this conversation
if not user.is_superuser and conversation["user_id"] != str(user.user_id):
return JSONResponse(
status_code=403,
content={"error": "Access forbidden. You can only modify your own conversations."}
)

# Validate version type
if version_type not in ["transcript", "memory"]:
return JSONResponse(
status_code=400,
content={"error": "Version type must be 'transcript' or 'memory'"}
)

# Determine field names based on version type
if version_type == "transcript":
versions_field = "transcript_versions"
active_field = "active_transcript_version"
else: # memory
versions_field = "memory_versions"
active_field = "active_memory_version"

# Check if this version exists
versions = conversation.get(versions_field, [])
version_exists = any(v.get("version_id") == version_id for v in versions)

if not version_exists:
return JSONResponse(
status_code=404,
content={"error": f"{version_type.title()} version {version_id} not found"}
)

# Check if there are other versions (can't delete the last one)
if len(versions) <= 1:
return JSONResponse(
status_code=400,
content={"error": f"Cannot delete the last {version_type} version. Conversation must have at least one version."}
)

# Check if this is the active version
active_version = conversation.get(active_field)
is_active = (active_version == version_id)

# If deleting active version, we need to set a new active version
new_active_version = None
if is_active:
# Find the most recent non-deleted version to make active
remaining_versions = [v for v in versions if v.get("version_id") != version_id]
if remaining_versions:
# Sort by created_at and pick the most recent
remaining_versions.sort(key=lambda x: x.get("created_at", ""), reverse=True)
new_active_version = remaining_versions[0]["version_id"]

# Remove the version from the array
update_operations = {
"$pull": {versions_field: {"version_id": version_id}}
}

# If we need to update the active version
if new_active_version:
update_operations["$set"] = {active_field: new_active_version}

# Execute the update
result = await conversations_col.update_one(
{"conversation_id": conversation_id},
update_operations
)

if result.modified_count == 0:
return JSONResponse(
status_code=500,
content={"error": f"Failed to delete {version_type} version"}
)

# If we updated the active version, also update legacy fields
if new_active_version:
# Get the updated conversation and populate primary fields
updated_conversation = await conversations_repo.get_conversation(conversation_id)
updated_conversation = conversations_repo._populate_primary_fields(updated_conversation)

# Update legacy fields in database
legacy_updates = {}
if version_type == "transcript":
legacy_updates["transcript"] = updated_conversation.get("transcript", [])
legacy_updates["speakers_identified"] = updated_conversation.get("speakers_identified", [])
else: # memory
legacy_updates["memories"] = updated_conversation.get("memories", [])
legacy_updates["memory_processing_status"] = updated_conversation.get("memory_processing_status", "pending")

if legacy_updates:
await conversations_col.update_one(
{"conversation_id": conversation_id},
{"$set": legacy_updates}
)

logger.info(f"Deleted {version_type} version {version_id} from conversation {conversation_id}")

response_data = {
"message": f"Successfully deleted {version_type} version {version_id}",
"conversation_id": conversation_id,
"version_type": version_type,
"deleted_version_id": version_id,
"was_active": is_active
}

if new_active_version:
response_data["new_active_version"] = new_active_version

return JSONResponse(status_code=200, content=response_data)

except Exception as e:
logger.error(f"Error deleting {version_type} version {version_id} from conversation {conversation_id}: {e}")
return JSONResponse(
status_code=500,
content={"error": f"Failed to delete {version_type} version: {str(e)}"}
)


⚠️ Potential issue

❓ Verification inconclusive

delete_conversation_version: created_at sorting and exception logging

  • Sorting by created_at using a string fallback can misorder items or error if types vary. Normalize to datetime.
  • Use logger.exception to capture trace.
  • Consider atomic guard to avoid a race that deletes the last version (server-side size check).
-                remaining_versions.sort(key=lambda x: x.get("created_at", ""), reverse=True)
-                new_active_version = remaining_versions[0]["version_id"]
+                from datetime import datetime
+                def _created_at_key(v):
+                    ts = v.get("created_at")
+                    if isinstance(ts, str):
+                        try:
+                            return datetime.fromisoformat(ts)
+                        except ValueError:
+                            return datetime.min
+                    return ts or datetime.min
+                remaining_versions.sort(key=_created_at_key, reverse=True)
+                new_active_version = remaining_versions[0]["version_id"]
-    except Exception as e:
-        logger.error(f"Error deleting {version_type} version {version_id} from conversation {conversation_id}: {e}")
+    except Exception:
+        logger.exception(
+            "Error deleting %s version %s from conversation %s",
+            version_type, version_id, conversation_id,
+        )
         return JSONResponse(
             status_code=500,
             content={"error": f"Failed to delete {version_type} version: {str(e)}"}
         )

If feasible, execute the delete with a filter that asserts array size > 1 to prevent deleting the last version atomically (e.g., using $expr with $size). Happy to draft that if desired.


Fix created_at sorting, preserve exception var when logging, and make delete atomic

File: backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (lines 574-703)

  • Normalize created_at to datetime before sorting to avoid misordering; apply this diff:
-                remaining_versions.sort(key=lambda x: x.get("created_at", ""), reverse=True)
-                new_active_version = remaining_versions[0]["version_id"]
+                from datetime import datetime
+                def _created_at_key(v):
+                    ts = v.get("created_at")
+                    if isinstance(ts, str):
+                        try:
+                            return datetime.fromisoformat(ts)
+                        except ValueError:
+                            return datetime.min
+                    return ts or datetime.min
+                remaining_versions.sort(key=_created_at_key, reverse=True)
+                new_active_version = remaining_versions[0]["version_id"]
  • Use logger.exception(...) but keep "except Exception as e:" (do not drop the "as e") so the exception variable is available; avoid returning raw str(e) in the HTTP response (return a generic 500 message or redact details).

  • Make the delete atomic to avoid a race that could remove the last version: add a precondition in the update filter that asserts the versions array size > 1 (e.g. use "$expr": {"$gt":[{"$size": f"${versions_field}"}, 1]}) or perform the operation with find_one_and_update / an update pipeline so $pull and $set occur atomically.
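A sketch of the suggested precondition (the builder function is hypothetical): the filter only matches while the versions array still holds more than one element, so the $pull can never strip the last remaining version even under concurrent deletes:

```python
def build_version_delete_ops(conversation_id, versions_field, active_field,
                             version_id, new_active_version=None):
    """Build a (filter, update) pair for an atomic version delete.

    The $expr/$size guard makes the filter fail to match once only one
    version remains, so the last version cannot be removed by a race.
    """
    filter_doc = {
        "conversation_id": conversation_id,
        "$expr": {"$gt": [{"$size": f"${versions_field}"}, 1]},
    }
    update_doc = {"$pull": {versions_field: {"version_id": version_id}}}
    if new_active_version:
        update_doc["$set"] = {active_field: new_active_version}
    return filter_doc, update_doc

f, u = build_version_delete_ops("c1", "transcript_versions",
                                "active_transcript_version", "v2")
print(f["$expr"])  # {'$gt': [{'$size': '$transcript_versions'}, 1]}
```

If the resulting update_one reports modified_count == 0, re-read the document to distinguish "conversation not found" from "only one version left" before choosing the error response.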

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async def delete_conversation_version(conversation_id: str, version_type: str, version_id: str, user: User):
"""Delete a specific version (transcript or memory) from a conversation. Users can only modify their own conversations."""
try:
conversations_repo = ConversationsRepository(conversations_col)
# Get the conversation first to check ownership
conversation = await conversations_repo.get_conversation(conversation_id)
if not conversation:
return JSONResponse(
status_code=404,
content={"error": "Conversation not found"}
)
# Check if user owns this conversation
if not user.is_superuser and conversation["user_id"] != str(user.user_id):
return JSONResponse(
status_code=403,
content={"error": "Access forbidden. You can only modify your own conversations."}
)
# Validate version type
if version_type not in ["transcript", "memory"]:
return JSONResponse(
status_code=400,
content={"error": "Version type must be 'transcript' or 'memory'"}
)
# Determine field names based on version type
if version_type == "transcript":
versions_field = "transcript_versions"
active_field = "active_transcript_version"
else: # memory
versions_field = "memory_versions"
active_field = "active_memory_version"
# Check if this version exists
versions = conversation.get(versions_field, [])
version_exists = any(v.get("version_id") == version_id for v in versions)
if not version_exists:
return JSONResponse(
status_code=404,
content={"error": f"{version_type.title()} version {version_id} not found"}
)
# Check if there are other versions (can't delete the last one)
if len(versions) <= 1:
return JSONResponse(
status_code=400,
content={"error": f"Cannot delete the last {version_type} version. Conversation must have at least one version."}
)
# Check if this is the active version
active_version = conversation.get(active_field)
is_active = (active_version == version_id)
# If deleting active version, we need to set a new active version
new_active_version = None
if is_active:
# Find the most recent non-deleted version to make active
remaining_versions = [v for v in versions if v.get("version_id") != version_id]
if remaining_versions:
# Sort by created_at and pick the most recent
remaining_versions.sort(key=lambda x: x.get("created_at", ""), reverse=True)
new_active_version = remaining_versions[0]["version_id"]
# Remove the version from the array
update_operations = {
"$pull": {versions_field: {"version_id": version_id}}
}
# If we need to update the active version
if new_active_version:
update_operations["$set"] = {active_field: new_active_version}
# Execute the update
result = await conversations_col.update_one(
{"conversation_id": conversation_id},
update_operations
)
if result.modified_count == 0:
return JSONResponse(
status_code=500,
content={"error": f"Failed to delete {version_type} version"}
)
# If we updated the active version, also update legacy fields
if new_active_version:
# Get the updated conversation and populate primary fields
updated_conversation = await conversations_repo.get_conversation(conversation_id)
updated_conversation = conversations_repo._populate_primary_fields(updated_conversation)
# Update legacy fields in database
legacy_updates = {}
if version_type == "transcript":
legacy_updates["transcript"] = updated_conversation.get("transcript", [])
legacy_updates["speakers_identified"] = updated_conversation.get("speakers_identified", [])
else: # memory
legacy_updates["memories"] = updated_conversation.get("memories", [])
legacy_updates["memory_processing_status"] = updated_conversation.get("memory_processing_status", "pending")
if legacy_updates:
await conversations_col.update_one(
{"conversation_id": conversation_id},
{"$set": legacy_updates}
)
logger.info(f"Deleted {version_type} version {version_id} from conversation {conversation_id}")
response_data = {
"message": f"Successfully deleted {version_type} version {version_id}",
"conversation_id": conversation_id,
"version_type": version_type,
"deleted_version_id": version_id,
"was_active": is_active
}
if new_active_version:
response_data["new_active_version"] = new_active_version
return JSONResponse(status_code=200, content=response_data)
except Exception as e:
logger.error(f"Error deleting {version_type} version {version_id} from conversation {conversation_id}: {e}")
return JSONResponse(
status_code=500,
content={"error": f"Failed to delete {version_type} version: {str(e)}"}
)
        # If deleting active version, we need to set a new active version
        new_active_version = None
        if is_active:
            # Find the most recent non-deleted version to make active
            remaining_versions = [v for v in versions if v.get("version_id") != version_id]
            if remaining_versions:
                # Normalize created_at to datetime before sorting to avoid misordering
                from datetime import datetime
                def _created_at_key(v):
                    ts = v.get("created_at")
                    if isinstance(ts, str):
                        try:
                            return datetime.fromisoformat(ts)
                        except ValueError:
                            return datetime.min
                    return ts or datetime.min
                remaining_versions.sort(key=_created_at_key, reverse=True)
                new_active_version = remaining_versions[0]["version_id"]
🧰 Tools
🪛 Ruff (0.13.1)

697-697: Do not catch blind exception: Exception

(BLE001)


698-698: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


701-701: Use explicit conversion flag

Replace with conversion flag

(RUF010)

Comment on lines +980 to +1019
        # Queue the reprocessing job
        try:
            from advanced_omi_backend.simple_queue import get_simple_queue

            queue = await get_simple_queue()
            job_id = await queue.enqueue_job(
                job_type="reprocess_transcript",
                user_id=str(user.user_id),
                data={
                    "conversation_id": conversation_id,
                    "audio_uuid": audio_uuid,
                    "audio_path": str(full_audio_path),
                    "run_id": run_id,
                    "version_id": version_id,
                    "audio_metadata": audio_stats
                }
            )

            logger.info(f"📋 Queued transcript reprocessing job {job_id} for conversation {conversation_id} (run {run_id}, version {version_id})")
            logger.info(f"📋 Job data: audio_path={full_audio_path}, duration={audio_stats.get('duration_seconds', 0)}s")

            return JSONResponse(content={
                "message": f"Transcript reprocessing queued for conversation {conversation_id}",
                "job_id": job_id,
                "run_id": run_id,
                "version_id": version_id,
                "config_hash": config_hash,
                "status": "QUEUED",
                "audio_metadata": audio_stats
            })

        except Exception as queue_error:
            logger.error(f"Failed to queue transcript reprocessing job: {queue_error}")
            return JSONResponse(
                status_code=500,
                content={
                    "error": "Failed to queue transcript reprocessing job",
                    "details": str(queue_error)
                }
            )

⚠️ Potential issue

Queueing: include stack trace on enqueue failures; avoid hard‑coded provider in config hash

  • Use logger.exception on queue errors.
  • Replace hard-coded "deepgram" with the active provider name used by TranscriptionManager to make config_hash meaningful across providers.
-        config_data = {
-            "audio_path": str(full_audio_path),
-            "transcription_provider": "deepgram",  # This would come from settings
-            "trigger": "manual_reprocess"
-        }
+        # Derive provider for dedupe hash
+        try:
+            from advanced_omi_backend.transcription import TranscriptionManager
+            from advanced_omi_backend.processors import get_processor_manager
+            tm = TranscriptionManager(chunk_repo=chunk_repo, processor_manager=get_processor_manager())
+            provider_name = tm.provider.name if tm and tm.provider else "unknown"
+        except Exception:
+            provider_name = "unknown"
+        config_data = {
+            "audio_path": str(full_audio_path),
+            "transcription_provider": provider_name,
+            "trigger": "manual_reprocess",
+        }
@@
-        except Exception as queue_error:
-            logger.error(f"Failed to queue transcript reprocessing job: {queue_error}")
+        except Exception as queue_error:
+            logger.exception("Failed to queue transcript reprocessing job")
             return JSONResponse(
                 status_code=500, 
                 content={
                     "error": "Failed to queue transcript reprocessing job",
                     "details": str(queue_error)
                 }
             )

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff (0.13.1)

1011-1011: Do not catch blind exception: Exception

(BLE001)


1012-1012: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

🤖 Prompt for AI Agents
In
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
around lines 980 to 1019, update the queue error handling to call
logger.exception(...) instead of logger.error(...) so the full stack trace is
captured when enqueueing fails; and replace the hard-coded provider string
"deepgram" used when building config_hash with the actual active provider name
from the TranscriptionManager instance (e.g. read the manager or transcription
object that holds the provider name, use that value with a safe fallback like
"unknown_provider" if not available) so config_hash reflects the real
transcription provider across deployments.

Comment on lines +1096 to +1133
        # Queue the memory processing job
        try:
            from advanced_omi_backend.simple_queue import get_simple_queue

            queue = await get_simple_queue()
            job_id = await queue.enqueue_job(
                job_type="reprocess_memory",
                user_id=str(user.user_id),
                data={
                    "conversation_id": conversation_id,
                    "audio_uuid": audio_uuid,
                    "transcript_version_id": transcript_version_id,
                    "run_id": run_id,
                    "version_id": version_id
                }
            )

            logger.info(f"Queued memory reprocessing job {job_id} for conversation {conversation_id} (run {run_id}, version {version_id})")

            return JSONResponse(content={
                "message": f"Memory reprocessing queued for conversation {conversation_id}",
                "job_id": job_id,
                "run_id": run_id,
                "version_id": version_id,
                "transcript_version_id": transcript_version_id,
                "config_hash": config_hash,
                "status": "QUEUED"
            })

        except Exception as queue_error:
            logger.error(f"Failed to queue memory reprocessing job: {queue_error}")
            return JSONResponse(
                status_code=500,
                content={
                    "error": "Failed to queue memory reprocessing job",
                    "details": str(queue_error)
                }
            )
@coderabbitai coderabbitai bot Sep 22, 2025

⚠️ Potential issue

Memory jobs are queued but the worker handler is a stub

As implemented, the queue will mark these jobs “completed” with no work done (see simple_queue). Either implement memory processing or temporarily block this endpoint (HTTP 501) until the worker supports it.

🧰 Tools
🪛 Ruff (0.13.1)

1125-1125: Do not catch blind exception: Exception

(BLE001)


1126-1126: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

🤖 Prompt for AI Agents
In
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
around lines 1096 to 1133, the controller queues "reprocess_memory" jobs but the
worker currently treats that job type as a no-op, so queued jobs are immediately
marked completed; either implement the memory-processing worker logic or block
the endpoint until it exists. To fix: either (A) implement a proper handler for
job_type "reprocess_memory" in simple_queue's worker (consume job data, load
audio/transcript, run the memory reprocessing pipeline, persist updated
memories, and update job status/errors), or (B) change this endpoint to detect
that the worker lacks support and return HTTP 501 Not Implemented (with a clear
message) instead of queuing the job; pick one approach and update tests/logs
accordingly.

Contributor Author

memory reprocessing out of scope for this

Contributor

@thestumonkey Understood! Thanks for clarifying that memory reprocessing implementation is out of scope for this PR.

Since the endpoint is currently accessible but the worker doesn't actually process memory jobs (it just marks them as "completed"), you might want to consider either:

  1. Adding a clear note in the API response indicating this feature is not yet implemented, or
  2. Temporarily returning HTTP 501 Not Implemented for memory reprocessing endpoints

This would help avoid user confusion when they queue memory reprocessing jobs that appear to complete successfully but don't actually do any work.

Would you like me to create a follow-up issue to track the actual memory reprocessing implementation?
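If option 2 is chosen, a stdlib-only sketch of the guard (the function name and the supported-type set are assumptions for illustration, not the actual backend API):

```python
from http import HTTPStatus

# Job types the worker actually implements today (assumption for this sketch).
SUPPORTED_JOB_TYPES = {"reprocess_transcript"}

def guard_job_type(job_type: str):
    """Return an error payload for job types the worker cannot process yet,
    or None when the job is safe to enqueue."""
    if job_type in SUPPORTED_JOB_TYPES:
        return None
    return {
        "status_code": int(HTTPStatus.NOT_IMPLEMENTED),  # 501
        "body": {"error": f"{job_type} is not implemented yet; job was not queued"},
    }

print(guard_job_type("reprocess_memory")["status_code"])  # 501
```

The endpoint would call this before enqueue_job and return the 501 payload instead of queuing, so users never see a "completed" job that did no work.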


✏️ Learnings added
Learnt from: thestumonkey
PR: AnkushMalaker/friend-lite#117
File: backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py:1096-1133
Timestamp: 2025-09-23T06:46:12.764Z
Learning: Memory reprocessing functionality in the queue system is intentionally left as a stub/placeholder and is out of scope for current development. The endpoint exists but the worker doesn't implement the actual processing logic yet.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: CR
PR: AnkushMalaker/friend-lite#0
File: CLAUDE.md:0-0
Timestamp: 2025-09-18T01:42:08.551Z
Learning: Applies to backends/advanced/src/**/*.py : Use conversation_id (not audio_uuid) for all reprocessing operations and version management

Comment on lines +188 to +191
    async def _handle_reprocess_memory(self, job: Dict[str, Any]):
        """Handle memory reprocessing by calling the controller method."""
        # TODO: Implement when needed
        pass

⚠️ Potential issue

Prevent false “COMPLETED” for unimplemented memory jobs

Raise NotImplementedError so the job is marked FAILED instead of COMPLETED with empty result.

-    async def _handle_reprocess_memory(self, job: Dict[str, Any]):
-        """Handle memory reprocessing by calling the controller method."""
-        # TODO: Implement when needed
-        pass
+    async def _handle_reprocess_memory(self, job: Dict[str, Any]):
+        """Handle memory reprocessing by calling the controller method."""
+        raise NotImplementedError("Memory reprocessing is not implemented yet")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
    async def _handle_reprocess_memory(self, job: Dict[str, Any]):
        """Handle memory reprocessing by calling the controller method."""
        # TODO: Implement when needed
        pass
    async def _handle_reprocess_memory(self, job: Dict[str, Any]):
        """Handle memory reprocessing by calling the controller method."""
        raise NotImplementedError("Memory reprocessing is not implemented yet")
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/simple_queue.py around lines 188
to 191, the _handle_reprocess_memory method currently contains a no-op pass
which causes unimplemented memory reprocess jobs to be marked COMPLETED with
empty results; replace the pass with raising a NotImplementedError (with a clear
message like "Memory reprocessing not implemented") so the exception propagates
and the job system marks the job as FAILED; keep or update the docstring to
reflect that the method intentionally raises until implemented.

Comment on lines +253 to +262
async def _get_next_job_id(self) -> int:
"""Get the next job ID using auto-increment."""
# Use MongoDB's findOneAndUpdate to atomically increment counter
counter_doc = await self.db["job_counters"].find_one_and_update(
{"_id": "job_id"},
{"$inc": {"sequence_value": 1}},
upsert=True,
return_document=True
)
return counter_doc["sequence_value"]
⚠️ Potential issue

Critical: auto-increment counter can return None

Motor requires ReturnDocument.AFTER; with the current code first-time upsert can return None and crash. Also seed with $setOnInsert.

+from pymongo import ReturnDocument
@@
-        counter_doc = await self.db["job_counters"].find_one_and_update(
+        counter_doc = await self.db["job_counters"].find_one_and_update(
             {"_id": "job_id"},
-            {"$inc": {"sequence_value": 1}},
-            upsert=True,
-            return_document=True
+            {"$inc": {"sequence_value": 1}, "$setOnInsert": {"sequence_value": 0}},
+            upsert=True,
+            return_document=ReturnDocument.AFTER
         )
         return counter_doc["sequence_value"]
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/simple_queue.py around lines 253
to 262, the find_one_and_update call can return None on first upsert and crash;
change it to use ReturnDocument.AFTER (Motor's
return_document=ReturnDocument.AFTER) and include a $setOnInsert to seed
sequence_value (e.g. 0 or 1) so the document is created with an initial value,
then safely return the updated sequence_value after the atomic increment.
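The semantics at issue can be reproduced without a database. Below is an in-memory sketch (a hypothetical `FakeCounter`, not Motor's API) of why pre-image returns break the first upsert. One caveat worth verifying against the suggested diff: MongoDB may reject `$inc` and `$setOnInsert` on the same field path as conflicting update operators, in which case `$inc` alone with `upsert=True` already seeds the counter at its increment value.

```python
from typing import Any, Dict, Optional


class FakeCounter:
    """In-memory sketch of find_one_and_update's return semantics.

    MongoDB's default is to return the document *as it was before* the
    update, which is None on a first-time upsert. return_after=True
    models pymongo's ReturnDocument.AFTER.
    """

    def __init__(self) -> None:
        self.doc: Optional[Dict[str, Any]] = None

    def find_one_and_update(self, inc: int, *, return_after: bool) -> Optional[Dict[str, Any]]:
        before = None if self.doc is None else dict(self.doc)
        if self.doc is None:
            self.doc = {"_id": "job_id", "sequence_value": 0}  # upsert path
        self.doc["sequence_value"] += inc
        return dict(self.doc) if return_after else before


# Pre-image semantics: the first upsert hands back None, so
# counter_doc["sequence_value"] would raise TypeError.
assert FakeCounter().find_one_and_update(1, return_after=False) is None

# AFTER semantics: the first call already returns a usable counter.
c = FakeCounter()
assert c.find_one_and_update(1, return_after=True)["sequence_value"] == 1
assert c.find_one_and_update(1, return_after=True)["sequence_value"] == 2
```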

Comment on lines +72 to +76
<Route path="queue" element={
<PageErrorBoundary>
<Queue />
</PageErrorBoundary>
} />
🛠️ Refactor suggestion

🧩 Analysis chain

Gate /queue behind admin

This route is only nav-gated; users could still hit /queue directly. Add an admin check at the route level.

Option A (if ProtectedRoute supports admin flag):

-                <Route path="queue" element={
-                  <PageErrorBoundary>
-                    <Queue />
-                  </PageErrorBoundary>
-                } />
+                <Route path="queue" element={
+                  <ProtectedRoute requireAdmin>
+                    <PageErrorBoundary>
+                      <Queue />
+                    </PageErrorBoundary>
+                  </ProtectedRoute>
+                } />

If not supported, I can add a small AdminRoute wrapper that reads isAdmin from AuthContext—say the word.


🏁 Script executed:

#!/bin/bash
# Check if ProtectedRoute already supports admin gating
rg -n --type=tsx 'function\s+ProtectedRoute|export default function ProtectedRoute' -C3
rg -n --type=tsx -C2 'requireAdmin|roles' src/components/auth/ProtectedRoute.tsx



🏁 Script executed:

#!/bin/bash
# Gather protection-related symbols and show App.tsx around the queue route
echo "=== App.tsx snippet (first 240 lines) ==="
if [ -f backends/advanced/webui/src/App.tsx ]; then
  nl -ba backends/advanced/webui/src/App.tsx | sed -n '1,240p'
else
  echo "App.tsx not found at backends/advanced/webui/src/App.tsx"
fi

echo
echo "=== Search for ProtectedRoute occurrences ==="
rg -n --hidden -S "ProtectedRoute" || true

echo
echo "=== Search for potential ProtectedRoute definitions ==="
rg -n --hidden -S "function\s+ProtectedRoute|export\s+default\s+function\s+ProtectedRoute|const\s+ProtectedRoute\s*=" || true

echo
echo "=== Search for admin gating keywords ==="
rg -n --hidden -S "requireAdmin|require_admin|isAdmin|is_admin|roles|admin" || true

echo
echo "=== Search for auth hooks/context ==="
rg -n --hidden -S "AuthContext|useAuth|useUser|getUser|currentUser" || true



Gate /queue behind admin (wrap with ProtectedRoute adminOnly)

ProtectedRoute already supports adminOnly (backends/advanced/webui/src/components/auth/ProtectedRoute.tsx); wrap the /queue Route in backends/advanced/webui/src/App.tsx with `<ProtectedRoute adminOnly>` so non-admins cannot reach /queue directly.

🤖 Prompt for AI Agents
In backends/advanced/webui/src/App.tsx around lines 72 to 76, the Route for
"/queue" is not protected for admin users; wrap the Route's element with the
existing ProtectedRoute component using the adminOnly prop so only admins can
access it. Update the route to render <ProtectedRoute adminOnly>{/* current
children (PageErrorBoundary + Queue) */}</ProtectedRoute>, ensure ProtectedRoute
is imported from backends/advanced/webui/src/components/auth/ProtectedRoute.tsx
if not already, and keep the inner PageErrorBoundary and Queue components
unchanged.

Comment on lines +78 to +93
// Auto-refresh interval
useEffect(() => {
console.log('🔄 Setting up queue auto-refresh interval');
const interval = setInterval(() => {
if (!loading) {
console.log('⏰ Auto-refreshing queue data');
fetchData();
}
}, 5000); // Refresh every 5 seconds

return () => {
console.log('🧹 Clearing queue auto-refresh interval');
clearInterval(interval);
};
}, []); // Remove dependencies to prevent interval recreation

🛠️ Refactor suggestion

Auto-refresh uses a stale closure; interval never fires (or fires with stale state).

The effect captures the initial loading and fetchData references. As written, if (!loading) will always evaluate using the initial value, so auto-refresh may never run. Make fetchData stable and depend on current flags, or read flags via refs.

Apply this diff to stabilize the callback and dependencies:

+import React, { useState, useEffect, useCallback } from 'react';
@@
-  useEffect(() => {
-    console.log('🔄 Setting up queue auto-refresh interval');
-    const interval = setInterval(() => {
-      if (!loading) {
-        console.log('⏰ Auto-refreshing queue data');
-        fetchData();
-      }
-    }, 5000); // Refresh every 5 seconds
-
-    return () => {
-      console.log('🧹 Clearing queue auto-refresh interval');
-      clearInterval(interval);
-    };
-  }, []); // Remove dependencies to prevent interval recreation
+  useEffect(() => {
+    const interval = setInterval(() => {
+      if (!loading && !refreshing) {
+        fetchData();
+      }
+    }, 5000);
+    return () => clearInterval(interval);
+  }, [loading, refreshing, fetchData]);

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In backends/advanced/webui/src/pages/Queue.tsx around lines 78 to 93 the
useEffect creating the auto-refresh interval captures stale references to
`loading` and `fetchData`, causing the interval callback to use initial values;
make `fetchData` stable with useCallback (including its own deps) and either
include `loading` (or a loading ref) in the effect's dependency list so the
interval closure sees current state, or read the latest loading value from a ref
inside the interval; update the cleanup to clearInterval as-is and ensure the
effect deps are correct to avoid recreating the interval unnecessarily.
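The stale-closure hazard isn't React-specific: any callback that captures a value at creation time keeps seeing that snapshot. A Python analogy (illustrative only — the names map loosely onto the effect's `loading` check and the suggested ref-based fix):

```python
# Shared state, standing in for the component's `loading` flag.
state = {"loading": True}

# Snapshot capture: the value is read once, at definition time —
# analogous to the effect closing over the initial `loading`.
loading_snapshot = state["loading"]


def tick_stale() -> bool:
    """Interval callback using the captured snapshot."""
    return not loading_snapshot  # always False, even after loading flips


def tick_fresh() -> bool:
    """Interval callback reading through a shared reference —
    analogous to a ref, or re-running the effect with correct deps."""
    return not state["loading"]


state["loading"] = False  # loading finished
assert tick_stale() is False  # stale: still thinks loading is in progress
assert tick_fresh() is True   # fresh: would trigger the refresh
```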

Comment on lines +99 to +114
const fetchData = async () => {
console.log('📥 fetchData called, refreshing:', refreshing, 'loading:', loading);
if (!refreshing) setRefreshing(true);

try {
console.log('🔄 Starting Promise.all for jobs and stats');
await Promise.all([fetchJobs(), fetchStats()]);
console.log('✅ Promise.all completed successfully');
} catch (error) {
console.error('❌ Error fetching queue data:', error);
} finally {
setLoading(false);
setRefreshing(false);
console.log('🏁 fetchData completed');
}
};
⚠️ Potential issue

Prevent overlapping fetches; bail out early when already refreshing.

Without a guard, rapid triggers (interval + user actions) can overlap network calls.

Apply this diff:

-  const fetchData = async () => {
+  const fetchData = useCallback(async () => {
     console.log('📥 fetchData called, refreshing:', refreshing, 'loading:', loading);
-    if (!refreshing) setRefreshing(true);
+    if (refreshing) return;
+    setRefreshing(true);
@@
-  };
+  }, [filters, pagination.offset, refreshing, loading]);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const fetchData = async () => {
console.log('📥 fetchData called, refreshing:', refreshing, 'loading:', loading);
if (!refreshing) setRefreshing(true);
try {
console.log('🔄 Starting Promise.all for jobs and stats');
await Promise.all([fetchJobs(), fetchStats()]);
console.log('✅ Promise.all completed successfully');
} catch (error) {
console.error('❌ Error fetching queue data:', error);
} finally {
setLoading(false);
setRefreshing(false);
console.log('🏁 fetchData completed');
}
};
const fetchData = useCallback(async () => {
console.log('📥 fetchData called, refreshing:', refreshing, 'loading:', loading);
if (refreshing) return;
setRefreshing(true);
try {
console.log('🔄 Starting Promise.all for jobs and stats');
await Promise.all([fetchJobs(), fetchStats()]);
console.log('✅ Promise.all completed successfully');
} catch (error) {
console.error('❌ Error fetching queue data:', error);
} finally {
setLoading(false);
setRefreshing(false);
console.log('🏁 fetchData completed');
}
}, [filters, pagination.offset, refreshing, loading]);
🤖 Prompt for AI Agents
In backends/advanced/webui/src/pages/Queue.tsx around lines 99 to 114, add an
early-exit guard at the top of fetchData so it returns immediately when
refreshing is true to prevent overlapping network calls; setRefreshing(true)
only when proceeding (so the flag protects concurrent invocations), keep the
try/catch/finally that awaits Promise.all([fetchJobs(), fetchStats()]) and still
setLoading(false)/setRefreshing(false) in finally to reset state.
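The early-exit guard works in single-threaded event loops (browser JS or asyncio alike) because the flag is checked and set before any await, so overlapping triggers collapse to one in-flight fetch. A Python asyncio sketch of the same pattern (hypothetical `QueuePoller`, standing in for the page's fetch logic):

```python
import asyncio


class QueuePoller:
    """Sketch of the overlap guard: concurrent triggers collapse to one fetch."""

    def __init__(self) -> None:
        self.refreshing = False
        self.fetch_count = 0

    async def fetch_data(self) -> None:
        if self.refreshing:  # bail out early: a fetch is already in flight
            return
        self.refreshing = True
        try:
            await asyncio.sleep(0.01)  # stand-in for fetchJobs()/fetchStats()
            self.fetch_count += 1
        finally:
            self.refreshing = False  # always reset, even on errors


async def main() -> int:
    poller = QueuePoller()
    # An interval tick and two impatient user clicks land at once.
    await asyncio.gather(
        poller.fetch_data(), poller.fetch_data(), poller.fetch_data()
    )
    return poller.fetch_count


if __name__ == "__main__":
    print(asyncio.run(main()))  # 1 — only the first call actually fetched
```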

@AnkushMalaker
Collaborator

@thestumonkey Just curious about the imports that are within the function, like:
from advanced_omi_backend.simple_queue import get_simple_queue
Is it good practice to import inside controller functions in a router pattern, or better to do it at the top?
I ask because Claude will often do this as a side effect of how it works.

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
@thestumonkey
Contributor Author

@thestumonkey Just curious about the imports that are within the function, like: from advanced_omi_backend.simple_queue import get_simple_queue Is it good practice to import inside controller functions in a router pattern, or better to do it at the top? I ask because Claude will often do this as a side effect of how it works.

I prefer to put it at the top of the file, which is the pattern I've used for years. I guess it's useful for Claude to put it in the function so you can see what it's using, and delete it if you end up not using that code? I guess this method may also keep modules lighter, as you don't end up with imports you don't need over time?
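For context on the trade-off: in CPython a function-local import defers the module load to the first call (handy for breaking circular imports or deferring heavy optional dependencies), and every later call is just a `sys.modules` cache lookup; it doesn't shrink anything, since unused top-level imports elsewhere still load when their modules are imported. A small sketch, using `json` as a stand-in for the project-specific import:

```python
import sys


def get_queue_lazy():
    # Function-local import: the module load is deferred to the first call
    # and can break circular imports; subsequent calls hit the
    # sys.modules cache and return the same module object.
    import json  # stand-in for: from advanced_omi_backend.simple_queue import get_simple_queue
    return json


first = get_queue_lazy()
second = get_queue_lazy()
assert first is second          # same cached module object both times
assert "json" in sys.modules    # cached after the first call; later calls are cheap
```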

@AnkushMalaker
Collaborator

Closing this, I assume this is outdated.
