Added queue management page and linked reprocess transcription #117
thestumonkey wants to merge 3 commits into SimpleOpenSoftware:main from thestumonkey:queue
Conversation
Added version selection from the conversation page, version deletion, and the queue management page
Important: Review skipped — auto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI.

Walkthrough

Adds environment-configurable audio storage defaults; introduces a MongoDB-backed simple queue with worker, routing, and UI for monitoring; shifts transcript/memory reprocessing to asynchronous jobs; extends conversation controller with version management and richer payloads; updates repositories with status/version update methods; wires queue startup/shutdown; adds admin navigation and Queue page.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant UI as Web UI
    participant API as FastAPI /queue
    participant CTRL as conversation_controller
    participant Q as SimpleQueue
    participant W as Queue Worker
    participant DB as MongoDB
    UI->>API: POST /api/conversations/{id}/reprocess-transcript
    API->>CTRL: reprocess_transcript(...)
    CTRL->>Q: enqueue_job(type=reprocess_transcript, data={conversation_id,...})
    Q-->>CTRL: job_id
    CTRL-->>UI: 202 QUEUED { job_id, metadata }
    rect rgba(230,245,255,0.5)
        note over W,Q: Background processing
        W->>Q: fetch next QUEUED
        Q-->>W: job
        W->>CTRL: _do_transcript_reprocessing(job)
        CTRL->>DB: update_transcription_status(... PROCESSING ...)
        CTRL->>DB: load audio, run transcription, diarization
        CTRL->>DB: update_transcript_version(..., status=COMPLETED)
        CTRL-->>W: result payload
        W->>DB: mark job COMPLETED with result
    end
```

```mermaid
sequenceDiagram
    autonumber
    participant AdminUI as Admin UI
    participant API as FastAPI /queue
    participant Q as SimpleQueue
    participant DB as MongoDB
    AdminUI->>API: GET /api/queue/jobs?filters
    API->>Q: get_jobs(limit, offset, filters)
    Q->>DB: query simple_jobs
    DB-->>Q: jobs + total
    Q-->>API: payload
    API-->>AdminUI: jobs page
    AdminUI->>API: POST /api/queue/flush { older_than_hours, statuses }
    API->>Q: flush_inactive_jobs(...)
    Q->>DB: delete matching jobs
    Q-->>API: counts
    API-->>AdminUI: result
```
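The enqueue → worker handoff in the first diagram can be sketched with a toy in-memory queue. This is illustrative only — the PR's SimpleQueue is Mongo-backed and async; names like `enqueue_job` merely mirror it:

```python
import itertools
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Job:
    job_id: str
    job_type: str
    data: dict
    status: str = "QUEUED"
    result: Optional[dict] = None

class InMemoryQueue:
    """Toy in-memory stand-in for the Mongo-backed SimpleQueue."""

    def __init__(self) -> None:
        self._jobs: List[Job] = []
        self._ids = itertools.count(1)

    def enqueue_job(self, job_type: str, data: dict) -> str:
        # API side: record the job as QUEUED and hand back its id
        job = Job(job_id=f"job-{next(self._ids)}", job_type=job_type, data=data)
        self._jobs.append(job)
        return job.job_id

    def fetch_next_queued(self) -> Optional[Job]:
        # Worker side: pull the oldest QUEUED job
        return next((j for j in self._jobs if j.status == "QUEUED"), None)

    def complete(self, job: Job, result: dict) -> None:
        # Worker side: mark the job COMPLETED with its result payload
        job.status, job.result = "COMPLETED", result

q = InMemoryQueue()
job_id = q.enqueue_job("reprocess_transcript", {"conversation_id": "c1"})
job = q.fetch_next_queued()
q.complete(job, {"segments": 3})
assert job.job_id == job_id and job.status == "COMPLETED"
```

The real queue persists the same state transitions (QUEUED → PROCESSING → COMPLETED/FAILED) as documents in the `simple_jobs` collection.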
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
@coderabbitai review
✅ Actions performed: Review triggered.
Actionable comments posted: 13
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backends/advanced/src/advanced_omi_backend/database.py (1)
20-23: Wrong Motor API: `get_default_database` does not take a name. This will raise at runtime. Use `get_database("friend-lite")` or index access.

```diff
-mongo_client = AsyncIOMotorClient(MONGODB_URI)
-db = mongo_client.get_default_database("friend-lite")
+mongo_client = AsyncIOMotorClient(MONGODB_URI)
+db = mongo_client.get_database("friend-lite")
```
🧹 Nitpick comments (24)
backends/advanced/src/advanced_omi_backend/config.py (2)
40-45: Avoid duplicating default paths in constants. DEFAULT_AUDIO_STORAGE_SETTINGS hardcodes both the base and chunks paths, while get_audio_storage_settings derives chunks from base. Single-source the default (derive chunks from base) to prevent drift.
149-160: Normalize and join paths with pathlib. Safer across environments and avoids double slashes and trailing-slash issues.
Apply this diff:
```diff
-def get_audio_storage_settings():
-    """Get audio storage settings from environment or defaults."""
-
-    # Get base path and derive chunks path
-    audio_base_path = os.getenv("AUDIO_BASE_PATH", DEFAULT_AUDIO_STORAGE_SETTINGS["audio_base_path"])
-    audio_chunks_path = os.getenv("AUDIO_CHUNKS_PATH", f"{audio_base_path}/audio_chunks")
-
-    return {
-        "audio_base_path": audio_base_path,
-        "audio_chunks_path": audio_chunks_path,
-    }
+def get_audio_storage_settings():
+    """Get audio storage settings from environment or defaults."""
+    base = Path(os.getenv("AUDIO_BASE_PATH", DEFAULT_AUDIO_STORAGE_SETTINGS["audio_base_path"])).resolve()
+    chunks_env = os.getenv("AUDIO_CHUNKS_PATH")
+    chunks = Path(chunks_env).resolve() if chunks_env else (base / "audio_chunks")
+    return {
+        "audio_base_path": str(base),
+        "audio_chunks_path": str(chunks),
+    }
```

backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py (1)
20-22: Sort `__all__` for stability and lint compliance. Ruff flags the unsorted `__all__`; sort it alphabetically.
Apply this diff:
```diff
-__all__ = ["user_router", "chat_router", "client_router", "conversation_router", "memory_router", "system_router", "queue_router"]
+__all__ = [
+    "chat_router",
+    "client_router",
+    "conversation_router",
+    "memory_router",
+    "queue_router",
+    "system_router",
+    "user_router",
+]
```

backends/advanced/src/advanced_omi_backend/main.py (1)
326-344: Use the centralized audio storage config. CHUNK_DIR is hardcoded; the new config.get_audio_storage_settings() provides base/chunks paths. Recommend switching to it for consistency and deployability.
Example (outside this hunk):
```python
from advanced_omi_backend.config import get_audio_storage_settings

paths = get_audio_storage_settings()
CHUNK_DIR = Path(paths["audio_chunks_path"])
CHUNK_DIR.mkdir(parents=True, exist_ok=True)
```

Also update `app.mount("/audio", StaticFiles(directory=CHUNK_DIR), ...)` accordingly.
backends/advanced/webui/src/pages/Conversations.tsx (2)
32-37: Type looks good; consider reuse. The version_info shape is clear. Consider extracting a shared VersionInfo type/interface if it is used elsewhere.
528-606: Minor UX: large transcripts. For long transcripts, consider windowing/virtualization to keep the DOM light (e.g., react-virtualized) when this becomes a hotspot.
backends/advanced/webui/src/pages/Queue.tsx (3)
130-133: Be explicit when building the request URL. Template coercion of URLSearchParams is implicit; prefer `.toString()` for clarity.

```diff
-      const response = await fetch(`/api/queue/jobs?${params}`, {
+      const response = await fetch(`/api/queue/jobs?${params.toString()}`, {
         headers: { 'Authorization': `Bearer ${localStorage.getItem('token')}` }
       });
```
221-229: Avoid double-fetch on Apply/Clear. Changing filters/offset already triggers the `[filters, pagination.offset]` effect. Calling `fetchJobs()` here causes an extra request.

```diff
-  const applyFilters = () => {
-    setPagination(prev => ({ ...prev, offset: 0 }));
-    fetchJobs();
-  };
+  const applyFilters = () => {
+    setPagination(prev => ({ ...prev, offset: 0 }));
+  };
```

The `clearFilters` handler already only updates state, so it needs no change.
201-219: Prefer an in-app modal over `confirm`/`alert` for accessibility and consistency. The rest of the page uses custom modals; align the cancel confirmation with that.
backends/advanced/webui/src/components/ConversationVersionHeader.tsx (1)
21-35: Clear stale error before retry; surface success feedback. Reset `error` before starting, so past errors don't persist after a successful retrigger.

```diff
   const handleReprocessTranscript = async (event: React.MouseEvent) => {
     event.preventDefault();
     event.stopPropagation();
     try {
       setLoading(true);
+      setError(null);
       await conversationsApi.reprocessTranscript(conversationId);
       onVersionChange?.();
```
52-55: Use `logger.exception` for tracebacks and preserve exception context when re-raising. Improves observability and debuggability.
```diff
-    except Exception as e:
-        logger.error(f"Failed to list jobs: {e}")
+    except Exception as e:
+        logger.exception("Failed to list jobs")
         return {"error": "Failed to list jobs", "jobs": [], "pagination": {"total": 0, "limit": limit, "offset": offset, "has_more": False}}
@@
-    except Exception as e:
-        logger.error(f"Failed to get queue stats: {e}")
-        return {"queued": 0, "processing": 0, "completed": 0, "failed": 0}
+    except Exception:
+        logger.exception("Failed to get queue stats")
+        # Return consistent shape with UI expectations
+        return {"total_jobs": 0, "queued_jobs": 0, "processing_jobs": 0, "completed_jobs": 0, "failed_jobs": 0, "cancelled_jobs": 0, "retrying_jobs": 0, "timestamp": None}
@@
-    except Exception as e:
-        logger.error(f"Failed to get queue health: {e}")
+    except Exception:
+        logger.exception("Failed to get queue health")
         return {
             "status": "unhealthy",
-            "message": f"Health check failed: {str(e)}"
+            "message": "Health check failed"
         }
@@
-    except Exception as e:
-        logger.error(f"Failed to flush inactive jobs: {e}")
-        raise HTTPException(status_code=500, detail=f"Failed to flush jobs: {str(e)}")
+    except Exception as e:
+        logger.exception("Failed to flush inactive jobs")
+        raise HTTPException(status_code=500, detail="Failed to flush jobs") from e
@@
-    except Exception as e:
-        logger.error(f"Failed to flush all jobs: {e}")
-        raise HTTPException(status_code=500, detail=f"Failed to flush all jobs: {str(e)}")
+    except Exception as e:
+        logger.exception("Failed to flush all jobs")
+        raise HTTPException(status_code=500, detail="Failed to flush all jobs") from e
```

Also applies to: 67-70, 84-89, 118-121, 146-147
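For reference, `logging.exception` is equivalent to `logging.error(..., exc_info=True)`: when called from an except block it attaches the active traceback automatically. A standalone demo (the logger name and error are made up for illustration):

```python
import io
import logging

# Capture log output in-memory so the traceback is inspectable.
stream = io.StringIO()
logger = logging.getLogger("queue_routes_demo")
logger.addHandler(logging.StreamHandler(stream))

try:
    raise RuntimeError("mongo unavailable")  # stand-in failure
except Exception:
    # Logs at ERROR level and appends the full traceback.
    logger.exception("Failed to list jobs")

output = stream.getvalue()
assert "Failed to list jobs" in output
assert "RuntimeError: mongo unavailable" in output  # traceback was captured
```

By contrast, `logger.error(f"... {e}")` records only the exception's message, discarding the stack that Ruff's TRY400 rule is trying to preserve.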
57-66: `current_user` is intentionally unused for auth — mark it or reference it for clarity. If you keep it only for dependency enforcement, add a comment to silence linters.
```diff
-async def get_queue_stats(
-    current_user: User = Depends(current_active_user)
-):
+async def get_queue_stats(
+    current_user: User = Depends(current_active_user)  # noqa: ARG001 - used for auth dependency
+):
```

backends/advanced/webui/src/components/ConversationVersionDropdown.tsx (1)
109-111: Guard against missing provider/model in labels. Avoid rendering “undefined”.
```diff
-  const formatVersionLabel = (version: TranscriptVersion | MemoryVersion, index: number) => {
-    return `v${index + 1} (${version.provider}${version.model ? ` ${version.model}` : ''})`
-  }
+  const formatVersionLabel = (version: TranscriptVersion | MemoryVersion, index: number) => {
+    const provider = (version as any).provider || 'unknown';
+    const model = (version as any).model ? ` ${(version as any).model}` : '';
+    return `v${index + 1} (${provider}${model})`;
+  }
```

backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (4)
99-109: Gate expensive/PII-leaning debug logs behind DEBUG. Fetching a sample doc and logging user_id types on every request is noisy and can expose PII. Guard this block with DEBUG.
```diff
-    # Debug: Check what's in the conversations collection
-    total_conversations = await conversations_col.count_documents({})
-    logger.info(f"📊 Total conversations in database: {total_conversations}")
-
-    if total_conversations > 0:
-        # Show a sample conversation to debug user_id format
-        sample = await conversations_col.find_one({})
-        if sample:
-            logger.info(f"🔍 Sample conversation user_id: '{sample.get('user_id')}' (type: {type(sample.get('user_id'))})")
-            logger.info(f"🔍 Looking for user_id: '{str(user.user_id)}' (type: {type(str(user.user_id))})")
+    if logger.isEnabledFor(logging.DEBUG):
+        total_conversations = await conversations_col.count_documents({})
+        logger.debug(f"📊 Total conversations in database: {total_conversations}")
+        if total_conversations > 0:
+            sample = await conversations_col.find_one({}, projection={"user_id": 1})
+            if sample:
+                logger.debug(
+                    "🔍 Sample conversation user_id: %r (type: %s); looking for %r (type: %s)",
+                    sample.get("user_id"),
+                    type(sample.get("user_id")),
+                    str(user.user_id),
+                    type(str(user.user_id)),
+                )
```
118-121: Avoid calling a repository “private” method from the controller. Consider exposing a public helper on ConversationsRepository instead of using `_populate_primary_fields` directly.
171-175: Reduce log verbosity for the final grouping logs. Move them to DEBUG to avoid noisy INFO logs at scale.
```diff
-    logger.info(f"✅ Returning {len(conversations)} client groups with {total_grouped} total conversations")
-    logger.info(f"📊 Client groups: {list(conversations.keys())}")
+    if logger.isEnabledFor(logging.DEBUG):
+        logger.debug(
+            "✅ Returning %d client groups with %d total conversations",
+            len(conversations), total_grouped
+        )
+        logger.debug("📊 Client groups: %s", list(conversations.keys()))
```
951-979: Audio metadata read: fine; consider catching `wave.Error` specifically. Optional: catch `wave.Error` for clarity; keep the broad catch for safety if desired.

backends/advanced/src/advanced_omi_backend/simple_queue.py (7)
27-32: Create indexes for performance and correctness. Add indexes on job_id (unique), status, and created_at to speed up lookups and enforce id uniqueness.
Would you like a follow-up patch to `start_worker()` that ensures these indexes exist at startup?
91-93: Use `logging.exception` for worker-loop errors. Capture stack traces for easier diagnostics.
```diff
-            except Exception as e:
-                logger.error(f"Worker loop error: {e}")
+            except Exception:
+                logger.exception("Worker loop error")
                 await asyncio.sleep(5)
```
122-124: Use `logging.exception` on job failures. Ensure failures carry stack traces; also consider unsetting started_at when retrying (see the next comment).
```diff
-        except Exception as e:
-            logger.error(f"❌ Job {job_id} failed: {e}")
+        except Exception as e:
+            logger.exception("❌ Job %s failed", job_id)
```
141-149: When retrying, unset `started_at`. Avoid a stale started_at after requeue.
```diff
-                await self.jobs_collection.update_one(
-                    {"job_id": job_id},
-                    {"$set": {
-                        "status": JobStatus.QUEUED,
-                        "attempts": attempts,
-                        "last_error": str(e)
-                    }}
-                )
+                await self.jobs_collection.update_one(
+                    {"job_id": job_id},
+                    {
+                        "$set": {
+                            "status": JobStatus.QUEUED,
+                            "attempts": attempts,
+                            "last_error": str(e),
+                        },
+                        "$unset": {"started_at": ""},
+                    },
+                )
```
215-226: Type hints: use Optional for nullable parameters. PEP 484: annotate None-able args as Optional.
```diff
-    async def get_jobs(self, limit: int = 20, offset: int = 0, filters: Dict[str, str] = None) -> Dict[str, Any]:
+    async def get_jobs(self, limit: int = 20, offset: int = 0, filters: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
```
264-313: Normalize statuses to strings in flush_inactive_jobs. Ensure the query and stats keys are consistent whether callers pass JobStatus or str.
```diff
-        if statuses is None:
-            statuses = [JobStatus.COMPLETED, JobStatus.FAILED]
+        if statuses is None:
+            statuses = [JobStatus.COMPLETED, JobStatus.FAILED]
+        normalized_statuses = [s.value if isinstance(s, JobStatus) else s for s in statuses]
@@
-        query = {
-            "status": {"$in": statuses},
+        query = {
+            "status": {"$in": normalized_statuses},
             "created_at": {"$lt": cutoff_time}
         }
@@
-        removal_stats = {status: 0 for status in statuses}
+        removal_stats = {status: 0 for status in normalized_statuses}
```
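The normalization step above can be isolated in a small helper. A minimal sketch, assuming a str-based JobStatus enum like the one in simple_queue.py (values here are assumptions, not the real enum):

```python
from enum import Enum
from typing import Iterable, List, Union

class JobStatus(str, Enum):
    # Assumed values; the real enum lives in simple_queue.py
    QUEUED = "queued"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

def normalize_statuses(statuses: Iterable[Union[JobStatus, str]]) -> List[str]:
    """Coerce JobStatus members and raw strings to plain string values,
    so Mongo query filters and stats dict keys stay consistent."""
    return [s.value if isinstance(s, JobStatus) else s for s in statuses]

# Enum members and raw strings produce identical query values:
assert normalize_statuses([JobStatus.COMPLETED, "failed"]) == ["completed", "failed"]
```

With a str-based enum, Mongo would often match either form on write, but mixed keys in `removal_stats` would still differ (`JobStatus.COMPLETED` vs `"completed"`), which is why normalizing once up front is the cleaner fix.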
360-368: Type hints: Optional for nullable parameter. Same as above, for the flush helpers.
```diff
-async def flush_inactive_jobs(older_than_hours: int = 24, statuses: list = None) -> Dict[str, int]:
+async def flush_inactive_jobs(older_than_hours: int = 24, statuses: Optional[list] = None) -> Dict[str, int]:
@@
 async def flush_all_jobs(confirm: bool = False) -> Dict[str, int]:
     """Convenience function to flush all jobs. USE WITH CAUTION!"""
```
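The implicit-Optional issue Ruff flags (RUF013) is simply a `None` default without `Optional` in the annotation. A signature-only sketch of the before/after (the body here is a stand-in, not the real Mongo-backed function):

```python
from typing import Dict, List, Optional

# Implicit Optional (disallowed by PEP 484, flagged by Ruff RUF013):
#   def flush_inactive_jobs(older_than_hours: int = 24, statuses: list = None): ...
# Explicit Optional makes the None default visible to type checkers:
def flush_inactive_jobs_sig(older_than_hours: int = 24,
                            statuses: Optional[List[str]] = None) -> Dict[str, int]:
    """Signature illustration only; the real function queries Mongo."""
    statuses = statuses if statuses is not None else ["completed", "failed"]
    return {s: 0 for s in statuses}

assert flush_inactive_jobs_sig() == {"completed": 0, "failed": 0}
```

Under the explicit form, mypy and pyright correctly reject `statuses.append(...)` before the None check, which is exactly the class of bug the annotation exists to catch.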
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (15)
- backends/advanced/src/advanced_omi_backend/config.py (2 hunks)
- backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (8 hunks)
- backends/advanced/src/advanced_omi_backend/database.py (2 hunks)
- backends/advanced/src/advanced_omi_backend/main.py (3 hunks)
- backends/advanced/src/advanced_omi_backend/routers/api_router.py (2 hunks)
- backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py (2 hunks)
- backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py (1 hunk)
- backends/advanced/src/advanced_omi_backend/simple_queue.py (1 hunk)
- backends/advanced/webui/src/App.tsx (2 hunks)
- backends/advanced/webui/src/components/ConversationVersionDropdown.tsx (1 hunk)
- backends/advanced/webui/src/components/ConversationVersionHeader.tsx (1 hunk)
- backends/advanced/webui/src/components/layout/Layout.tsx (2 hunks)
- backends/advanced/webui/src/pages/Conversations.tsx (5 hunks)
- backends/advanced/webui/src/pages/Queue.tsx (1 hunk)
- backends/advanced/webui/src/services/api.ts (1 hunk)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: CR
PR: AnkushMalaker/friend-lite#0
File: CLAUDE.md:0-0
Timestamp: 2025-09-18T01:42:08.551Z
Learning: Applies to backends/advanced/src/**/*.py : Use conversation_id (not audio_uuid) for all reprocessing operations and version management
📚 Learning: 2025-09-18T01:42:08.551Z
Learnt from: CR
PR: AnkushMalaker/friend-lite#0
File: CLAUDE.md:0-0
Timestamp: 2025-09-18T01:42:08.551Z
Learning: Applies to backends/advanced/src/**/*.py : Use conversation_id (not audio_uuid) for all reprocessing operations and version management
Applied to files:
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
backends/advanced/src/advanced_omi_backend/database.py
🪛 Ruff (0.13.1)
backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py
26-26: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
50-50: Consider moving this statement to an else block
(TRY300)
52-52: Do not catch blind exception: Exception
(BLE001)
53-53: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
59-59: Unused function argument: current_user
(ARG001)
59-59: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
65-65: Consider moving this statement to an else block
(TRY300)
67-67: Do not catch blind exception: Exception
(BLE001)
68-68: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
78-82: Consider moving this statement to an else block
(TRY300)
84-84: Do not catch blind exception: Exception
(BLE001)
85-85: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
88-88: Use explicit conversion flag
Replace with conversion flag
(RUF010)
104-104: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
116-116: Consider moving this statement to an else block
(TRY300)
118-118: Do not catch blind exception: Exception
(BLE001)
119-119: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
120-120: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
120-120: Use explicit conversion flag
Replace with conversion flag
(RUF010)
126-126: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
134-137: Abstract raise to an inner function
(TRY301)
141-141: Consider moving this statement to an else block
(TRY300)
144-144: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
145-145: Do not catch blind exception: Exception
(BLE001)
146-146: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
147-147: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
147-147: Use explicit conversion flag
Replace with conversion flag
(RUF010)
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
108-108: Use explicit conversion flag
Replace with conversion flag
(RUF010)
176-176: Consider moving this statement to an else block
(TRY300)
697-697: Do not catch blind exception: Exception
(BLE001)
698-698: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
701-701: Use explicit conversion flag
Replace with conversion flag
(RUF010)
707-707: Unused function argument: audio_uuid
(ARG001)
745-745: Create your own exception
(TRY002)
745-745: Avoid specifying long messages outside the exception class
(TRY003)
759-759: f-string without any placeholders
Remove extraneous f prefix
(F541)
767-767: Create your own exception
(TRY002)
767-767: Avoid specifying long messages outside the exception class
(TRY003)
783-783: f-string without any placeholders
Remove extraneous f prefix
(F541)
835-835: Do not catch blind exception: Exception
(BLE001)
869-869: Create your own exception
(TRY002)
869-869: Avoid specifying long messages outside the exception class
(TRY003)
971-971: Do not catch blind exception: Exception
(BLE001)
976-976: Do not catch blind exception: Exception
(BLE001)
1011-1011: Do not catch blind exception: Exception
(BLE001)
1012-1012: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1125-1125: Do not catch blind exception: Exception
(BLE001)
1126-1126: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
backends/advanced/src/advanced_omi_backend/main.py
332-332: Do not catch blind exception: Exception
(BLE001)
333-333: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
361-361: Do not catch blind exception: Exception
(BLE001)
362-362: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py
22-22: __all__ is not sorted
Apply an isort-style sorting to __all__
(RUF022)
backends/advanced/src/advanced_omi_backend/database.py
523-523: Redefinition of unused update_transcription_status from line 389
(F811)
524-524: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
524-524: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
745-745: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
746-746: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
747-747: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
748-748: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
749-749: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
backends/advanced/src/advanced_omi_backend/simple_queue.py
91-91: Do not catch blind exception: Exception
(BLE001)
92-92: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
109-109: Abstract raise to an inner function
(TRY301)
109-109: Create your own exception
(TRY002)
109-109: Avoid specifying long messages outside the exception class
(TRY003)
122-122: Do not catch blind exception: Exception
(BLE001)
123-123: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
215-215: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
264-264: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
324-324: Avoid specifying long messages outside the exception class
(TRY003)
360-360: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
🔇 Additional comments (18)
backends/advanced/webui/src/components/layout/Layout.tsx (2)
2-2: LGTM: icon import added correctly. The Layers icon import is correct and no unused imports are introduced.
17-21: Admin-only nav: verify a route-level guard exists. Adding “Queue Management” under the admin section is good. Ensure the /queue route itself is admin-gated (not just hidden in the nav) to avoid unauthorized access via direct URL.
If not already enforced, add an admin guard on the route (see App.tsx comment).
backends/advanced/webui/src/App.tsx (1)
12-12: LGTM: Queue page route import. The import is consistent with the new page wiring.
backends/advanced/src/advanced_omi_backend/routers/api_router.py (2)
17-17: LGTM: queue_router exported and imported. Matches the modules export; keeps composition consistent.
35-35: LGTM: queue router included. Queue endpoints are now exposed under /api. Ensure the queue router itself enforces admin.
backends/advanced/src/advanced_omi_backend/main.py (1)
52-53: LGTM: simple_queue import. Import location and naming look good.
backends/advanced/webui/src/pages/Conversations.tsx (2)
4-4: LGTM: version header integration. The import of ConversationVersionHeader is correct and localized to the conversations page.
355-366: Approve — version actions use conversation_id; verify the delete endpoint. Reprocessing/version activation are keyed by conversation_id as expected. Confirm whether the backend conversation delete route expects audio_uuid (the UI is passing audio_uuid); an rg search for `router.delete(.*/api/conversations` in backends/advanced/src/advanced_omi_backend/routers returned no matches, so manual verification is required.

backends/advanced/webui/src/services/api.ts (1)
136-143: Align frontend API with backend + use axios params
- Prefer axios params over string-concatenating URLSearchParams.
- Verify backend supports job retry/cancel routes; AI summary only mentioned stats/health/flush.
Apply this diff:
```diff
 export const queueApi = {
-  getJobs: (params: URLSearchParams) => api.get(`/api/queue/jobs?${params}`),
+  getJobs: (params: URLSearchParams) => api.get('/api/queue/jobs', { params: Object.fromEntries(params as any) }),
   getStats: () => api.get('/api/queue/stats'),
   retryJob: (jobId: string, force: boolean = false) => api.post(`/api/queue/jobs/${jobId}/retry`, { force }),
   cancelJob: (jobId: string) => api.delete(`/api/queue/jobs/${jobId}`),
 }
```

Optionally add parity endpoints if present server-side:
- getHealth(): GET /api/queue/health
- flush(queue?: string): POST /api/queue/flush
- flushAll(): POST /api/queue/flush-all
backends/advanced/webui/src/components/ConversationVersionHeader.tsx (1)
37-69: LGTM on conditional rendering and UX. The early return when `versionInfo` is missing, or in the single-version state, is clean and matches the product flow.
78-83: LGTM on conditional lazy-loading of version history. Only fetching when counts > 1 keeps requests lean.
backends/advanced/src/advanced_omi_backend/database.py (1)
656-666: Datetime storage is inconsistent (a mix of ISO strings and datetime objects). Mixing types across fields complicates queries and sorting. Pick one representation (prefer native UTC datetimes in Mongo) and standardize.
Also applies to: 767-768
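One way to standardize is a small normalizer applied before every write — a minimal sketch, assuming stored strings are ISO-8601 (as `datetime.fromisoformat` expects) and naive datetimes are implicitly UTC:

```python
from datetime import datetime, timezone

def to_utc_datetime(value):
    """Normalize mixed stored values (ISO-8601 strings, naive or aware
    datetimes) to timezone-aware UTC datetimes before writing to Mongo."""
    if isinstance(value, str):
        value = datetime.fromisoformat(value)
    if value.tzinfo is None:
        # Assumption: naive values were written as UTC.
        value = value.replace(tzinfo=timezone.utc)
    return value.astimezone(timezone.utc)

# An ISO string and a naive datetime converge on the same aware UTC instant:
assert to_utc_datetime("2025-09-18T01:42:08+00:00") == to_utc_datetime(datetime(2025, 9, 18, 1, 42, 8))
```

Storing native datetimes lets Mongo's BSON date type handle range queries and `$lt`/`$gt` sorting correctly, which string comparison only approximates when every string shares one exact format.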
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (5)
135-138: LGTM on version count extraction. Counts are derived safely with defaults.
179-179: Good: include stack trace on failures. `exc_info=True` improves troubleshooting.
204-206: Populate from active versions before response: good. Pre-populating primary fields aligns with the UI's expectations. Please confirm `_populate_primary_fields` always returns the keys used in the formatter (e.g., transcript, memories, speakers_identified) to avoid KeyErrors.
511-516: Confirm cascade semantics when deleting conversation records. Deleting the conversation alongside the chunk is intentional here; ensure no orphaned processing_runs or external references remain, or wrap the operation in a transaction if cross-collection integrity matters.
705-870: _do_transcript_reprocessing: WAV-only assumption, duration calc bug, and minor lint fixes

- Assumes WAV; will fail for non-WAV inputs. Either ensure stored files are WAV or decode to PCM before `wave.open`.
- The duration fallback is incorrect; use the frame count instead of the byte length.
- Fix the unused param and f-strings; prefer specific exceptions.
```diff
-    logger.info(f"🎤 Starting core transcript reprocessing for conversation {conversation_id}")
+    logger.info("🎤 Starting core transcript reprocessing for conversation %s (audio_uuid=%s)", conversation_id, audio_uuid)
@@
-    if not transcription_manager.provider:
-        raise Exception("No transcription provider configured")
+    if not transcription_manager.provider:
+        raise RuntimeError("No transcription provider configured")
@@
-    with wave.open(audio_path, 'rb') as wav_file:
-        sample_rate = wav_file.getframerate()
-        audio_frames = wav_file.readframes(wav_file.getnframes())
+    with wave.open(audio_path, 'rb') as wav_file:
+        sample_rate = wav_file.getframerate()
+        nframes = wav_file.getnframes()
+        audio_frames = wav_file.readframes(nframes)
+        audio_duration_seconds = (nframes / float(sample_rate)) if sample_rate else 0.0
@@
-    logger.info(f"🎤 Processing transcript directly for reprocessing...")
+    logger.info("🎤 Processing transcript directly for reprocessing...")
@@
-    if not transcript_result:
-        raise Exception("Transcription failed - no result returned")
+    if not transcript_result:
+        raise RuntimeError("Transcription failed: empty result")
@@
     if not segments and transcript_text:
         segments = [{
             "text": transcript_text,
             "start": 0.0,
-            "end": len(audio_frames) / (sample_rate * 2),  # Estimate duration
+            "end": audio_duration_seconds,
             "speaker": "Speaker 0",
             "confidence": transcript_result.get("confidence", 0.9)
         }]
@@
-    logger.info(f"🎤 Transcript reprocessing completed:")
+    logger.info("🎤 Transcript reprocessing completed:")
@@
-    else:
-        raise Exception("Failed to update transcript version in database")
+    else:
+        raise RuntimeError("Failed to update transcript version in database")
```
- Are audio chunks guaranteed to be WAV? If not, we should decode with ffmpeg/pydub or pass file path to the provider and let it handle formats.
backends/advanced/src/advanced_omi_backend/simple_queue.py (1)
151-187: LGTM: the transcript handler integrates cleanly with the controller. Result summarization is helpful for the UI.
```python
"debug_audio_url": f"/audio/{audio_path}" if audio_path else None,
# Version information for UI
"version_info": {
    "transcript_count": len(transcript_versions),
    "memory_count": len(memory_versions),
    "active_transcript_version": conversation.get("active_transcript_version"),
    "active_memory_version": conversation.get("active_memory_version"),
}
```
Do not expose debug_audio_url to regular users
This can leak internal paths/URLs. Restrict to admins/superusers.
```diff
-        "debug_audio_url": f"/audio/{audio_path}" if audio_path else None,
+        "debug_audio_url": f"/audio/{audio_path}" if (audio_path and user.is_superuser) else None,
```

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
around lines 160 to 167, the response currently includes "debug_audio_url" for
all users which can leak internal paths; change the code to only include
debug_audio_url when the current user is an admin/superuser by checking the
request/auth context (e.g., request.user.is_admin, current_user.is_superuser, or
the project's permission helper) and omit or set it to None for non-admins,
ensure the permission check happens before constructing the dict, and add/update
a small unit/integration test to verify non-admins do not receive the
debug_audio_url while admins do.
```python
async def delete_conversation_version(conversation_id: str, version_type: str, version_id: str, user: User):
    """Delete a specific version (transcript or memory) from a conversation. Users can only modify their own conversations."""
    try:
        conversations_repo = ConversationsRepository(conversations_col)

        # Get the conversation first to check ownership
        conversation = await conversations_repo.get_conversation(conversation_id)
        if not conversation:
            return JSONResponse(
                status_code=404,
                content={"error": "Conversation not found"}
            )

        # Check if user owns this conversation
        if not user.is_superuser and conversation["user_id"] != str(user.user_id):
            return JSONResponse(
                status_code=403,
                content={"error": "Access forbidden. You can only modify your own conversations."}
            )

        # Validate version type
        if version_type not in ["transcript", "memory"]:
            return JSONResponse(
                status_code=400,
                content={"error": "Version type must be 'transcript' or 'memory'"}
            )

        # Determine field names based on version type
        if version_type == "transcript":
            versions_field = "transcript_versions"
            active_field = "active_transcript_version"
        else:  # memory
            versions_field = "memory_versions"
            active_field = "active_memory_version"

        # Check if this version exists
        versions = conversation.get(versions_field, [])
        version_exists = any(v.get("version_id") == version_id for v in versions)

        if not version_exists:
            return JSONResponse(
                status_code=404,
                content={"error": f"{version_type.title()} version {version_id} not found"}
            )

        # Check if there are other versions (can't delete the last one)
        if len(versions) <= 1:
            return JSONResponse(
                status_code=400,
                content={"error": f"Cannot delete the last {version_type} version. Conversation must have at least one version."}
            )

        # Check if this is the active version
        active_version = conversation.get(active_field)
        is_active = (active_version == version_id)

        # If deleting active version, we need to set a new active version
        new_active_version = None
        if is_active:
            # Find the most recent non-deleted version to make active
            remaining_versions = [v for v in versions if v.get("version_id") != version_id]
            if remaining_versions:
                # Sort by created_at and pick the most recent
                remaining_versions.sort(key=lambda x: x.get("created_at", ""), reverse=True)
                new_active_version = remaining_versions[0]["version_id"]

        # Remove the version from the array
        update_operations = {
            "$pull": {versions_field: {"version_id": version_id}}
        }

        # If we need to update the active version
        if new_active_version:
            update_operations["$set"] = {active_field: new_active_version}

        # Execute the update
        result = await conversations_col.update_one(
            {"conversation_id": conversation_id},
            update_operations
        )

        if result.modified_count == 0:
            return JSONResponse(
                status_code=500,
                content={"error": f"Failed to delete {version_type} version"}
            )

        # If we updated the active version, also update legacy fields
        if new_active_version:
            # Get the updated conversation and populate primary fields
            updated_conversation = await conversations_repo.get_conversation(conversation_id)
            updated_conversation = conversations_repo._populate_primary_fields(updated_conversation)

            # Update legacy fields in database
            legacy_updates = {}
            if version_type == "transcript":
                legacy_updates["transcript"] = updated_conversation.get("transcript", [])
                legacy_updates["speakers_identified"] = updated_conversation.get("speakers_identified", [])
            else:  # memory
                legacy_updates["memories"] = updated_conversation.get("memories", [])
                legacy_updates["memory_processing_status"] = updated_conversation.get("memory_processing_status", "pending")

            if legacy_updates:
                await conversations_col.update_one(
                    {"conversation_id": conversation_id},
                    {"$set": legacy_updates}
                )

        logger.info(f"Deleted {version_type} version {version_id} from conversation {conversation_id}")

        response_data = {
            "message": f"Successfully deleted {version_type} version {version_id}",
            "conversation_id": conversation_id,
            "version_type": version_type,
            "deleted_version_id": version_id,
            "was_active": is_active
        }

        if new_active_version:
            response_data["new_active_version"] = new_active_version

        return JSONResponse(status_code=200, content=response_data)

    except Exception as e:
        logger.error(f"Error deleting {version_type} version {version_id} from conversation {conversation_id}: {e}")
        return JSONResponse(
            status_code=500,
            content={"error": f"Failed to delete {version_type} version: {str(e)}"}
        )
```
❓ Verification inconclusive
delete_conversation_version: created_at sorting and exception logging
- Sorting by `created_at` using a string fallback can misorder items or error if types vary. Normalize to datetime.
- Use `logger.exception` to capture the trace.
- Consider an atomic guard to avoid a race that deletes the last version (server-side size check).
```diff
-                remaining_versions.sort(key=lambda x: x.get("created_at", ""), reverse=True)
-                new_active_version = remaining_versions[0]["version_id"]
+                from datetime import datetime
+                def _created_at_key(v):
+                    ts = v.get("created_at")
+                    if isinstance(ts, str):
+                        try:
+                            return datetime.fromisoformat(ts)
+                        except ValueError:
+                            return datetime.min
+                    return ts or datetime.min
+                remaining_versions.sort(key=_created_at_key, reverse=True)
+                new_active_version = remaining_versions[0]["version_id"]
```

```diff
-    except Exception as e:
-        logger.error(f"Error deleting {version_type} version {version_id} from conversation {conversation_id}: {e}")
+    except Exception:
+        logger.exception(
+            "Error deleting %s version %s from conversation %s",
+            version_type, version_id, conversation_id,
+        )
         return JSONResponse(
             status_code=500,
             content={"error": f"Failed to delete {version_type} version: {str(e)}"}
         )
```

If feasible, execute the delete with a filter that asserts array size > 1 to prevent deleting the last version atomically (e.g., using $expr with $size). Happy to draft that if desired.
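The atomic size guard described above could be sketched as follows. This is a hypothetical helper, not part of the PR; the `conversations_col`, `versions_field`, and `version_id` names mirror the controller code shown earlier.

```python
def build_guarded_delete(conversation_id: str, versions_field: str, version_id: str):
    """Build a (filter, update) pair for update_one() that pulls the version
    only while more than one version remains, so the last version can never
    be deleted, even under concurrent requests."""
    filter_doc = {
        "conversation_id": conversation_id,
        # Server-side size check: the filter matches only if the array still
        # holds at least two versions at the moment of the update.
        "$expr": {"$gt": [{"$size": f"${versions_field}"}, 1]},
    }
    update_doc = {"$pull": {versions_field: {"version_id": version_id}}}
    return filter_doc, update_doc

# With Motor this would be used roughly as:
#   result = await conversations_col.update_one(*build_guarded_delete(
#       conversation_id, versions_field, version_id))
#   if result.modified_count == 0:
#       ...  # last version (or not found): return 400/404 instead of deleting
```

Because the size check lives in the filter, MongoDB evaluates it atomically with the `$pull`, closing the check-then-delete race in the current controller code.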
Fix created_at sorting, preserve exception var when logging, and make delete atomic
File: backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (lines 574-703)
- Normalize created_at to datetime before sorting to avoid misordering; apply this diff:

```diff
-                remaining_versions.sort(key=lambda x: x.get("created_at", ""), reverse=True)
-                new_active_version = remaining_versions[0]["version_id"]
+                from datetime import datetime
+                def _created_at_key(v):
+                    ts = v.get("created_at")
+                    if isinstance(ts, str):
+                        try:
+                            return datetime.fromisoformat(ts)
+                        except ValueError:
+                            return datetime.min
+                    return ts or datetime.min
+                remaining_versions.sort(key=_created_at_key, reverse=True)
+                new_active_version = remaining_versions[0]["version_id"]
```

- Use `logger.exception(...)` but keep `except Exception as e:` (do not drop the `as e`) so the exception variable is available; avoid returning raw `str(e)` in the HTTP response (return a generic 500 message or redact details).
- Make the delete atomic to avoid a race that could remove the last version: add a precondition in the update filter that asserts the versions array size > 1 (e.g. use `"$expr": {"$gt": [{"$size": f"${versions_field}"}, 1]}`) or perform the operation with find_one_and_update / an update pipeline so $pull and $set occur atomically.
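The "redact details" point in the second bullet could take this shape; the helper below is a hypothetical sketch, not the PR's code, and only builds the response payload:

```python
import logging

logger = logging.getLogger(__name__)

def delete_failed_response(version_type: str) -> tuple[int, dict]:
    """Generic 500 payload that avoids echoing str(e) back to the client.
    The full traceback is captured server-side via logger.exception()."""
    return 500, {"error": f"Failed to delete {version_type} version"}

# In the except block this might become:
#   except Exception:
#       logger.exception("Error deleting %s version %s", version_type, version_id)
#       status, body = delete_failed_response(version_type)
#       return JSONResponse(status_code=status, content=body)
```

This keeps diagnostics in the logs while the client sees only a stable, non-leaky error message.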
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
async def delete_conversation_version(conversation_id: str, version_type: str, version_id: str, user: User):
    """Delete a specific version (transcript or memory) from a conversation. Users can only modify their own conversations."""
    try:
        conversations_repo = ConversationsRepository(conversations_col)
        # Get the conversation first to check ownership
        conversation = await conversations_repo.get_conversation(conversation_id)
        if not conversation:
            return JSONResponse(
                status_code=404,
                content={"error": "Conversation not found"}
            )
        # Check if user owns this conversation
        if not user.is_superuser and conversation["user_id"] != str(user.user_id):
            return JSONResponse(
                status_code=403,
                content={"error": "Access forbidden. You can only modify your own conversations."}
            )
        # Validate version type
        if version_type not in ["transcript", "memory"]:
            return JSONResponse(
                status_code=400,
                content={"error": "Version type must be 'transcript' or 'memory'"}
            )
        # Determine field names based on version type
        if version_type == "transcript":
            versions_field = "transcript_versions"
            active_field = "active_transcript_version"
        else:  # memory
            versions_field = "memory_versions"
            active_field = "active_memory_version"
        # Check if this version exists
        versions = conversation.get(versions_field, [])
        version_exists = any(v.get("version_id") == version_id for v in versions)
        if not version_exists:
            return JSONResponse(
                status_code=404,
                content={"error": f"{version_type.title()} version {version_id} not found"}
            )
        # Check if there are other versions (can't delete the last one)
        if len(versions) <= 1:
            return JSONResponse(
                status_code=400,
                content={"error": f"Cannot delete the last {version_type} version. Conversation must have at least one version."}
            )
        # Check if this is the active version
        active_version = conversation.get(active_field)
        is_active = (active_version == version_id)
        # If deleting active version, we need to set a new active version
        new_active_version = None
        if is_active:
            # Find the most recent non-deleted version to make active
            remaining_versions = [v for v in versions if v.get("version_id") != version_id]
            if remaining_versions:
                # Normalize created_at to datetime before sorting to avoid misordering
                from datetime import datetime
                def _created_at_key(v):
                    ts = v.get("created_at")
                    if isinstance(ts, str):
                        try:
                            return datetime.fromisoformat(ts)
                        except ValueError:
                            return datetime.min
                    return ts or datetime.min
                remaining_versions.sort(key=_created_at_key, reverse=True)
                new_active_version = remaining_versions[0]["version_id"]
        # Remove the version from the array
        update_operations = {
            "$pull": {versions_field: {"version_id": version_id}}
        }
        # If we need to update the active version
        if new_active_version:
            update_operations["$set"] = {active_field: new_active_version}
        # Execute the update
        result = await conversations_col.update_one(
            {"conversation_id": conversation_id},
            update_operations
        )
        if result.modified_count == 0:
            return JSONResponse(
                status_code=500,
                content={"error": f"Failed to delete {version_type} version"}
            )
        # If we updated the active version, also update legacy fields
        if new_active_version:
            # Get the updated conversation and populate primary fields
            updated_conversation = await conversations_repo.get_conversation(conversation_id)
            updated_conversation = conversations_repo._populate_primary_fields(updated_conversation)
            # Update legacy fields in database
            legacy_updates = {}
            if version_type == "transcript":
                legacy_updates["transcript"] = updated_conversation.get("transcript", [])
                legacy_updates["speakers_identified"] = updated_conversation.get("speakers_identified", [])
            else:  # memory
                legacy_updates["memories"] = updated_conversation.get("memories", [])
                legacy_updates["memory_processing_status"] = updated_conversation.get("memory_processing_status", "pending")
            if legacy_updates:
                await conversations_col.update_one(
                    {"conversation_id": conversation_id},
                    {"$set": legacy_updates}
                )
        logger.info(f"Deleted {version_type} version {version_id} from conversation {conversation_id}")
        response_data = {
            "message": f"Successfully deleted {version_type} version {version_id}",
            "conversation_id": conversation_id,
            "version_type": version_type,
            "deleted_version_id": version_id,
            "was_active": is_active
        }
        if new_active_version:
            response_data["new_active_version"] = new_active_version
        return JSONResponse(status_code=200, content=response_data)
    except Exception as e:
        logger.error(f"Error deleting {version_type} version {version_id} from conversation {conversation_id}: {e}")
        return JSONResponse(
            status_code=500,
            content={"error": f"Failed to delete {version_type} version: {str(e)}"}
        )
```
🧰 Tools
🪛 Ruff (0.13.1)
697-697: Do not catch blind exception: Exception
(BLE001)
698-698: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
701-701: Use explicit conversion flag
Replace with conversion flag
(RUF010)
```python
            # Queue the reprocessing job
            try:
                from advanced_omi_backend.simple_queue import get_simple_queue

                queue = await get_simple_queue()
                job_id = await queue.enqueue_job(
                    job_type="reprocess_transcript",
                    user_id=str(user.user_id),
                    data={
                        "conversation_id": conversation_id,
                        "audio_uuid": audio_uuid,
                        "audio_path": str(full_audio_path),
                        "run_id": run_id,
                        "version_id": version_id,
                        "audio_metadata": audio_stats
                    }
                )

                logger.info(f"📋 Queued transcript reprocessing job {job_id} for conversation {conversation_id} (run {run_id}, version {version_id})")
                logger.info(f"📋 Job data: audio_path={full_audio_path}, duration={audio_stats.get('duration_seconds', 0)}s")

                return JSONResponse(content={
                    "message": f"Transcript reprocessing queued for conversation {conversation_id}",
                    "job_id": job_id,
                    "run_id": run_id,
                    "version_id": version_id,
                    "config_hash": config_hash,
                    "status": "QUEUED",
                    "audio_metadata": audio_stats
                })

            except Exception as queue_error:
                logger.error(f"Failed to queue transcript reprocessing job: {queue_error}")
                return JSONResponse(
                    status_code=500,
                    content={
                        "error": "Failed to queue transcript reprocessing job",
                        "details": str(queue_error)
                    }
                )
```
Queueing: include stack trace on enqueue failures; avoid hard-coded provider in config hash

- Use `logger.exception` on queue errors.
- Replace the hard-coded `"deepgram"` with the active provider name used by TranscriptionManager to make `config_hash` meaningful across providers.
```diff
-        config_data = {
-            "audio_path": str(full_audio_path),
-            "transcription_provider": "deepgram",  # This would come from settings
-            "trigger": "manual_reprocess"
-        }
+        # Derive provider for dedupe hash
+        try:
+            from advanced_omi_backend.transcription import TranscriptionManager
+            from advanced_omi_backend.processors import get_processor_manager
+            tm = TranscriptionManager(chunk_repo=chunk_repo, processor_manager=get_processor_manager())
+            provider_name = tm.provider.name if tm and tm.provider else "unknown"
+        except Exception:
+            provider_name = "unknown"
+        config_data = {
+            "audio_path": str(full_audio_path),
+            "transcription_provider": provider_name,
+            "trigger": "manual_reprocess",
+        }
@@
-        except Exception as queue_error:
-            logger.error(f"Failed to queue transcript reprocessing job: {queue_error}")
+        except Exception as queue_error:
+            logger.exception("Failed to queue transcript reprocessing job")
             return JSONResponse(
                 status_code=500,
                 content={
                     "error": "Failed to queue transcript reprocessing job",
                     "details": str(queue_error)
                 }
             )
```

Committable suggestion skipped: line range outside the PR's diff.
🧰 Tools
🪛 Ruff (0.13.1)
1011-1011: Do not catch blind exception: Exception
(BLE001)
1012-1012: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
🤖 Prompt for AI Agents
In
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
around lines 980 to 1019, update the queue error handling to call
logger.exception(...) instead of logger.error(...) so the full stack trace is
captured when enqueueing fails; and replace the hard-coded provider string
"deepgram" used when building config_hash with the actual active provider name
from the TranscriptionManager instance (e.g. read the manager or transcription
object that holds the provider name, use that value with a safe fallback like
"unknown_provider" if not available) so config_hash reflects the real
transcription provider across deployments.
```python
        # Queue the memory processing job
        try:
            from advanced_omi_backend.simple_queue import get_simple_queue

            queue = await get_simple_queue()
            job_id = await queue.enqueue_job(
                job_type="reprocess_memory",
                user_id=str(user.user_id),
                data={
                    "conversation_id": conversation_id,
                    "audio_uuid": audio_uuid,
                    "transcript_version_id": transcript_version_id,
                    "run_id": run_id,
                    "version_id": version_id
                }
            )

            logger.info(f"Queued memory reprocessing job {job_id} for conversation {conversation_id} (run {run_id}, version {version_id})")

            return JSONResponse(content={
                "message": f"Memory reprocessing queued for conversation {conversation_id}",
                "job_id": job_id,
                "run_id": run_id,
                "version_id": version_id,
                "transcript_version_id": transcript_version_id,
                "config_hash": config_hash,
                "status": "QUEUED"
            })

        except Exception as queue_error:
            logger.error(f"Failed to queue memory reprocessing job: {queue_error}")
            return JSONResponse(
                status_code=500,
                content={
                    "error": "Failed to queue memory reprocessing job",
                    "details": str(queue_error)
                }
            )
```
Memory jobs are queued but the worker handler is a stub
As implemented, the queue will mark these jobs “completed” with no work done (see simple_queue). Either implement memory processing or temporarily block this endpoint (HTTP 501) until the worker supports it.
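The temporary-block option could look roughly like this; this is a hypothetical sketch (the helper returns a status/body pair so the exact response class stays up to the controller), not code from the PR:

```python
def memory_reprocess_not_implemented(conversation_id: str) -> tuple[int, dict]:
    """Build the (status_code, body) pair for a temporary HTTP 501 guard,
    used while the worker has no real reprocess_memory handler."""
    return 501, {
        "error": "Memory reprocessing is not implemented yet",
        "conversation_id": conversation_id,
    }

# In the endpoint this might become:
#   status, body = memory_reprocess_not_implemented(conversation_id)
#   return JSONResponse(status_code=status, content=body)
```

Returning 501 keeps clients from believing a queued job completed when no work was done.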
🧰 Tools
🪛 Ruff (0.13.1)
1125-1125: Do not catch blind exception: Exception
(BLE001)
1126-1126: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
🤖 Prompt for AI Agents
In
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
around lines 1096 to 1133, the controller queues "reprocess_memory" jobs but the
worker currently treats that job type as a no-op, so queued jobs are immediately
marked completed; either implement the memory-processing worker logic or block
the endpoint until it exists. To fix: either (A) implement a proper handler for
job_type "reprocess_memory" in simple_queue's worker (consume job data, load
audio/transcript, run the memory reprocessing pipeline, persist updated
memories, and update job status/errors), or (B) change this endpoint to detect
that the worker lacks support and return HTTP 501 Not Implemented (with a clear
message) instead of queuing the job; pick one approach and update tests/logs
accordingly.
memory reprocessing out of scope for this
@thestumonkey Understood! Thanks for clarifying that memory reprocessing implementation is out of scope for this PR.
Since the endpoint is currently accessible but the worker doesn't actually process memory jobs (it just marks them as "completed"), you might want to consider either:
- Adding a clear note in the API response indicating this feature is not yet implemented, or
- Temporarily returning HTTP 501 Not Implemented for memory reprocessing endpoints
This would help avoid user confusion when they queue memory reprocessing jobs that appear to complete successfully but don't actually do any work.
Would you like me to create a follow-up issue to track the actual memory reprocessing implementation?
✏️ Learnings added
Learnt from: thestumonkey
PR: AnkushMalaker/friend-lite#117
File: backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py:1096-1133
Timestamp: 2025-09-23T06:46:12.764Z
Learning: Memory reprocessing functionality in the queue system is intentionally left as a stub/placeholder and is out of scope for current development. The endpoint exists but the worker doesn't implement the actual processing logic yet.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
🧠 Learnings used
Learnt from: CR
PR: AnkushMalaker/friend-lite#0
File: CLAUDE.md:0-0
Timestamp: 2025-09-18T01:42:08.551Z
Learning: Applies to backends/advanced/src/**/*.py : Use conversation_id (not audio_uuid) for all reprocessing operations and version management
```python
    async def _handle_reprocess_memory(self, job: Dict[str, Any]):
        """Handle memory reprocessing by calling the controller method."""
        # TODO: Implement when needed
        pass
```
Prevent false “COMPLETED” for unimplemented memory jobs
Raise NotImplementedError so the job is marked FAILED instead of COMPLETED with empty result.
```diff
-    async def _handle_reprocess_memory(self, job: Dict[str, Any]):
-        """Handle memory reprocessing by calling the controller method."""
-        # TODO: Implement when needed
-        pass
+    async def _handle_reprocess_memory(self, job: Dict[str, Any]):
+        """Handle memory reprocessing by calling the controller method."""
+        raise NotImplementedError("Memory reprocessing is not implemented yet")
```

📝 Committable suggestion
```python
    async def _handle_reprocess_memory(self, job: Dict[str, Any]):
        """Handle memory reprocessing by calling the controller method."""
        raise NotImplementedError("Memory reprocessing is not implemented yet")
```
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/simple_queue.py around lines 188
to 191, the _handle_reprocess_memory method currently contains a no-op pass
which causes unimplemented memory reprocess jobs to be marked COMPLETED with
empty results; replace the pass with raising a NotImplementedError (with a clear
message like "Memory reprocessing not implemented") so the exception propagates
and the job system marks the job as FAILED; keep or update the docstring to
reflect that the method intentionally raises until implemented.
```python
    async def _get_next_job_id(self) -> int:
        """Get the next job ID using auto-increment."""
        # Use MongoDB's findOneAndUpdate to atomically increment counter
        counter_doc = await self.db["job_counters"].find_one_and_update(
            {"_id": "job_id"},
            {"$inc": {"sequence_value": 1}},
            upsert=True,
            return_document=True
        )
        return counter_doc["sequence_value"]
```
Critical: auto-increment counter can return None
Motor requires ReturnDocument.AFTER; with the current code first-time upsert can return None and crash. Also seed with $setOnInsert.
```diff
+from pymongo import ReturnDocument
@@
-        counter_doc = await self.db["job_counters"].find_one_and_update(
+        counter_doc = await self.db["job_counters"].find_one_and_update(
             {"_id": "job_id"},
-            {"$inc": {"sequence_value": 1}},
-            upsert=True,
-            return_document=True
+            {"$inc": {"sequence_value": 1}, "$setOnInsert": {"sequence_value": 0}},
+            upsert=True,
+            return_document=ReturnDocument.AFTER
         )
         return counter_doc["sequence_value"]
```

📝 Committable suggestion
```python
from pymongo import ReturnDocument

    async def _get_next_job_id(self) -> int:
        """Get the next job ID using auto-increment."""
        # Use MongoDB's findOneAndUpdate to atomically increment counter
        counter_doc = await self.db["job_counters"].find_one_and_update(
            {"_id": "job_id"},
            {"$inc": {"sequence_value": 1}, "$setOnInsert": {"sequence_value": 0}},
            upsert=True,
            return_document=ReturnDocument.AFTER
        )
        return counter_doc["sequence_value"]
```
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/simple_queue.py around lines 253
to 262, the find_one_and_update call can return None on first upsert and crash;
change it to use ReturnDocument.AFTER (Motor's
return_document=ReturnDocument.AFTER) and include a $setOnInsert to seed
sequence_value (e.g. 0 or 1) so the document is created with an initial value,
then safely return the updated sequence_value after the atomic increment.
```tsx
          <Route path="queue" element={
            <PageErrorBoundary>
              <Queue />
            </PageErrorBoundary>
          } />
```
🛠️ Refactor suggestion
🧩 Analysis chain
Gate /queue behind admin
This route is only nav-gated; users could still hit /queue directly. Add an admin check at the route level.
Option A (if ProtectedRoute supports admin flag):

```diff
-          <Route path="queue" element={
-            <PageErrorBoundary>
-              <Queue />
-            </PageErrorBoundary>
-          } />
+          <Route path="queue" element={
+            <ProtectedRoute requireAdmin>
+              <PageErrorBoundary>
+                <Queue />
+              </PageErrorBoundary>
+            </ProtectedRoute>
+          } />
```

If not supported, I can add a small AdminRoute wrapper that reads isAdmin from AuthContext; say the word.
🏁 Script executed:

```bash
#!/bin/bash
# Check if ProtectedRoute already supports admin gating
rg -n --type=tsx 'function\s+ProtectedRoute|export default function ProtectedRoute' -C3
rg -n --type=tsx -C2 'requireAdmin|roles' src/components/auth/ProtectedRoute.tsx
```

Length of output: 55
🏁 Script executed:

```bash
#!/bin/bash
# Gather protection-related symbols and show App.tsx around the queue route
echo "=== App.tsx snippet (first 240 lines) ==="
if [ -f backends/advanced/webui/src/App.tsx ]; then
  nl -ba backends/advanced/webui/src/App.tsx | sed -n '1,240p'
else
  echo "App.tsx not found at backends/advanced/webui/src/App.tsx"
fi
echo
echo "=== Search for ProtectedRoute occurrences ==="
rg -n --hidden -S "ProtectedRoute" || true
echo
echo "=== Search for potential ProtectedRoute definitions ==="
rg -n --hidden -S "function\s+ProtectedRoute|export\s+default\s+function\s+ProtectedRoute|const\s+ProtectedRoute\s*=" || true
echo
echo "=== Search for admin gating keywords ==="
rg -n --hidden -S "requireAdmin|require_admin|isAdmin|is_admin|roles|admin" || true
echo
echo "=== Search for auth hooks/context ==="
rg -n --hidden -S "AuthContext|useAuth|useUser|getUser|currentUser" || true
```

Length of output: 33790
Gate /queue behind admin (wrap with ProtectedRoute adminOnly)
ProtectedRoute already supports adminOnly (backends/advanced/webui/src/components/auth/ProtectedRoute.tsx); wrap the /queue Route in backends/advanced/webui/src/App.tsx with `<ProtectedRoute adminOnly>` so non-admins cannot reach /queue directly.
🤖 Prompt for AI Agents
In backends/advanced/webui/src/App.tsx around lines 72 to 76, the Route for
"/queue" is not protected for admin users; wrap the Route's element with the
existing ProtectedRoute component using the adminOnly prop so only admins can
access it. Update the route to render <ProtectedRoute adminOnly>{/* current
children (PageErrorBoundary + Queue) */}</ProtectedRoute>, ensure ProtectedRoute
is imported from backends/advanced/webui/src/components/auth/ProtectedRoute.tsx
if not already, and keep the inner PageErrorBoundary and Queue components
unchanged.
```tsx
  // Auto-refresh interval
  useEffect(() => {
    console.log('🔄 Setting up queue auto-refresh interval');
    const interval = setInterval(() => {
      if (!loading) {
        console.log('⏰ Auto-refreshing queue data');
        fetchData();
      }
    }, 5000); // Refresh every 5 seconds

    return () => {
      console.log('🧹 Clearing queue auto-refresh interval');
      clearInterval(interval);
    };
  }, []); // Remove dependencies to prevent interval recreation
```
🛠️ Refactor suggestion
Auto-refresh uses a stale closure; the interval never fires (or fires with stale state).
The effect captures the initial `loading` and `fetchData` references. As written, `if (!loading)` always evaluates against the initial value, so auto-refresh may never run. Make `fetchData` stable and depend on current flags, or read flags via refs.
Apply this diff to stabilize the callback and dependencies:
```diff
+import React, { useState, useEffect, useCallback } from 'react';
@@
-  useEffect(() => {
-    console.log('🔄 Setting up queue auto-refresh interval');
-    const interval = setInterval(() => {
-      if (!loading) {
-        console.log('⏰ Auto-refreshing queue data');
-        fetchData();
-      }
-    }, 5000); // Refresh every 5 seconds
-
-    return () => {
-      console.log('🧹 Clearing queue auto-refresh interval');
-      clearInterval(interval);
-    };
-  }, []); // Remove dependencies to prevent interval recreation
+  useEffect(() => {
+    const interval = setInterval(() => {
+      if (!loading && !refreshing) {
+        fetchData();
+      }
+    }, 5000);
+    return () => clearInterval(interval);
+  }, [loading, refreshing, fetchData]);
```

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In backends/advanced/webui/src/pages/Queue.tsx around lines 78 to 93 the
useEffect creating the auto-refresh interval captures stale references to
`loading` and `fetchData`, causing the interval callback to use initial values;
make `fetchData` stable with useCallback (including its own deps) and either
include `loading` (or a loading ref) in the effect's dependency list so the
interval closure sees current state, or read the latest loading value from a ref
inside the interval; update the cleanup to clearInterval as-is and ensure the
effect deps are correct to avoid recreating the interval unnecessarily.
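To make the ref-based option above concrete, here is a minimal sketch in plain TypeScript (names are illustrative, not from the PR) showing why a callback that captures a primitive sees a stale value, while a mutable ref-like container — the same mechanism React's `useRef` relies on — always exposes the latest one:

```typescript
// A closure over a primitive parameter copies the value at creation time.
function makeStaleReader(flag: boolean): () => boolean {
  return () => flag; // later reassignments by the caller are invisible here
}

// A closure over a mutable container (like React's useRef object) reads
// .current at call time, so it always sees the latest value.
function makeRefReader(ref: { current: boolean }): () => boolean {
  return () => ref.current;
}

let loading = false;
const staleRead = makeStaleReader(loading);

const loadingRef = { current: false };
const freshRead = makeRefReader(loadingRef);

// Simulate a state update happening after the callbacks were created,
// the way a re-render updates `loading` after the interval is set up.
loading = true;
loadingRef.current = true;

console.log(staleRead()); // false — the capture is stale
console.log(freshRead()); // true — the ref sees the update
```

In the component, the interval callback would read `loadingRef.current` instead of `loading`, which lets the effect keep an empty dependency array without going stale.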
```tsx
  const fetchData = async () => {
    console.log('📥 fetchData called, refreshing:', refreshing, 'loading:', loading);
    if (!refreshing) setRefreshing(true);

    try {
      console.log('🔄 Starting Promise.all for jobs and stats');
      await Promise.all([fetchJobs(), fetchStats()]);
      console.log('✅ Promise.all completed successfully');
    } catch (error) {
      console.error('❌ Error fetching queue data:', error);
    } finally {
      setLoading(false);
      setRefreshing(false);
      console.log('🏁 fetchData completed');
    }
  };
```
Prevent overlapping fetches; bail out early when already refreshing.
Without a guard, rapid triggers (interval + user actions) can overlap network calls.
Apply this diff:
```diff
-  const fetchData = async () => {
+  const fetchData = useCallback(async () => {
     console.log('📥 fetchData called, refreshing:', refreshing, 'loading:', loading);
-    if (!refreshing) setRefreshing(true);
+    if (refreshing) return;
+    setRefreshing(true);
@@
-  };
+  }, [filters, pagination.offset, refreshing, loading]);
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```suggestion
  const fetchData = useCallback(async () => {
    console.log('📥 fetchData called, refreshing:', refreshing, 'loading:', loading);
    if (refreshing) return;
    setRefreshing(true);
    try {
      console.log('🔄 Starting Promise.all for jobs and stats');
      await Promise.all([fetchJobs(), fetchStats()]);
      console.log('✅ Promise.all completed successfully');
    } catch (error) {
      console.error('❌ Error fetching queue data:', error);
    } finally {
      setLoading(false);
      setRefreshing(false);
      console.log('🏁 fetchData completed');
    }
  }, [filters, pagination.offset, refreshing, loading]);
```
🤖 Prompt for AI Agents
In backends/advanced/webui/src/pages/Queue.tsx around lines 99 to 114, add an
early-exit guard at the top of fetchData so it returns immediately when
refreshing is true to prevent overlapping network calls; setRefreshing(true)
only when proceeding (so the flag protects concurrent invocations), keep the
try/catch/finally that awaits Promise.all([fetchJobs(), fetchStats()]) and still
setLoading(false)/setRefreshing(false) in finally to reset state.
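The re-entrancy guard suggested above can also be sketched outside React. This hypothetical `guardedFetch` helper (not part of the PR) shows the shape of the check — bail out when a call is already in flight, set the flag only when proceeding, and always clear it in `finally`:

```typescript
let inFlight = false; // plays the role of the `refreshing` state flag

// Runs doFetch unless a previous call is still pending; returns whether
// this invocation actually performed the fetch.
async function guardedFetch(doFetch: () => Promise<void>): Promise<boolean> {
  if (inFlight) return false; // early exit: overlapping call prevented
  inFlight = true;            // set only when we actually proceed
  try {
    await doFetch();
    return true;
  } finally {
    inFlight = false;         // always reset, even if doFetch throws
  }
}

// Two concurrent triggers (e.g. a timer tick plus a manual refresh click)
// result in exactly one underlying fetch.
async function demo(): Promise<void> {
  let calls = 0;
  const slow = () =>
    new Promise<void>((resolve) => { calls++; setTimeout(resolve, 20); });
  const results = await Promise.all([guardedFetch(slow), guardedFetch(slow)]);
  console.log(calls, results); // 1 [ true, false ]
}
demo();
```

A plain boolean works here because the module is single-threaded; in the component the same role is played by the `refreshing` state (or a ref, to avoid the stale-closure problem described earlier in the review).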
@thestumonkey Just curious, the imports that are within the function:
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
I prefer to put imports at the top of the file, which is the pattern I've used for years. I guess it's useful for Claude to put them in the function so you can see what that code is using, and delete the import if you end up not using that code? This method may also end up with smaller binaries, since you don't accumulate imports you no longer need over time.
Closing this, I assume this is outdated.
Added version selection from the conversation page, version deletion, and the queue management page.
Summary by CodeRabbit

- New Features
- Improvements
- API
- UI