Implemented close conversation #127

thestumonkey wants to merge 433 commits into SimpleOpenSoftware:main from thestumonkey:close-convo
Conversation
update memory service
* auth WIP
* make auth optional
* minor fix
* fix minor bug
* Added a bunch of changes here
* update frontend
* rehauled and cleaned up auth
* updated auth to remove custom user-id (#24)
* updated auth to remove custom user-id
* stale data protection
* update
* minor fix
* update WIP architecture
* re-rehaul?
* update auth in laptop client
* push missing file
* add existing data readme
* update env template
* memory debug
* Refactor main into different files; make advanced backend a package; move webui; refactor and fix the rest of the code; reduce duplication; update pyproject.toml to use optional dependencies instead of groups; update docs
* cleanup
* fix client bug, required
* test fact extraction prompt improvement
* update quickstart
* update info on .env.template
* remove failure recovery dead code
* remove more dead code; update memory service
* cleanup and merge #22's changes; remove action items service
* Removing action items
* who even says industry standard
Co-authored-by: Neotastisch <54811660+Neotastisch@users.noreply.github.com>
* rename folder
* update readme etc

Co-authored-by: Neotastisch <54811660+Neotastisch@users.noreply.github.com>
github actions for automatic deployment
Refactored to use a service model for transcription, and got it working with RQ
Decluttered main, moved all processors to RQ. Streaming and batch modes enabled on audio recording
streaming segments should now be able to identify the speaker. Removed a bunch of dead code
Tidied up dead code, added timeline to job page so we can see time taken, got speaker recognition working on conversation trigger
- Added `start.sh` and `start-workers.sh` to `.dockerignore`.
- Removed unused `COMPUTE_MODE` arguments from `docker-compose.yml` and `Dockerfile`.
- Updated `pyproject.toml` to require `pandas>=2.0.0` and added new dependencies: `faiss-cpu`, `torch`, and `torchaudio`.
- Modified `download-pyannote.py` to use `token` instead of `use_auth_token` for model downloads.
- Refactored imports and logging in `download-pyannote.py` and `websocket_wrapper.py` for better clarity.
Update Docker configuration and speaker recognition dependencies
…eet. Update Docker configurations to accept CUDA version as a build argument in speaker recognition and ASR services. Enhance dependency management in pyproject.toml for speaker recognition and ASR services with optional CUDA versions.
feat: Remove unused Dockerfiles and pyproject files for blackwell gpus
Also beefed up the queue management page
Walkthrough

This PR refactors the backend architecture toward a conversation-centric, job-queue-based processing model. It removes the ConversationManager singleton, simplifies client state closure, migrates transcription and audio processing into RQ-driven job chains, enriches conversation models with audio paths and status enums, introduces a consolidated dashboard API endpoint, downgrades logging verbosity, and aligns frontend conversation lookup from audio_uuid to conversation_id.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant WS as WebSocket<br/>Controller
    participant RQ as RQ Job Queue
    participant Redis as Redis
    participant Conv as Conversation<br/>Model
    participant Memory as Memory<br/>Service
    WS->>RQ: stream_speech_detection_job
    activate RQ
    RQ->>Redis: Monitor transcription results
    RQ->>Redis: Analyze meaningful speech
    alt Speech Detected
        RQ->>RQ: enqueue open_conversation_job
        RQ->>Conv: Create conversation
        Conv->>Redis: Signal file rotation
        RQ->>RQ: enqueue process_cropping_job
        RQ->>RQ: enqueue process_memory_job
        RQ->>RQ: enqueue start_post_conversation_jobs
    else No Speech
        RQ->>RQ: Continue monitoring
    end
    deactivate RQ
    RQ->>Conv: Store conversation_id
    RQ->>Memory: Extract memories
    Memory->>Conv: Append memory references
```
```mermaid
sequenceDiagram
    participant Client as Client
    participant WS as WebSocket
    participant RQ as RQ
    participant Redis as Redis
    participant Conv as Conversation
    Client->>WS: Disconnect
    activate WS
    WS->>RQ: Cancel speech_detection job
    WS->>Redis: Fetch audio:session:* entries
    WS->>Redis: Mark sessions "complete"
    WS->>Redis: Delete audio:stream
    WS->>Conv: (No longer via ConversationManager)
    deactivate WS
```
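The enqueue chains in the diagrams above hinge on dependency edges between jobs (RQ's `depends_on`). The toy scheduler below models that flow in plain Python so the ordering guarantee is easy to see; the job names mirror the diagram, but the scheduler itself is illustrative, not RQ.

```python
# Toy model of a dependency-ordered job chain, as in the diagrams above.
# Each entry maps a job name to (name_of_job_it_depends_on_or_None, callable).

def run_chain(jobs):
    """Run jobs in dependency order; returns the execution order."""
    done, order = {}, []

    def run(name):
        dep, fn = jobs[name]
        if dep and dep not in done:
            run(dep)  # run the dependency first, like RQ's depends_on
        if name not in done:
            done[name] = fn()
            order.append(name)

    for name in jobs:
        run(name)
    return order

jobs = {
    "open_conversation": (None, lambda: "conv-123"),
    "process_cropping": ("open_conversation", lambda: "cropped"),
    "process_memory": ("process_cropping", lambda: "memories"),
}
assert run_chain(jobs) == ["open_conversation", "process_cropping", "process_memory"]
```

In the real backend, each `q.enqueue(fn, ..., depends_on=prev_job)` call encodes one of these edges, and RQ only starts a job once its dependency has finished successfully.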
Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

The changes span multiple interrelated job workflows with new orchestration patterns (post-conversation job chains), data model modifications (ConversationStatus enum, audio paths), removal of an entire module (ConversationManager), and significant refactoring across job signature changes (parameter removal/addition) and control flow enhancements (inactivity timeouts, metadata enrichment, streaming speech detection escalation). Heterogeneous changes across 20+ files with mixed logic density require nuanced reasoning for each job workflow integration point.

Possibly related PRs
Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
GOAT
@coderabbitai review
✅ Actions performed: Review triggered.
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (7)
backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py (4)
166-173: Avoid AttributeError: guard access to job.is_scheduled.

Older RQ versions may not expose is_scheduled; this would crash status mapping.
```diff
-        elif job.is_scheduled:
+        elif getattr(job, "is_scheduled", False):
             return "waiting"
```
331-337: Fix potential infinite SCAN loop (cursor truthiness with b"0").

Using bytes '0' in while cursor causes endless iteration when there are < limit keys.

```diff
-        stream_keys = []
-        cursor = b"0"
-        while cursor and len(stream_keys) < limit:
-            cursor, keys = await audio_service.redis.scan(
-                cursor, match=f"{audio_service.audio_stream_prefix}*", count=limit
-            )
+        stream_keys: list = []
+        cursor: int = 0
+        while True:
+            cursor, keys = await audio_service.redis.scan(
+                cursor=cursor, match=f"{audio_service.audio_stream_prefix}*", count=limit
+            )
+            if not keys and cursor == 0:
+                break
             stream_keys.extend(keys[:limit - len(stream_keys)])
+            if cursor == 0 or len(stream_keys) >= limit:
+                break
```
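The root cause is worth spelling out: redis clients hand the SCAN cursor back as bytes (or an int), and `b"0"` is truthy, so `while cursor` never sees a falsy terminator. A minimal sketch of the correct termination condition, with a fake `scan` standing in for the redis call:

```python
# SCAN termination: stop when the returned cursor is 0, not when it is falsy.

def scan_all(scan_fn):
    """Collect keys using the correct SCAN termination: stop at cursor == 0."""
    keys = []
    cursor = 0
    while True:
        cursor, batch = scan_fn(cursor)
        keys.extend(batch)
        if cursor == 0:  # SCAN signals completion with cursor 0
            break
    return keys

def fake_scan(cursor):
    # Fake redis scan: returns everything in one pass, final cursor 0.
    return 0, ["audio:session:a", "audio:session:b"]

assert bool(b"0") is True  # the bug: a bytes cursor "0" is truthy
assert scan_all(fake_scan) == ["audio:session:a", "audio:session:b"]
```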
603-609: Same SCAN loop issue for sessions listing.

```diff
-        session_keys = []
-        cursor = b"0"
-        while cursor and len(session_keys) < limit:
-            cursor, keys = await redis_client.scan(
-                cursor, match="audio:session:*", count=limit
-            )
+        session_keys: list = []
+        cursor: int = 0
+        while True:
+            cursor, keys = await redis_client.scan(
+                cursor=cursor, match="audio:session:*", count=limit
+            )
+            if not keys and cursor == 0:
+                break
             session_keys.extend(keys[:limit - len(session_keys)])
+            if cursor == 0 or len(session_keys) >= limit:
+                break
```
672-675: And same SCAN loop issue in session cleanup.

```diff
-        session_keys = []
-        cursor = b"0"
-        while cursor:
-            cursor, keys = await redis_client.scan(cursor, match="audio:session:*", count=100)
+        session_keys: list = []
+        cursor: int = 0
+        while True:
+            cursor, keys = await redis_client.scan(cursor=cursor, match="audio:session:*", count=100)
+            session_keys.extend(keys)
+            if cursor == 0:
+                break
-            session_keys.extend(keys)
```

backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py (3)
153-166: Critical: enqueue args don’t match updated transcribe_full_audio_job signature.
user_id is passed as a positional argument after version_id, pushing "upload" into the redis_client slot and causing a runtime TypeError.

```diff
 transcript_job = transcription_queue.enqueue(
     transcribe_full_audio_job,
     conversation_id,
     result["audio_uuid"],
     result["file_path"],
     result["version_id"],
-    user_id,
-    "upload",
+    "upload",
     job_timeout=600,
     result_ttl=JOB_RESULT_TTL,
     job_id=f"upload_{conversation_id[:8]}",
     description=f"Transcribe audio for {conversation_id[:8]}",
     meta={'audio_uuid': result["audio_uuid"], 'conversation_id': conversation_id}
 )
```
170-184: Critical: enqueue args don't match updated recognise_speakers_job signature.

Extra user_id positional arg misaligns parameters and the trailing list becomes the injected redis_client, leading to runtime errors.

```diff
 speaker_job = transcription_queue.enqueue(
     recognise_speakers_job,
     conversation_id,
     result["version_id"],
     result["file_path"],
-    user_id,
-    "",  # transcript_text - will be read from DB
-    [],  # words - will be read from DB
+    "",  # transcript_text - will be read from DB
+    [],  # words - will be read from DB
     depends_on=transcript_job,
     job_timeout=600,
     result_ttl=JOB_RESULT_TTL,
     job_id=f"speaker_{conversation_id[:8]}",
     description=f"Recognize speakers for {conversation_id[:8]}",
     meta={'audio_uuid': result["audio_uuid"], 'conversation_id': conversation_id}
 )
```
585-588: Close file_sink when stopping after END signal empties.

Breaks out without closing the WAV; leaks file handle.

```diff
 if consecutive_empty_reads >= max_empty_reads:
-    logger.info(f"✅ Stream empty after END signal - stopping audio collection")
-    break
+    logger.info("✅ Stream empty after END signal - stopping audio collection")
+    if file_sink:
+        await file_sink.close()
+        logger.info(f"✅ Closed final file on END: {wav_filename}")
+    break
```
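Beyond the targeted fix above, a try/finally guarantees the sink is closed on every exit path (break, return, or exception). A sketch with a stand-in sink; the real code uses an async WAV sink, so the names here are illustrative:

```python
# try/finally resource handling: the sink closes no matter how the loop exits.

class FileSink:
    """Stand-in for the real async WAV file sink."""
    def __init__(self):
        self.closed = False
    def write(self, chunk):
        pass
    def close(self):
        self.closed = True

def collect(chunks, max_empty_reads=3):
    sink = FileSink()
    empty = 0
    try:
        for chunk in chunks:
            if not chunk:
                empty += 1
                if empty >= max_empty_reads:
                    break  # stream drained after END signal
            else:
                empty = 0
                sink.write(chunk)
    finally:
        sink.close()  # runs on break, return, or exception
    return sink

assert collect([b"a", b"", b"", b""]).closed is True
```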
🧹 Nitpick comments (16)
backends/advanced/src/advanced_omi_backend/services/transcription/deepgram.py (1)
131-133: Good change to reduce routine logging verbosity.

Downgrading basic transcription success to debug level is appropriate for this routine operation. Note that diarized transcription success (line 98) remains at info level, which may be intentional given the higher complexity of diarization operations.
If consistent logging across transcription modes is desired, consider aligning the success logging level for both diarized and basic transcription:

```diff
 transcript = alternative.get("transcript", "").strip()
-logger.debug(
+logger.info(
     f"Deepgram basic transcription successful: {len(transcript)} characters"
 )
```

Or alternatively, downgrade the diarized success logging to match:

```diff
 transcript = alternative["paragraphs"]["transcript"].strip()
-logger.info(
+logger.debug(
     f"Deepgram diarized transcription successful: {len(transcript)} characters"
 )
```

backends/advanced/webui/src/pages/Conversations.tsx (1)
394-396: Prefer conversation_id for React list key (fallback to audio_uuid).

Reduces key collisions if multiple conversations ever share an audio_uuid.

```diff
-    key={conversation.audio_uuid}
+    key={conversation.conversation_id || conversation.audio_uuid}
```
98-136: Improve error logging and exception chaining in cancel_job.

Use logger.exception for stack traces and chain the HTTPException.

```diff
-    except Exception as e:
-        logger.error(f"Failed to cancel/delete job {job_id}: {e}")
-        raise HTTPException(status_code=404, detail=f"Job not found or could not be cancelled: {str(e)}")
+    except Exception as e:
+        logger.exception("Failed to cancel/delete job %s", job_id)
+        raise HTTPException(
+            status_code=404,
+            detail=f"Job not found or could not be cancelled: {e!s}",
+        ) from e
```
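The `from e` suffix matters: it records the original error as `__cause__`, so tracebacks show both failures instead of an apparently unrelated HTTPException. A self-contained sketch, using a stand-in for FastAPI's HTTPException:

```python
# Exception chaining: `raise ... from e` preserves the original cause.

class HTTPException(Exception):  # stand-in for fastapi.HTTPException
    def __init__(self, status_code, detail):
        super().__init__(detail)
        self.status_code = status_code

def cancel_job(job_id):
    try:
        raise KeyError(job_id)  # simulate "job not found" in the queue
    except Exception as e:
        # Chaining links the KeyError to the HTTPException in tracebacks.
        raise HTTPException(404, f"Job not found or could not be cancelled: {e!s}") from e

try:
    cancel_job("abc123")
except HTTPException as err:
    assert err.status_code == 404
    assert isinstance(err.__cause__, KeyError)  # the chained original error
```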
815-817: Remove unused variable conversation_ids.

Not used in fetch_session_jobs; keep code minimal.

```diff
-    conversations = await Conversation.find(Conversation.audio_uuid == session_id).to_list()
-    conversation_ids = {conv.conversation_id for conv in conversations}
+    # Intentionally not resolving conversation_ids here; session match uses job.meta/args
+    _ = await Conversation.find(Conversation.audio_uuid == session_id).to_list()
```
944-945: Prefer wall-clock timestamp for API consumers.

Event loop time is monotonic and not meaningful to clients.

```diff
-    "timestamp": asyncio.get_event_loop().time()
+    "timestamp": __import__("time").time()
```
947-949: Use logger.exception and chain error in dashboard handler.

Improves diagnosability.

```diff
-    logger.error(f"Failed to get dashboard data: {e}", exc_info=True)
-    raise HTTPException(status_code=500, detail=f"Failed to get dashboard data: {str(e)}")
+    logger.exception("Failed to get dashboard data")
+    raise HTTPException(status_code=500, detail=f"Failed to get dashboard data: {e!s}") from e
```

backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py (1)
63-115: Optimize: Duplicate user lookup.

The user is fetched via get_user_by_id at line 63 to get the email, then fetched again at line 98 for primary speakers filtering. Store the user object from the first call and reuse it.

Apply this optimization:

```diff
-    user = await get_user_by_id(user_id)
+    user = await get_user_by_id(user_id)  # Fetch once and reuse
     if user:
         user_email = user.email
     else:
         logger.warning(f"Could not find user {user_id}")
         user_email = ""
     logger.info(f"🔄 Processing memory for conversation {conversation_id}, client={client_id}, user={user_id}")
     # ... existing code ...
     # Check primary speakers filter
-    user = await get_user_by_id(user_id)
+    # Reuse user object from above
     if user and user.primary_speakers:
```
23-23: Type hint: make Optional explicit (RUF013).

Use str | None (PEP 604) for an optional param.

```diff
-    speech_job_id: str = None,
+    speech_job_id: str | None = None,
```
258-263: 5s wait logging condition never/rarely triggers.

elapsed % 5 == 0 with floats almost never hits. Log every new 5th second deterministically.

```diff
-    elapsed = time.time() - wait_start
-    if elapsed % 5 == 0:  # Log every 5 seconds
-        logger.info(f"⏳ Waiting for audio file (conversation {conversation_id[:12]})... ({elapsed:.0f}s elapsed)")
+    elapsed_sec = int(time.time() - wait_start)
+    # Log every 5 seconds without flooding
+    if 'last_audio_wait_log_sec' not in locals():
+        last_audio_wait_log_sec = -1
+    if elapsed_sec % 5 == 0 and elapsed_sec != last_audio_wait_log_sec:
+        logger.info(f"⏳ Waiting for audio file (conversation {conversation_id[:12]})... ({elapsed_sec}s elapsed)")
+        last_audio_wait_log_sec = elapsed_sec
```
266-267: Remove extraneous f-string prefix (F541).

No placeholders here.

```diff
-    logger.warning(f"⚠️ Audio persistence job may not have rotated file yet - cannot enqueue batch transcription")
+    logger.warning("⚠️ Audio persistence job may not have rotated file yet - cannot enqueue batch transcription")
```
169-195: Reduce Redis meta churn (optional).

Saving job meta every loop second can be noisy. Consider updating meta only when values change or at a 2–5s cadence.
backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py (1)
578-584: Remove unnecessary f-string prefixes (F541).

These logs have no placeholders.

```diff
-    logger.info(f"🛑 Session ended, exiting")
+    logger.info("🛑 Session ended, exiting")
@@
-    logger.warning(f"⏱️ Max runtime reached, exiting")
+    logger.warning("⏱️ Max runtime reached, exiting")
@@
-    logger.info(f"💬 Meaningful speech detected!")
+    logger.info("💬 Meaningful speech detected!")
@@
-    logger.info(f"🎤 Checking for enrolled speakers...")
+    logger.info("🎤 Checking for enrolled speakers...")
@@
-    logger.info(f"⏭️ No enrolled speakers, continuing to listen...")
+    logger.info("⏭️ No enrolled speakers, continuing to listen...")
@@
-    logger.info(f"✅ Session ended without speech")
+    logger.info("✅ Session ended without speech")
```

Also applies to: 605-610, 623-623, 692-692
backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py (4)
547-549: Rename unused loop variable to underscore (B007).

stream_name isn't used.

```diff
-    for stream_name, msgs in audio_messages:
+    for _stream_name, msgs in audio_messages:
```
343-344: Prefer logging.exception for failures (TRY400).

Preserves the traceback automatically.

```diff
-    logger.error(f"❌ RQ: Audio cropping failed for conversation {conversation_id}: {e}")
+    logger.exception(f"❌ RQ: Audio cropping failed for conversation {conversation_id}")
```
559-576: Nit: remove f-prefix where no placeholders (F541) and minor log polish.

Optional cleanup.

```diff
-    logger.info(f"📦 Session {session_id[:12]}: {total_chunk_count} total chunks "
-                f"(conversation {current_conversation_id[:12]}: {conversation_chunk_count} chunks)")
+    logger.info(
+        f"📦 Session {session_id[:12]}: {total_chunk_count} total chunks "
+        f"(conversation {current_conversation_id[:12]}: {conversation_chunk_count} chunks)"
+    )
```
596-609: Log strings: a couple of f-prefixes can be dropped (F541).

The two lines have placeholders; the others don't. Adjust accordingly if any plain strings remain.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (19)
- backends/advanced/src/advanced_omi_backend/client.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py (4 hunks)
- backends/advanced/src/advanced_omi_backend/conversation_manager.py (0 hunks)
- backends/advanced/src/advanced_omi_backend/models/conversation.py (2 hunks)
- backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py (8 hunks)
- backends/advanced/src/advanced_omi_backend/services/audio_stream/aggregator.py (2 hunks)
- backends/advanced/src/advanced_omi_backend/services/audio_stream/consumer.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/services/audio_stream/producer.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/services/transcription/deepgram.py (3 hunks)
- backends/advanced/src/advanced_omi_backend/speaker_recognition_client.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/utils/audio_utils.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py (9 hunks)
- backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py (8 hunks)
- backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py (4 hunks)
- backends/advanced/src/advanced_omi_backend/workers/rq_worker_entry.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py (6 hunks)
- backends/advanced/webui/src/pages/Conversations.tsx (6 hunks)
- backends/advanced/webui/src/services/api.ts (1 hunks)
💤 Files with no reviewable changes (1)
- backends/advanced/src/advanced_omi_backend/conversation_manager.py
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: CR
PR: AnkushMalaker/friend-lite#0
File: CLAUDE.md:0-0
Timestamp: 2025-09-18T01:42:08.562Z
Learning: Applies to backends/advanced/src/**/*.py : Use conversation_id (not audio_uuid) for all reprocessing operations and version management
📚 Learning: 2025-09-18T01:42:08.562Z
Learnt from: CR
PR: AnkushMalaker/friend-lite#0
File: CLAUDE.md:0-0
Timestamp: 2025-09-18T01:42:08.562Z
Learning: Applies to backends/advanced/src/**/*.py : Use conversation_id (not audio_uuid) for all reprocessing operations and version management
Applied to files:
- backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py
- backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py
- backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py
- backends/advanced/webui/src/pages/Conversations.tsx
- backends/advanced/src/advanced_omi_backend/client.py
- backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py
- backends/advanced/src/advanced_omi_backend/models/conversation.py
- backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py
🪛 Ruff (0.14.0)
backends/advanced/src/advanced_omi_backend/workers/rq_worker_entry.py
1-1: Shebang is present but file is not executable
(EXE001)
backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py
159-159: Avoid specifying long messages outside the exception class
(TRY003)
176-176: Avoid specifying long messages outside the exception class
(TRY003)
578-578: f-string without any placeholders
Remove extraneous f prefix
(F541)
582-582: f-string without any placeholders
Remove extraneous f prefix
(F541)
605-605: f-string without any placeholders
Remove extraneous f prefix
(F541)
610-610: f-string without any placeholders
Remove extraneous f prefix
(F541)
623-623: f-string without any placeholders
Remove extraneous f prefix
(F541)
692-692: f-string without any placeholders
Remove extraneous f prefix
(F541)
backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py
23-23: PEP 484 prohibits implicit Optional
Convert to T | None
(RUF013)
84-84: Do not catch blind exception: Exception
(BLE001)
266-266: f-string without any placeholders
Remove extraneous f prefix
(F541)
backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py
150-150: Do not catch blind exception: Exception
(BLE001)
158-158: Do not catch blind exception: Exception
(BLE001)
168-168: f-string without any placeholders
Remove extraneous f prefix
(F541)
206-206: Do not catch blind exception: Exception
(BLE001)
backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py
162-162: Do not catch blind exception: Exception
(BLE001)
backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py
237-237: Unused function argument: redis_client
(ARG001)
268-268: Abstract raise to an inner function
(TRY301)
268-268: Avoid specifying long messages outside the exception class
(TRY003)
343-343: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
350-350: Unused function argument: user_id
(ARG001)
434-434: f-string without any placeholders
Remove extraneous f prefix
(F541)
448-448: Loop control variable stream_name not used within loop body
Rename unused stream_name to _stream_name
(B007)
465-465: Do not catch blind exception: Exception
(BLE001)
backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py
101-101: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
111-111: Abstract raise to an inner function
(TRY301)
133-133: Do not catch blind exception: Exception
(BLE001)
134-134: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
135-135: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
135-135: Use explicit conversion flag
Replace with conversion flag
(RUF010)
562-562: Do not catch blind exception: Exception
(BLE001)
568-568: Do not catch blind exception: Exception
(BLE001)
574-574: Do not catch blind exception: Exception
(BLE001)
575-575: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
701-701: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
701-701: Use explicit conversion flag
Replace with conversion flag
(RUF010)
708-708: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
782-782: Do not catch blind exception: Exception
(BLE001)
786-786: Consider moving this statement to an else block
(TRY300)
787-787: Do not catch blind exception: Exception
(BLE001)
788-788: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
795-795: Do not catch blind exception: Exception
(BLE001)
796-796: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
804-804: Do not catch blind exception: Exception
(BLE001)
805-805: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
816-816: Local variable conversation_ids is assigned to but never used
Remove assignment to unused variable conversation_ids
(F841)
853-853: Loop control variable status_name not used within loop body
Rename unused status_name to _status_name
(B007)
891-891: Do not catch blind exception: Exception
(BLE001)
895-895: Consider moving this statement to an else block
(TRY300)
896-896: Do not catch blind exception: Exception
(BLE001)
897-897: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
949-949: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
949-949: Use explicit conversion flag
Replace with conversion flag
(RUF010)
🔇 Additional comments (25)
backends/advanced/src/advanced_omi_backend/services/audio_stream/consumer.py (1)
580-583: LGTM! Appropriate log level reduction.

Changing this frequent operational log from info to debug reduces production log noise while preserving the information for debugging.
backends/advanced/src/advanced_omi_backend/services/audio_stream/producer.py (1)
251-255: LGTM! Appropriate log level reduction.

Even with the existing throttling (every 10th chunk), debug level is more suitable for this operational detail in production environments.
backends/advanced/src/advanced_omi_backend/services/audio_stream/aggregator.py (2)
71-74: LGTM! Appropriate log level reduction.

Debug level is more suitable for this data retrieval operation, reducing production log noise while maintaining visibility for troubleshooting.

146-149: LGTM! Appropriate log level reduction.

Debug level is more suitable for this aggregation operation, consistent with the logging discipline applied across the audio_stream service.

backends/advanced/src/advanced_omi_backend/services/transcription/deepgram.py (2)
backends/advanced/src/advanced_omi_backend/services/transcription/deepgram.py (2)
56-56: LGTM! Appropriate logging level for routine operation.

Downgrading this routine technical detail to debug level reduces log noise while preserving the information for debugging. The more valuable operational context (estimated duration and timeout at lines 73-75) appropriately remains at info level.

267-267: LGTM! Appropriate logging hierarchy for streaming operations.

Downgrading the connection confirmation to debug level is sensible. The streaming lifecycle remains visible at info level through the start (line 233) and completion (line 365) messages, while the technical connection detail is appropriately at debug level.

backends/advanced/webui/src/pages/Conversations.tsx (3)
backends/advanced/webui/src/pages/Conversations.tsx (3)
214-251: Switch to conversation_id for transcript expansion looks good.

The guard, state updates, and targeted refresh via getById are coherent and minimize full reloads.
Based on learnings
564-565: Consistent gating and state checks on conversation_id.

Click handler and expanded state keyed by conversation_id are correct and avoid ambiguity.
Based on learnings
Also applies to: 574-579, 583-586
504-506: No issues found. The UI and backend are aligned on the delete operation.

The backend DELETE endpoint at conversation_routes.py:141 is defined as @router.delete("/{audio_uuid}") and expects audio_uuid as the path parameter. The UI code correctly passes conversation.audio_uuid to the delete handler. Both use the same parameter, so there is no mismatch to fix.

The learning note about using conversation_id applies specifically to reprocessing and version management operations, not to the delete endpoint.

Likely an incorrect or invalid review comment.
backends/advanced/webui/src/services/api.ts (3)
145-149: Unified dashboard endpoint: LGTM.

Parameterized expanded_sessions is clean and backward-compatible.

160-165: Keeping legacy endpoints commented is fine for now.

No action.
156-159: Let me verify whether the /api/queue/sessions/clear route mentioned in the review comment actually exists in the codebase. Searching more thoroughly for queue-related routes in the backend:
No mismatch found—cleanup endpoints are correctly aligned between frontend and backend.
The verification confirms the endpoints match:
- UI (api.ts): calls /api/streaming/cleanup and /api/streaming/cleanup-sessions?max_age_seconds=...
- Backend (system_routes.py): implements @router.post("/streaming/cleanup") and @router.post("/streaming/cleanup-sessions")

The /api/queue/sessions/clear?older_than_seconds=... route mentioned in the review does not exist. The current routes are correct and working as intended.

Likely an incorrect or invalid review comment.
backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py (3)
156-157: Including 'audio' queue is correct.

Ensures audio crop/persist jobs are discoverable.

453-454: Audio queue included in flush: LGTM.

Covers persistence/crop jobs.

532-537: Comprehensive flush-all logic looks solid.

Emptying queue first and cancelling running jobs before deletion is the right order; stale-registry cleanup is a good add.
Also applies to: 540-546, 548-576
backends/advanced/src/advanced_omi_backend/client.py (1)
135-147: LGTM! Clear migration path documented.

The V2 architecture note is helpful for understanding that conversation closure is now handled by websocket controllers with RQ jobs. The unconditional cleanup simplifies the logic and ensures state is always cleaned up.
backends/advanced/src/advanced_omi_backend/workers/rq_worker_entry.py (1)
13-50: LGTM! Clean RQ worker setup.

The logging configuration before imports ensures visibility of job logs, and the worker setup is straightforward and well-structured.
backends/advanced/src/advanced_omi_backend/speaker_recognition_client.py (1)
478-478: LGTM! Import path aligned with utility reorganization.

The updated import path correctly reflects the centralization of audio utilities under advanced_omi_backend.utils.audio_utils.

backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py (1)

16-46: LGTM! Clean wrapper for speech detection.

The is_meaningful_speech function provides a clear, well-documented interface for checking meaningful speech. The example usage is helpful.

backends/advanced/src/advanced_omi_backend/models/conversation.py (2)
33-37: LGTM! Clear conversation status tracking.

The ConversationStatus enum provides a clear way to track conversation lifecycle states (ACTIVE, COMPLETED, FAILED).
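For reference, a str-valued enum matching the states named above would look like this. The member names come from the review; the string values and the str/Enum mixin (a common Pydantic/Beanie pattern for serializable enums) are assumptions:

```python
from enum import Enum

class ConversationStatus(str, Enum):
    """Conversation lifecycle states; values assumed for illustration."""
    ACTIVE = "active"
    COMPLETED = "completed"
    FAILED = "failed"

# str mixin means members serialize/compare as plain strings:
assert ConversationStatus.ACTIVE.value == "active"
assert ConversationStatus("failed") is ConversationStatus.FAILED
```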
76-78: LGTM! Audio paths support conversation-centric architecture.

The audio_path and cropped_audio_path fields align well with the shift to a conversation-centric processing model, making it easier to track audio files associated with conversations.
24-24: LGTM! Import path aligned with utility reorganization.
130-207: LGTM! Comprehensive cleanup with job cancellation.

The expanded cleanup logic now:
- Cancels speech detection jobs via RQ
- Marks all client sessions as complete in Redis
- Deletes Redis streams
- Includes robust error handling
The broad exception catches (flagged by static analysis) are appropriate here since cleanup should fail gracefully and continue attempting other cleanup steps.
699-750: LGTM! Batch processing updated for post-conversation workflow.

The changes correctly:
- Use the updated import path for write_audio_file
- Switch to start_post_conversation_jobs for the new conversation-centric job workflow

backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py (1)
145-172: LGTM! Rich job metadata for UI display.

The addition of memory details in job metadata provides valuable information for monitoring and debugging, including memory IDs and text snippets for UI display.
```diff
-            # Validate input timestamps
-            if start_abs >= end_abs:
+        # Validate input segments
+        validated_segments = []
+        for start_rel, end_rel in speech_segments:
+            if start_rel >= end_rel:
                 logger.warning(
-                    f"⚠️ Invalid speech segment: start={start_abs} >= end={end_abs}, skipping"
+                    f"⚠️ Invalid speech segment: start={start_rel} >= end={end_rel}, skipping"
                 )
                 continue

-            start_rel = start_abs - file_start_timestamp
-            end_rel = end_abs - file_start_timestamp
-
-            # Ensure relative timestamps are positive (sanity check)
+            # Ensure timestamps are positive (sanity check)
             if start_rel < 0:
                 logger.warning(
-                    f"⚠️ Negative start timestamp: {start_rel} (absolute: {start_abs}, file_start: {file_start_timestamp}), clamping to 0.0"
+                    f"⚠️ Negative start timestamp: {start_rel}, clamping to 0.0"
                 )
                 start_rel = 0.0
             if end_rel < 0:
                 logger.warning(
-                    f"⚠️ Negative end timestamp: {end_rel} (absolute: {end_abs}, file_start: {file_start_timestamp}), skipping segment"
+                    f"⚠️ Negative end timestamp: {end_rel}, skipping segment"
                 )
                 continue

-            relative_segments.append((start_rel, end_rel))
+            validated_segments.append((start_rel, end_rel))

-        logger.info(f"🕐 Converting timestamps for {audio_uuid}: file_start={file_start_timestamp}")
-        logger.info(f"🕐 Absolute segments: {speech_segments}")
-        logger.info(f"🕐 Relative segments: {relative_segments}")
+        logger.info(f"🕐 Processing cropping for {audio_uuid}")
+        logger.info(f"🕐 Input segments (relative timestamps): {speech_segments}")
+        logger.info(f"🕐 Validated segments: {validated_segments}")

-        # Validate that we have valid relative segments after conversion
-        if not relative_segments:
+        # Validate that we have valid segments
+        if not validated_segments:
             logger.warning(
-                f"No valid relative segments after timestamp conversion for {audio_uuid}"
+                f"No valid segments for cropping {audio_uuid}"
             )
             return False

-        success = await _crop_audio_with_ffmpeg(original_path, relative_segments, output_path)
+        success = await _crop_audio_with_ffmpeg(original_path, validated_segments, output_path)
         if success:
-            # Update database with cropped file info (keep original absolute timestamps for reference)
+            # Update database with cropped file info
             cropped_filename = output_path.split("/")[-1]
             if chunk_repo is not None:
                 await chunk_repo.update_cropped_audio(audio_uuid, cropped_filename, speech_segments)
```
Inconsistency: validated vs original segments.
The function validates speech segments (lines 276-303) and uses `validated_segments` for cropping (line 311), but passes the original `speech_segments` to `update_cropped_audio` (line 316). This means the database stores unvalidated segments while cropping uses validated ones.
Consider this fix to ensure consistency:
```diff
             success = await _crop_audio_with_ffmpeg(original_path, validated_segments, output_path)
             if success:
                 # Update database with cropped file info
                 cropped_filename = output_path.split("/")[-1]
                 if chunk_repo is not None:
-                    await chunk_repo.update_cropped_audio(audio_uuid, cropped_filename, speech_segments)
+                    await chunk_repo.update_cropped_audio(audio_uuid, cropped_filename, validated_segments)
                 logger.info(f"Successfully processed cropped audio: {cropped_filename}")
                 return True
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
            # Validate input segments
            validated_segments = []
            for start_rel, end_rel in speech_segments:
                # Validate input timestamps
                if start_rel >= end_rel:
                    logger.warning(
                        f"⚠️ Invalid speech segment: start={start_rel} >= end={end_rel}, skipping"
                    )
                    continue
                # Ensure timestamps are positive (sanity check)
                if start_rel < 0:
                    logger.warning(
                        f"⚠️ Negative start timestamp: {start_rel}, clamping to 0.0"
                    )
                    start_rel = 0.0
                if end_rel < 0:
                    logger.warning(
                        f"⚠️ Negative end timestamp: {end_rel}, skipping segment"
                    )
                    continue
                validated_segments.append((start_rel, end_rel))

            logger.info(f"🕐 Processing cropping for {audio_uuid}")
            logger.info(f"🕐 Input segments (relative timestamps): {speech_segments}")
            logger.info(f"🕐 Validated segments: {validated_segments}")

            # Validate that we have valid segments
            if not validated_segments:
                logger.warning(
                    f"No valid segments for cropping {audio_uuid}"
                )
                return False

            success = await _crop_audio_with_ffmpeg(original_path, validated_segments, output_path)
            if success:
                # Update database with cropped file info
                cropped_filename = output_path.split("/")[-1]
                if chunk_repo is not None:
                    await chunk_repo.update_cropped_audio(audio_uuid, cropped_filename, validated_segments)
```
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/utils/audio_utils.py around lines
276 to 316, the code validates speech_segments into validated_segments and uses
validated_segments for cropping but then calls chunk_repo.update_cropped_audio
with the original (unvalidated) speech_segments; change the call to pass
validated_segments (ensure the same data shape expected by
update_cropped_audio), so the stored segments match what was actually cropped;
also keep the existing checks (only call update when chunk_repo is not None and
success is True).
```python
        cropping_job = default_queue.enqueue(
            process_cropping_job,
            conversation_id,
            result["file_path"],
            user_id,
            depends_on=speaker_job,
            job_timeout=300,
            result_ttl=JOB_RESULT_TTL,
            job_id=f"crop_{conversation_id[:8]}",
            description=f"Crop audio for {conversation_id[:8]}",
            meta={'audio_uuid': result["audio_uuid"], 'conversation_id': conversation_id}
        )
```
Critical: enqueue args don’t match process_cropping_job signature.
`process_cropping_job(conversation_id, audio_path, redis_client=None)` — passing `user_id` as a positional arg conflicts with the decorator-injected `redis_client`.
```diff
         cropping_job = default_queue.enqueue(
             process_cropping_job,
             conversation_id,
-            result["file_path"],
-            user_id,
+            result["file_path"],
             depends_on=speaker_job,
             job_timeout=300,
             result_ttl=JOB_RESULT_TTL,
             job_id=f"crop_{conversation_id[:8]}",
             description=f"Crop audio for {conversation_id[:8]}",
             meta={'audio_uuid': result["audio_uuid"], 'conversation_id': conversation_id}
         )
```

🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py around lines
188 to 199, the enqueue call passes user_id as a third positional argument which
conflicts with the decorator-injected redis_client in
process_cropping_job(conversation_id, audio_path, redis_client=None); remove the
positional user_id from the enqueue call so the arguments are (conversation_id,
result["file_path"], depends_on=..., job_timeout=..., ...), and if the job truly
needs user_id either (A) include it in the job meta (meta={'audio_uuid': ...,
'conversation_id': conversation_id, 'user_id': user_id}) or (B) change
process_cropping_job signature to accept user_id before redis_client (def
process_cropping_job(conversation_id, audio_path, user_id=None,
redis_client=None)) and update any callers accordingly so redis_client remains
the injected third parameter.
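The mis-binding described above can be shown without Redis at all. In this sketch the job function is a stand-in with the signature quoted in the review, and `inspect.signature(...).bind(...)` mirrors how RQ maps positional enqueue arguments onto the job's parameters; the argument values are invented:

```python
import inspect

def process_cropping_job(conversation_id, audio_path, redis_client=None):
    """Stand-in with the signature quoted in the review."""
    return conversation_id, audio_path, redis_client

# Bind arguments the way the buggy enqueue call passes them: the third
# positional value (user_id) silently fills the redis_client parameter.
bound = inspect.signature(process_cropping_job).bind(
    "conv-123", "/audio/chunks/a.wav", "user-42"
)
print(bound.arguments["redis_client"])  # "user-42" - a user id, not a client
```

Stashing `user_id` in the job `meta` instead, as the prompt suggests, keeps the job signature and the injected `redis_client` intact.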
```diff
     job = default_queue.enqueue(
         process_cropping_job,
-        client_id,
+        conversation_id,
+        audio_path,
         user_id,
-        audio_uuid,
-        original_path,
-        speech_segments,
-        output_path,
         job_timeout=timeout_mapping.get(priority, 180),
         result_ttl=JOB_RESULT_TTL,
-        job_id=f"cropping_{audio_uuid[:8]}",
-        description=f"Crop audio for {audio_uuid[:8]}",
-        meta={'audio_uuid': audio_uuid}
+        job_id=f"crop_{conversation_id[:12]}",
+        description=f"Crop audio for conversation {conversation_id[:12]}",
+        meta={'conversation_id': conversation_id}
     )
```
🛠️ Refactor suggestion | 🟠 Major
enqueue_cropping passes extra arg; align with job signature.
Keep API if you want, but don’t pass user_id through to the job.
```diff
     job = default_queue.enqueue(
         process_cropping_job,
         conversation_id,
         audio_path,
-        user_id,
         job_timeout=timeout_mapping.get(priority, 180),
         result_ttl=JOB_RESULT_TTL,
         job_id=f"crop_{conversation_id[:12]}",
         description=f"Crop audio for conversation {conversation_id[:12]}",
         meta={'conversation_id': conversation_id}
     )
```

🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py around lines
702 to 712, the enqueue call is passing user_id to process_cropping_job even
though the job signature does not accept it; remove user_id from the argument
list when calling default_queue.enqueue (keep conversation_id and audio_path
only), and if you intend to preserve an API that needs user_id for metadata,
move it into the meta dict (e.g., meta={'conversation_id': conversation_id,
'user_id': user_id}) instead of as a positional argument so the job signature
remains unchanged.
```python
        # Update speech detection job metadata with conversation_id
        if speech_job_id:
            try:
                from rq.job import Job
                from advanced_omi_backend.controllers.queue_controller import redis_conn

                speech_job = Job.fetch(speech_job_id, connection=redis_conn)
                if speech_job and speech_job.meta:
                    speech_job.meta['conversation_id'] = conversation_id
                    # Remove session_level flag - now linked to conversation
                    speech_job.meta.pop('session_level', None)
                    speech_job.save_meta()
                    logger.info(f"🔗 Updated speech job {speech_job_id[:12]} with conversation_id")
            except Exception as e:
                logger.warning(f"⚠️ Failed to update speech job metadata: {e}")
```
Narrow exception scope and keep traceback.
Catching bare Exception masks real failures. Catch specific RQ/Redis exceptions and log with traceback.
```diff
         if speech_job_id:
             try:
                 from rq.job import Job
                 from advanced_omi_backend.controllers.queue_controller import redis_conn
+                from rq.exceptions import NoSuchJobError
+                from redis.exceptions import RedisError
 @@
-            except Exception as e:
-                logger.warning(f"⚠️ Failed to update speech job metadata: {e}")
+            except (NoSuchJobError, RedisError):
+                logger.exception("⚠️ Failed to update speech job metadata")
```

🧰 Tools
🪛 Ruff (0.14.0)
84-84: Do not catch blind exception: Exception
(BLE001)
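The point about blind catches can be illustrated with plain Python. Here `NoSuchJobError` is a local stand-in for `rq.exceptions.NoSuchJobError` and the dict-based job store is invented: a narrow `except` absorbs the expected failure mode while genuine programming errors still surface.

```python
class NoSuchJobError(Exception):
    """Local stand-in for rq.exceptions.NoSuchJobError."""

def fetch_job(job_id, jobs):
    # Minimal Job.fetch analogue over an in-memory store.
    if job_id not in jobs:
        raise NoSuchJobError(job_id)
    return jobs[job_id]

def link_conversation(job_id, jobs, conversation_id):
    try:
        job = fetch_job(job_id, jobs)
        job["meta"]["conversation_id"] = conversation_id
        return True
    except NoSuchJobError:
        return False  # expected: job expired or was already cleaned up

assert link_conversation("gone", {}, "conv-1") is False  # handled gracefully
# A genuine bug (meta unexpectedly None) is NOT swallowed:
try:
    link_conversation("j1", {"j1": {"meta": None}}, "conv-1")
except TypeError:
    pass  # surfaces, rather than being logged away by a blind `except Exception`
```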
```diff
         # Enqueue post-conversation processing pipeline
         from advanced_omi_backend.controllers.queue_controller import start_post_conversation_jobs

         job_ids = start_post_conversation_jobs(
             conversation_id=conversation_id,
             audio_uuid=session_id,
             user_id=user_id,
             user_email=user_email,
             audio_file_path=file_path
         )

         logger.info(
-            f"📥 RQ: Enqueued batch processing chain: "
-            f"{job_ids['transcription']} → {job_ids['speaker_recognition']} → {job_ids['memory']}"
+            f"📥 Pipeline: transcribe({job_ids['transcription']}) → "
+            f"speaker({job_ids['speaker_recognition']}) → "
+            f"crop({job_ids['cropping']}) → "
+            f"memory({job_ids['memory']})"
         )
```
🧩 Analysis chain
Double-check: passing session_id as audio_uuid to post-conversation pipeline.
start_post_conversation_jobs(..., audio_uuid=session_id, ...) looks inconsistent with the shift to conversation-centric processing. If any downstream reprocessing still uses audio_uuid semantics, passing a session_id could mis-associate artifacts.
- Prefer passing `conversation_id` and `audio_file_path` only, or pass the conversation’s actual `audio_uuid` if still required.
- Please confirm the `start_post_conversation_jobs` signature and expectations.
To verify usages:
🏁 Script executed:
```shell
#!/bin/bash
fd -t f 'queue_controller.py' | xargs -I{} sed -n '1,250p' {}
rg -nP 'start_post_conversation_jobs\s*\(' -C3
rg -nP 'audio_uuid\s*=' -g '!**/node_modules/**'
```

Length of output: 14867
---
🏁 Script executed:
```shell
#!/bin/bash
# Find start_post_conversation_jobs function definition in queue_controller.py
rg -nP 'def start_post_conversation_jobs' -A 50 backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py
# Get context around conversation_jobs.py:288 to understand what session_id is
sed -n '270,310p' backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py
# Check what start_post_conversation_jobs does with the audio_uuid parameter
rg -nP 'audio_uuid' backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py | head -20
```

Length of output: 2967
---
🏁 Script executed:
```shell
#!/bin/bash
# Search for start_post_conversation_jobs definition anywhere in queue_controller
rg -n 'def start_post_conversation_jobs' backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py
# Get the function definition with more context (search for it and show 80 lines)
rg -nP 'def start_post_conversation_jobs' -A 80 backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py
# Get the calling function signature in conversation_jobs.py to see where session_id comes from
sed -n '200,310p' backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py | head -100
```

Length of output: 4587
🏁 Script executed:
```shell
#!/bin/bash
# Search ALL Python files for start_post_conversation_jobs definition
rg -n 'def start_post_conversation_jobs' backends/advanced/src/ --type py
# Also try searching without 'def' to see all references
rg 'start_post_conversation_jobs' backends/advanced/src --type py -n
# Get the function that contains line 288 (to see full signature and where session_id comes from)
sed -n '150,200p' backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py
```

Length of output: 2498
🏁 Script executed:
```shell
#!/bin/bash
# Search for start_post_conversation_jobs across entire repository
find backends/advanced/src -name "*.py" -exec grep -l "def start_post_conversation_jobs" {} \;
# Also try searching without 'def' to see all references
rg 'start_post_conversation_jobs' backends/advanced/src --type py -n
# Get the function signature from conversation_jobs.py that calls it (find the function header)
sed -n '1,150p' backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py | grep -n "^async def\|^def" | tail -5
```

Length of output: 647
🏁 Script executed:
```shell
#!/bin/bash
# List all function definitions in queue_controller.py
rg -nP '^(async )?def ' backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py
# Get the full signature and first 30 lines of open_conversation_job function
sed -n '18,100p' backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py
```

Length of output: 3758
🏁 Script executed:
```shell
#!/bin/bash
# Get start_batch_processing_jobs function signature and implementation
sed -n '392,450p' backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py
# Check websocket_controller to see what it passes to start_post_conversation_jobs
sed -n '740,760p' backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py
# Search for any TODO or FIXME comments about start_post_conversation_jobs
rg -n 'TODO|FIXME|start_post' backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py
```

Length of output: 3149
🏁 Script executed:
```shell
#!/bin/bash
# Check if start_post_conversation_jobs is mentioned in any comments or docs
rg -n 'start_post_conversation_jobs' backends/advanced/src backends/advanced/Docs backends/advanced/memory --type py --type md
# Get more context from websocket_controller around the call
sed -n '700,760p' backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py
# Check if there's a recent commit that renamed or removed this function
git log --oneline --all -- '*queue_controller.py' 2>/dev/null | head -5
```

Length of output: 3063
Critical: start_post_conversation_jobs function does not exist in queue_controller.py—code will fail on import.
The function being imported and called at conversation_jobs.py:286 and websocket_controller.py:744 does not exist. Both call sites will raise an ImportError at runtime.
The codebase contains start_batch_processing_jobs (line 392 of queue_controller.py) with the correct job chain logic, but the calls are trying to invoke a non-existent function. Additionally, both call sites are missing required parameters (user_id, user_email) that start_batch_processing_jobs expects.
Both call sites need either:
- Implementation of `start_post_conversation_jobs` as a wrapper around `start_batch_processing_jobs`, or
- Refactoring to call `start_batch_processing_jobs` directly with all required parameters (`user_id` and `user_email` are available in scope at both locations)
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py
around lines 285–299, the code imports and calls a non-existent
start_post_conversation_jobs; replace that import and call with the existing
start_batch_processing_jobs from
advanced_omi_backend.controllers.queue_controller and pass the required user_id
and user_email (available in scope) along with conversation_id,
audio_uuid=session_id, and audio_file_path=file_path, then update the job_ids
reference usage accordingly; alternatively, if you prefer a thin compatibility
layer, add a start_post_conversation_jobs wrapper in queue_controller.py that
forwards all required parameters to start_batch_processing_jobs and update
imports to use that wrapper.
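If the thin-wrapper option is chosen, it could look like the sketch below. The `start_batch_processing_jobs` body here is a stub for illustration only; its real parameter list and return shape must be checked against queue_controller.py before adopting this.

```python
# Stub standing in for the existing helper; the parameter names come from
# the review findings, the body is invented for illustration.
def start_batch_processing_jobs(conversation_id, audio_uuid, user_id,
                                user_email, audio_file_path):
    prefix = conversation_id[:8]
    return {
        "transcription": f"transcribe_{prefix}",
        "speaker_recognition": f"speaker_{prefix}",
        "cropping": f"crop_{prefix}",
        "memory": f"memory_{prefix}",
    }

def start_post_conversation_jobs(conversation_id, audio_uuid, user_id,
                                 user_email, audio_file_path):
    """Compatibility wrapper so both existing call sites resolve the import."""
    return start_batch_processing_jobs(
        conversation_id=conversation_id,
        audio_uuid=audio_uuid,
        user_id=user_id,
        user_email=user_email,
        audio_file_path=audio_file_path,
    )

job_ids = start_post_conversation_jobs(
    "conv-abcdef1234", "uuid-1", "user-1", "user@example.com", "/tmp/a.wav"
)
print(job_ids["memory"])  # memory_conv-abc
```

The wrapper keeps both broken call sites working without touching them, at the cost of one extra indirection; folding the calls into `start_batch_processing_jobs` directly is the cleaner long-term fix.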
```diff
 async def process_memory_job(
-    client_id: str,
-    user_id: str,
-    user_email: str,
     conversation_id: str,
     redis_client=None
```
Critical: Function signature mismatch with enqueue function.
The function signature only accepts conversation_id and redis_client, but the enqueue_memory_processing function (lines 209-214) still passes client_id, user_id, and user_email as separate arguments. This will cause runtime errors when the job is executed.
Update the enqueue function to match:
```diff
 def enqueue_memory_processing(
-    client_id: str,
-    user_id: str,
-    user_email: str,
     conversation_id: str,
     priority: JobPriority = JobPriority.NORMAL
 ):
     """
     Enqueue a memory processing job.
     Returns RQ Job object for tracking.
     """
     timeout_mapping = {
         JobPriority.URGENT: 3600,  # 60 minutes
         JobPriority.HIGH: 2400,    # 40 minutes
         JobPriority.NORMAL: 1800,  # 30 minutes
         JobPriority.LOW: 900       # 15 minutes
     }
     job = memory_queue.enqueue(
         process_memory_job,
-        client_id,
-        user_id,
-        user_email,
         conversation_id,
         job_timeout=timeout_mapping.get(priority, 1800),
         result_ttl=JOB_RESULT_TTL,
         job_id=f"memory_{conversation_id[:8]}",
         description=f"Process memory for conversation {conversation_id[:8]}"
     )
     logger.info(f"📥 RQ: Enqueued memory job {job.id} for conversation {conversation_id}")
     return job
```

Committable suggestion skipped: line range outside the PR's diff.
🧰 Tools
🪛 Ruff (0.14.0)
24-24: Unused function argument: redis_client
(ARG001)
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py around
lines 22 to 24, the async function process_memory_job currently only accepts
(conversation_id: str, redis_client=None) but enqueue_memory_processing passes
client_id, user_id, user_email (plus conversation_id/redis_client); update the
function signature to accept the same parameters the enqueue call supplies
(e.g., async def process_memory_job(client_id: str, user_id: str, user_email:
str, conversation_id: str, redis_client=None):) and then adjust the function
body to use the new parameters where needed so the runtime argument order
matches the enqueue invocation. Ensure redis_client remains optional with a
default of None.
```diff
@@ -0,0 +1,54 @@
+#!/usr/bin/env python3
```
Make the script executable.
The shebang is present but the file is not executable. If this script is intended to be run directly, add execute permissions.
Apply this fix:
```shell
chmod +x backends/advanced/src/advanced_omi_backend/workers/rq_worker_entry.py
```

If the bit should be tracked by git regardless of the local filesystem, `git update-index --chmod=+x` on the same path records it in the index as well.

🧰 Tools
🪛 Ruff (0.14.0)
1-1: Shebang is present but file is not executable
(EXE001)
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/workers/rq_worker_entry.py around
line 1 the shebang is present but the file lacks execute permissions; make the
script executable by updating its file mode (e.g., run chmod +x on that file in
the repository or set equivalent executable bit in your commit) so it can be
invoked directly.
```typescript
    // If the string doesn't include timezone info, append 'Z' to treat as UTC
    const isoString = timestamp.endsWith('Z') || timestamp.includes('+') || timestamp.includes('T') && timestamp.split('T')[1].includes('-')
      ? timestamp
      : timestamp + 'Z'
    return new Date(isoString).toLocaleString()
  }
```
Fix timestamp normalization to avoid invalid dates (space vs 'T', missing TZ).
Current logic can yield invalid strings like "YYYY-MM-DD HH:mm:ssZ" and may mis-detect timezones. Normalize space to 'T' first and append 'Z' only when no TZ is present.
Apply:
```diff
-    // If the string doesn't include timezone info, append 'Z' to treat as UTC
-    const isoString = timestamp.endsWith('Z') || timestamp.includes('+') || timestamp.includes('T') && timestamp.split('T')[1].includes('-')
-      ? timestamp
-      : timestamp + 'Z'
-    return new Date(isoString).toLocaleString()
+    // Normalize "YYYY-MM-DD HH:mm:ss" -> "YYYY-MM-DDTHH:mm:ss" and add 'Z' if no timezone
+    const normalized = timestamp.includes('T') ? timestamp : timestamp.replace(' ', 'T')
+    const isoString = /(?:Z|[+-]\d{2}:\d{2})$/.test(normalized) ? normalized : `${normalized}Z`
+    return new Date(isoString).toLocaleString()
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```typescript
    // Normalize "YYYY-MM-DD HH:mm:ss" -> "YYYY-MM-DDTHH:mm:ss" and add 'Z' if no timezone
    const normalized = timestamp.includes('T') ? timestamp : timestamp.replace(' ', 'T')
    const isoString = /(?:Z|[+-]\d{2}:\d{2})$/.test(normalized) ? normalized : `${normalized}Z`
    return new Date(isoString).toLocaleString()
```
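To pin down which inputs each branch handles, the same normalization logic can be mirrored in Python; this mirror is only a checking aid, not project code, and the regex is the one from the suggested fix:

```python
import re

def normalize_timestamp(ts: str) -> str:
    # "YYYY-MM-DD HH:mm:ss" -> "YYYY-MM-DDTHH:mm:ss"
    normalized = ts if "T" in ts else ts.replace(" ", "T", 1)
    # Append 'Z' only when no timezone designator is already present.
    if re.search(r"(?:Z|[+-]\d{2}:\d{2})$", normalized):
        return normalized
    return normalized + "Z"

print(normalize_timestamp("2024-01-01 12:00:00"))        # 2024-01-01T12:00:00Z
print(normalize_timestamp("2024-01-01T12:00:00-05:00"))  # unchanged: has offset
print(normalize_timestamp("2024-01-01T12:00:00Z"))       # unchanged: already UTC
```

The space-separated form is the case that previously produced an invalid string like "2024-01-01 12:00:00Z".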
…lite into close-convo

# Conflicts:
#	backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py
#	backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py
#	backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py
#	backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py
#	backends/advanced/webui/src/pages/Queue.tsx
Also, I haven't sorted the tests yet as I am a bad person, but it's next on my list now that we have close conversation.
Summary by CodeRabbit
New Features
Bug Fixes
Refactor
Style