
Implemented close conversation#127

Closed
thestumonkey wants to merge 433 commits into SimpleOpenSoftware:main from thestumonkey:close-convo

Conversation

@thestumonkey (Contributor) commented Oct 19, 2025

Also:

  • Beefed up the queue management page a heapload, so it should be great for debugging.
  • Can set RECORD_ONLY_ENROLLED_SPEAKERS=true to only start recording if the speaker is recognised.
  • Runs audio cropping.
  • Deprecated audio chunks in favour of Redis streams and a persistence audio job.
  • The persistence audio job now runs as a background job and writes files to the conversations when they end.

I haven't sorted the tests yet as I am a bad person, but that's next on my list now that we have close conversation.

Summary by CodeRabbit

  • New Features

    • Added job cancellation endpoint to cancel queued or running jobs.
    • Added consolidated dashboard endpoint for real-time job and session monitoring.
    • Introduced conversation status tracking (ACTIVE, COMPLETED, FAILED).
  • Bug Fixes

    • Improved audio file cleanup and session management on disconnect.
    • Enhanced error handling for missing conversations and invalid audio paths.
  • Refactor

    • Simplified internal conversation lifecycle management.
    • Restructured API calls for improved dashboard data retrieval.
  • Style

    • Reduced logging verbosity for routine operations.

AnkushMalaker and others added 30 commits July 16, 2025 01:48
update memory service
* auth WIP

* make auth optional

* minor fix

* fix minor bug

* Added a bunch of changes here

* update frontend

* rehauled and cleaned up auth

* updated auth to remove custom user-id (#24)

* updated auth to remove custom user-id

* stale data protection

* update

* minor fix

* update wip architecture

* re-rehaul?

* update auth in laptop client

* push missing file

* add existing data readme

* update env template

* memory debug

* Refactor main into different files
make advanced backend a package
Move webui
Refactor and fix the rest of the code
Reduce duplication
Update pyproject.toml to use optional dependencies instead of groups
Update Docs

* cleanup

* fix client bug, required

* test fact extraction prompt improvement

* update quickstart

* update info on .env.template

* remove failure recovery deadcode

* remove more deadcode
update memory service
* cleanup and merge #22's changes
Remove action items service

* Removing action items

* who even says industry standard
Co-authored-by: Neotastisch <54811660+Neotastisch@users.noreply.github.com>
* rename folder

* update readme etc

---------

Co-authored-by: Neotastisch <54811660+Neotastisch@users.noreply.github.com>
github actions for automatic deployment
stuartalexander-nexus and others added 21 commits October 2, 2025 09:54
refactored to use a service model for transcription, and got it working with rq
Decluttered main, moved all processors to RQ. Streaming and batch mode enabled on audio recording
streaming segments should now be able to identify the speaker.  Removed a bunch of dead code
Tidied up dead code, added timeline to job page so we can see time taken, got speaker recognition working on conversation trigger
- Added `start.sh` and `start-workers.sh` to `.dockerignore`.
- Removed unused `COMPUTE_MODE` arguments from `docker-compose.yml` and `Dockerfile`.
- Updated `pyproject.toml` to require `pandas>=2.0.0` and added new dependencies: `faiss-cpu`, `torch`, and `torchaudio`.
- Modified `download-pyannote.py` to use `token` instead of `use_auth_token` for model downloads.
- Refactored imports and logging in `download-pyannote.py` and `websocket_wrapper.py` for better clarity.
Update Docker configuration and speaker recognition dependencies
…eet. Update Docker configurations to accept CUDA version as a build argument in speaker recognition and ASR services. Enhance dependency management in pyproject.toml for speaker recognition and ASR services with optional CUDA versions.
feat: Remove unused Dockerfiles and pyproject files for blackwell gpus
Also beefed up the queue management page

coderabbitai bot commented Oct 19, 2025

Important

Review skipped

Auto reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

This PR refactors the backend architecture toward a conversation-centric, job-queue-based processing model. It removes the ConversationManager singleton, simplifies client state closure, migrates transcription and audio processing into RQ-driven job chains, enriches conversation models with audio paths and status enums, introduces a consolidated dashboard API endpoint, downgrades logging verbosity, and aligns frontend conversation lookup from audio_uuid to conversation_id.
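The job-chain pattern the walkthrough describes (a conversation job that, on completion, enqueues cropping and memory jobs) can be illustrated with a dependency-free simulation. This is not the real RQ code, only a sketch of the enqueue-on-completion flow; the job names below are stand-ins for the actual jobs:

```python
from collections import deque

def run_chain(jobs, context):
    """Run jobs in order against a shared context dict; any job may return
    follow-up jobs to enqueue, mimicking RQ's enqueue-on-completion pattern."""
    queue = deque(jobs)
    completed = []
    while queue:
        job = queue.popleft()
        followups = job(context) or []
        completed.append(job.__name__)
        queue.extend(followups)
    return completed

def crop_audio(ctx):
    # Stand-in for process_cropping_job: derives a cropped file path.
    ctx["cropped_audio_path"] = ctx["audio_path"] + ".cropped.wav"

def extract_memories(ctx):
    # Stand-in for process_memory_job.
    ctx["memories"] = ["..."]

def open_conversation(ctx):
    ctx["status"] = "ACTIVE"
    # On close, the real backend enqueues the post-conversation jobs.
    return [crop_audio, extract_memories]
```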

Changes

Cohort / File(s) Change Summary
Model Updates
backends/advanced/src/advanced_omi_backend/models/conversation.py
Added ConversationStatus nested enum with ACTIVE, COMPLETED, FAILED; introduced audio_path and cropped_audio_path optional fields to Conversation and MemoryVersion models.
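The model change above can be sketched with stdlib types. The real models are Beanie documents; this dataclass is only an illustration of the enum and the two new optional path fields:

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional

class ConversationStatus(str, Enum):
    # Mirrors the statuses named in the review summary.
    ACTIVE = "ACTIVE"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"

@dataclass
class Conversation:
    """Simplified stand-in for the Beanie Conversation document."""
    conversation_id: str
    status: ConversationStatus = ConversationStatus.ACTIVE
    audio_path: Optional[str] = None
    cropped_audio_path: Optional[str] = None
```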
Client & State Management
backends/advanced/src/advanced_omi_backend/client.py
Removed ConversationManager dependency from close_current_conversation; simplified to unconditional cleanup of speech-related state without external manager orchestration.
ConversationManager Removal
backends/advanced/src/advanced_omi_backend/conversation_manager.py
Deleted entire module including ConversationManager class, get_conversation_manager() function, and singleton _conversation_manager; conversation lifecycle now handled by job queue and WebSocket controllers.
WebSocket Controller & Job Orchestration
backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py
Enhanced cleanup_client_state with Redis session handling and RQ job cancellation; replaced start_batch_processing_jobs with start_post_conversation_jobs; adjusted import paths for process_audio_chunk and write_audio_file to utils.audio_utils.
Job Queue Infrastructure
backends/advanced/src/advanced_omi_backend/workers/rq_worker_entry.py
New RQ worker entry point script with logging configuration, queue name parsing, and worker lifecycle management.
Transcription Job Refactoring
backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py
Removed user_id parameter; added upfront conversation validation; enriched job metadata with conversation details; introduced streaming speech detection flow that escalates to open_conversation_job on speech detection with speaker filtering support.
Conversation Job Enhancement
backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py
Added speech_job_id parameter; introduced inactivity timeout tracking, speech analysis, and per-conversation file rotation; enqueues start_post_conversation_jobs on completion; increments conversation count and returns timeout flag in payload.
Audio Job Refactoring
backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py
Introduced conversation-aware process_cropping_job; reworked audio_streaming_persistence_job for per-conversation WAV file handling; updated enqueue_cropping signature to use conversation_id and audio_path; enhanced job metadata and result payloads.
Memory Job Simplification
backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py
Removed client_id, user_id, user_email parameters; now fetches values from conversation model; added primary speakers filtering; enriched job metadata with memory details for UI display.
Audio Utilities
backends/advanced/src/advanced_omi_backend/utils/audio_utils.py
Reworked _process_audio_cropping_with_relative_timestamps to validate input segments already in relative format; enforces positivity constraints and skips invalid pairs.
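The validation described above (segments already relative, positivity enforced, invalid pairs skipped) can be sketched as a standalone filter; the function name here is hypothetical:

```python
def filter_valid_segments(segments):
    """Keep only (start, end) pairs that are valid relative timestamps:
    non-negative start, end strictly after start. Invalid pairs are skipped
    rather than raising, mirroring the 'skips invalid pairs' behaviour."""
    valid = []
    for start, end in segments:
        if start < 0 or end <= start:
            continue
        valid.append((start, end))
    return valid
```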
Conversation Utilities
backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py
Added is_meaningful_speech() helper function wrapping analyze_speech for speech detection and timeout logic.
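A minimal sketch of what an `is_meaningful_speech()` style check might look like. The signature and thresholds below are illustrative assumptions; the real helper wraps `analyze_speech` and its own timeout logic:

```python
def is_meaningful_speech(transcript: str, speech_seconds: float,
                         min_words: int = 3, min_seconds: float = 1.0) -> bool:
    """Hypothetical meaningful-speech heuristic: require both a minimum
    word count and a minimum amount of detected speech audio. The default
    thresholds are illustrative, not the project's actual values."""
    return len(transcript.split()) >= min_words and speech_seconds >= min_seconds
```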
Logging Verbosity Reduction
backends/advanced/src/advanced_omi_backend/services/audio_stream/aggregator.py, consumer.py, producer.py; services/transcription/deepgram.py
Downgraded multiple info-level log calls to debug across audio streaming and transcription modules.
Import Path Corrections
backends/advanced/src/advanced_omi_backend/speaker_recognition_client.py
Fixed import path for write_pcm_to_wav from audio_utils to utils.audio_utils.
Queue Router Expansion
backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py
Added cancel_job() DELETE endpoint; added get_dashboard_data() consolidating aggregated job/queue/session stats in single call; expanded queue iteration to include "audio" queue.
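The cancellation endpoint's decision logic (cancel queued jobs, stop running ones, delete finished ones) can be sketched without FastAPI or RQ. This loosely follows RQ semantics and is only an illustration, not the endpoint's actual code:

```python
def cancel_job(job_status: str) -> str:
    """Illustrative cancellation decision, loosely following RQ semantics:
    jobs that have not started are cancelled outright, a started job gets a
    stop request, and already-terminal jobs are simply deleted."""
    if job_status in ("queued", "scheduled", "deferred"):
        return "cancelled"
    if job_status == "started":
        return "stop_requested"
    # finished, failed, canceled, etc.
    return "deleted"
```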
Frontend Conversation Lookup
backends/advanced/webui/src/pages/Conversations.tsx
Refactored transcript expansion and fetching to use conversation_id instead of audio_uuid; updated formatDate to append 'Z' for timezone normalization.
API Client Consolidation
backends/advanced/webui/src/services/api.ts
Replaced individual queue endpoints with consolidated getDashboard() call; preserved legacy cleanup endpoints (cleanupStuckWorkers, cleanupOldSessions).

Sequence Diagram(s)

sequenceDiagram
    participant WS as WebSocket<br/>Controller
    participant RQ as RQ Job Queue
    participant Redis as Redis
    participant Conv as Conversation<br/>Model
    participant Memory as Memory<br/>Service

    WS->>RQ: stream_speech_detection_job
    activate RQ
    
    RQ->>Redis: Monitor transcription results
    RQ->>Redis: Analyze meaningful speech
    
    alt Speech Detected
        RQ->>RQ: enqueue open_conversation_job
        RQ->>Conv: Create conversation
        Conv->>Redis: Signal file rotation
        RQ->>RQ: enqueue process_cropping_job
        RQ->>RQ: enqueue process_memory_job
        RQ->>RQ: enqueue start_post_conversation_jobs
    else No Speech
        RQ->>RQ: Continue monitoring
    end
    
    deactivate RQ
    
    RQ->>Conv: Store conversation_id
    RQ->>Memory: Extract memories
    Memory->>Conv: Append memory references
sequenceDiagram
    participant Client as Client
    participant WS as WebSocket
    participant RQ as RQ
    participant Redis as Redis
    participant Conv as Conversation
    
    Client->>WS: Disconnect
    activate WS
    WS->>RQ: Cancel speech_detection job
    WS->>Redis: Fetch audio:session:* entries
    WS->>Redis: Mark sessions "complete"
    WS->>Redis: Delete audio:stream
    WS->>Conv: (No longer via ConversationManager)
    deactivate WS

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

The changes span multiple interrelated job workflows with new orchestration patterns (post-conversation job chains), data model modifications (ConversationStatus enum, audio paths), removal of an entire module (ConversationManager), and significant refactoring across job signature changes (parameter removal/addition) and control flow enhancements (inactivity timeouts, metadata enrichment, streaming speech detection escalation). Heterogeneous changes across 20+ files with mixed logic density require nuanced reasoning for each job workflow integration point.

Possibly related PRs

  • Queue combo #121: Directly related—both migrate conversation/audio models to Beanie, introduce Conversation/AudioSession models, and move processing to RQ-based jobs with identical module and workflow changes.
  • Reprocessing WIP #96: Directly related—both implement conversation-centric, versioned transcript/memory reprocessing features including conversations collection, transcript_versions/memory_versions, and reprocess endpoints.
  • Better audio processing #111: Directly related—both modify audio processing utilities, specifically process_audio_chunk and _process_audio_cropping_with_relative_timestamps, with shared import path migrations.

Poem

🐰 The ConversationManager hops away,
As Redis streams dance and RQ jobs play,
Conversations bloom in the queue's design,
Cropping and memories in perfect line—
hop hop the backend now springs with such grace! 🌿✨

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)

  • Description Check: ✅ Passed. Check skipped: CodeRabbit's high-level summary is enabled.
  • Title Check: ✅ Passed. The title "Implemented close conversation" accurately refers to a real and significant aspect of the changeset. The PR does implement conversation closure functionality, evidenced by the removal of the ConversationManager module, simplification of the close_current_conversation method in client.py, and the addition of a post-conversation processing pipeline in conversation_jobs.py. While the PR scope is broader and includes architectural refactoring of job workflows, audio persistence, cropping jobs, and frontend updates, the title captures a meaningful and verifiable part of the changes. The title is clear and specific enough that a teammate scanning the history would understand that conversation closure was implemented, even if it doesn't fully encompass all the supporting infrastructure changes.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 95.00%, which is sufficient. The required threshold is 80.00%.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@AnkushMalaker (Collaborator) commented:

GOAT

@AnkushMalaker (Collaborator) commented:

@coderabbitai review

@coderabbitai bot (Contributor) commented Oct 20, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.


@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 8

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (7)
backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py (4)

166-173: Avoid AttributeError: guard access to job.is_scheduled.

Older RQ versions may not expose is_scheduled; this would crash status mapping.

-            elif job.is_scheduled:
+            elif getattr(job, "is_scheduled", False):
                 return "waiting"

331-337: Fix potential infinite SCAN loop (cursor truthiness with b"0").

Using bytes '0' in while cursor causes endless iteration when there are < limit keys.

-        stream_keys = []
-        cursor = b"0"
-        while cursor and len(stream_keys) < limit:
-            cursor, keys = await audio_service.redis.scan(
-                cursor, match=f"{audio_service.audio_stream_prefix}*", count=limit
-            )
+        stream_keys: list = []
+        cursor: int = 0
+        while True:
+            cursor, keys = await audio_service.redis.scan(
+                cursor=cursor, match=f"{audio_service.audio_stream_prefix}*", count=limit
+            )
+            if not keys and cursor == 0:
+                break
             stream_keys.extend(keys[:limit - len(stream_keys)])
+            if cursor == 0 or len(stream_keys) >= limit:
+                break

603-609: Same SCAN loop issue for sessions listing.

-        session_keys = []
-        cursor = b"0"
-        while cursor and len(session_keys) < limit:
-            cursor, keys = await redis_client.scan(
-                cursor, match="audio:session:*", count=limit
-            )
+        session_keys: list = []
+        cursor: int = 0
+        while True:
+            cursor, keys = await redis_client.scan(
+                cursor=cursor, match="audio:session:*", count=limit
+            )
+            if not keys and cursor == 0:
+                break
             session_keys.extend(keys[:limit - len(session_keys)])
+            if cursor == 0 or len(session_keys) >= limit:
+                break

672-675: And same SCAN loop issue in session cleanup.

-        session_keys = []
-        cursor = b"0"
-        while cursor:
-            cursor, keys = await redis_client.scan(cursor, match="audio:session:*", count=100)
+        session_keys: list = []
+        cursor: int = 0
+        while True:
+            cursor, keys = await redis_client.scan(cursor=cursor, match="audio:session:*", count=100)
+            session_keys.extend(keys)
+            if cursor == 0:
+                break
-            session_keys.extend(keys)
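The SCAN bug flagged in the three comments above comes from cursor truthiness: some Redis clients return the terminal cursor as bytes `b"0"`, which is truthy, so `while cursor:` never exits. A dependency-free sketch of a loop that terminates correctly by comparing the cursor numerically (`scan_all` and `fake_scan` are illustrative stand-ins, not redis-py code):

```python
def scan_all(scan, match=None):
    """Correct cursor-based SCAN loop: call until the server returns cursor 0,
    checking numerically rather than by truthiness. `scan` stands in for a
    redis client's scan(cursor, match=...) returning (cursor, keys)."""
    keys, cursor = [], 0
    while True:
        cursor, batch = scan(cursor, match=match)
        keys.extend(batch)
        # b"0" is truthy, so a `while cursor:` loop would spin forever here.
        if int(cursor) == 0:
            break
    return keys

def fake_scan(cursor, match=None):
    """Single-page fake that, like some clients, returns a bytes cursor."""
    return b"0", [b"audio:session:1", b"audio:session:2"]
```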
backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py (3)

153-166: Critical: enqueue args don’t match updated transcribe_full_audio_job signature.

user_id is passed as a positional argument after version_id, pushing "upload" into the redis_client slot and causing a runtime TypeError.

                 transcript_job = transcription_queue.enqueue(
                     transcribe_full_audio_job,
                     conversation_id,
                     result["audio_uuid"],
                     result["file_path"],
                     result["version_id"],
-                    user_id,
-                    "upload",
+                    "upload",
                     job_timeout=600,
                     result_ttl=JOB_RESULT_TTL,
                     job_id=f"upload_{conversation_id[:8]}",
                     description=f"Transcribe audio for {conversation_id[:8]}",
                     meta={'audio_uuid': result["audio_uuid"], 'conversation_id': conversation_id}
                 )

170-184: Critical: enqueue args don’t match updated recognise_speakers_job signature.

Extra user_id positional arg misaligns parameters and the trailing list becomes the injected redis_client, leading to runtime errors.

                 speaker_job = transcription_queue.enqueue(
                     recognise_speakers_job,
                     conversation_id,
                     result["version_id"],
                     result["file_path"],
-                    user_id,
-                    "",  # transcript_text - will be read from DB
-                    [],  # words - will be read from DB
+                    "",  # transcript_text - will be read from DB
+                    [],  # words - will be read from DB
                     depends_on=transcript_job,
                     job_timeout=600,
                     result_ttl=JOB_RESULT_TTL,
                     job_id=f"speaker_{conversation_id[:8]}",
                     description=f"Recognize speakers for {conversation_id[:8]}",
                     meta={'audio_uuid': result["audio_uuid"], 'conversation_id': conversation_id}
                 )

585-588: Close file_sink when stopping after END signal empties.

Breaks out without closing the WAV; leaks file handle.

                     if consecutive_empty_reads >= max_empty_reads:
-                        logger.info(f"✅ Stream empty after END signal - stopping audio collection")
-                        break
+                        logger.info("✅ Stream empty after END signal - stopping audio collection")
+                        if file_sink:
+                            await file_sink.close()
+                            logger.info(f"✅ Closed final file on END: {wav_filename}")
+                        break
🧹 Nitpick comments (16)
backends/advanced/src/advanced_omi_backend/services/transcription/deepgram.py (1)

131-133: Good change to reduce routine logging verbosity.

Downgrading basic transcription success to debug level is appropriate for this routine operation. Note that diarized transcription success (line 98) remains at info level, which may be intentional given the higher complexity of diarization operations.

If consistent logging across transcription modes is desired, consider aligning the success logging level for both diarized and basic transcription:

                            transcript = alternative.get("transcript", "").strip()
-                            logger.debug(
+                            logger.info(
                                f"Deepgram basic transcription successful: {len(transcript)} characters"
                            )

Or alternatively, downgrade the diarized success logging to match:

                            transcript = alternative["paragraphs"]["transcript"].strip()
-                            logger.info(
+                            logger.debug(
                                f"Deepgram diarized transcription successful: {len(transcript)} characters"
                            )
backends/advanced/webui/src/pages/Conversations.tsx (1)

394-396: Prefer conversation_id for React list key (fallback to audio_uuid).

Reduces key collisions if multiple conversations ever share an audio_uuid.

-              key={conversation.audio_uuid}
+              key={conversation.conversation_id || conversation.audio_uuid}
backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py (4)

98-136: Improve error logging and exception chaining in cancel_job.

Use logger.exception for stack traces and chain the HTTPException.

-    except Exception as e:
-        logger.error(f"Failed to cancel/delete job {job_id}: {e}")
-        raise HTTPException(status_code=404, detail=f"Job not found or could not be cancelled: {str(e)}")
+    except Exception as e:
+        logger.exception("Failed to cancel/delete job %s", job_id)
+        raise HTTPException(
+            status_code=404,
+            detail=f"Job not found or could not be cancelled: {e!s}",
+        ) from e

815-817: Remove unused variable conversation_ids.

Not used in fetch_session_jobs; keep code minimal.

-                conversations = await Conversation.find(Conversation.audio_uuid == session_id).to_list()
-                conversation_ids = {conv.conversation_id for conv in conversations}
+                # Intentionally not resolving conversation_ids here; session match uses job.meta/args
+                _ = await Conversation.find(Conversation.audio_uuid == session_id).to_list()

944-945: Prefer wall-clock timestamp for API consumers.

Event loop time is monotonic and not meaningful to clients.

-            "timestamp": asyncio.get_event_loop().time()
+            "timestamp": __import__("time").time()

947-949: Use logger.exception and chain error in dashboard handler.

Improves diagnosability.

-        logger.error(f"Failed to get dashboard data: {e}", exc_info=True)
-        raise HTTPException(status_code=500, detail=f"Failed to get dashboard data: {str(e)}")
+        logger.exception("Failed to get dashboard data")
+        raise HTTPException(status_code=500, detail=f"Failed to get dashboard data: {e!s}") from e
backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py (1)

63-115: Optimize: Duplicate user lookup.

The user is fetched via get_user_by_id at line 63 to get the email, then fetched again at line 98 for primary speakers filtering. Store the user object from the first call and reuse it.

Apply this optimization:

-    user = await get_user_by_id(user_id)
+    user = await get_user_by_id(user_id)  # Fetch once and reuse
     if user:
         user_email = user.email
     else:
         logger.warning(f"Could not find user {user_id}")
         user_email = ""

     logger.info(f"🔄 Processing memory for conversation {conversation_id}, client={client_id}, user={user_id}")

     # ... existing code ...

     # Check primary speakers filter
-    user = await get_user_by_id(user_id)
+    # Reuse user object from above
     if user and user.primary_speakers:
backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py (4)

23-23: Type hint: make Optional explicit (RUF013).

Use str | None (PEP 604) for an optional param.

-    speech_job_id: str = None,
+    speech_job_id: str | None = None,

258-263: 5s wait logging condition never/rarely triggers.

elapsed % 5 == 0 with floats almost never hits. Log every new 5th second deterministically.

-        elapsed = time.time() - wait_start
-        if elapsed % 5 == 0:  # Log every 5 seconds
-            logger.info(f"⏳ Waiting for audio file (conversation {conversation_id[:12]})... ({elapsed:.0f}s elapsed)")
+        elapsed_sec = int(time.time() - wait_start)
+        # Log every 5 seconds without flooding
+        if 'last_audio_wait_log_sec' not in locals():
+            last_audio_wait_log_sec = -1
+        if elapsed_sec % 5 == 0 and elapsed_sec != last_audio_wait_log_sec:
+            logger.info(f"⏳ Waiting for audio file (conversation {conversation_id[:12]})... ({elapsed_sec}s elapsed)")
+            last_audio_wait_log_sec = elapsed_sec

266-267: Remove extraneous f-string prefix (F541).

No placeholders here.

-        logger.warning(f"⚠️ Audio persistence job may not have rotated file yet - cannot enqueue batch transcription")
+        logger.warning("⚠️ Audio persistence job may not have rotated file yet - cannot enqueue batch transcription")

169-195: Reduce Redis meta churn (optional).

Saving job meta every loop second can be noisy. Consider updating meta only when values change or at a 2–5s cadence.

backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py (1)

578-584: Remove unnecessary f-string prefixes (F541).

These logs have no placeholders.

-            logger.info(f"🛑 Session ended, exiting")
+            logger.info("🛑 Session ended, exiting")
@@
-            logger.warning(f"⏱️ Max runtime reached, exiting")
+            logger.warning("⏱️ Max runtime reached, exiting")
@@
-        logger.info(f"💬 Meaningful speech detected!")
+        logger.info("💬 Meaningful speech detected!")
@@
-            logger.info(f"🎤 Checking for enrolled speakers...")
+            logger.info("🎤 Checking for enrolled speakers...")
@@
-                logger.info(f"⏭️ No enrolled speakers, continuing to listen...")
+                logger.info("⏭️ No enrolled speakers, continuing to listen...")
@@
-    logger.info(f"✅ Session ended without speech")
+    logger.info("✅ Session ended without speech")

Also applies to: 605-610, 623-623, 692-692

backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py (4)

547-549: Rename unused loop variable to underscore (B007).

stream_name isn’t used.

-                for stream_name, msgs in audio_messages:
+                for _stream_name, msgs in audio_messages:

343-344: Prefer logging.exception for failures (TRY400).

Preserves traceback automatically.

-        logger.error(f"❌ RQ: Audio cropping failed for conversation {conversation_id}: {e}")
+        logger.exception(f"❌ RQ: Audio cropping failed for conversation {conversation_id}")

559-576: Nit: remove f-prefix where no placeholders (F541) and minor log polish.

Optional cleanup.

-                            logger.info(f"📦 Session {session_id[:12]}: {total_chunk_count} total chunks "
-                                        f"(conversation {current_conversation_id[:12]}: {conversation_chunk_count} chunks)")
+                            logger.info(
+                                f"📦 Session {session_id[:12]}: {total_chunk_count} total chunks "
+                                f"(conversation {current_conversation_id[:12]}: {conversation_chunk_count} chunks)"
+                            )

596-609: Log strings: a couple of f-prefixes can be dropped (F541).

The two lines have placeholders; others don’t—adjust accordingly if any plain strings remain.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d798fef and 97b5370.

📒 Files selected for processing (19)
  • backends/advanced/src/advanced_omi_backend/client.py (1 hunks)
  • backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py (4 hunks)
  • backends/advanced/src/advanced_omi_backend/conversation_manager.py (0 hunks)
  • backends/advanced/src/advanced_omi_backend/models/conversation.py (2 hunks)
  • backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py (8 hunks)
  • backends/advanced/src/advanced_omi_backend/services/audio_stream/aggregator.py (2 hunks)
  • backends/advanced/src/advanced_omi_backend/services/audio_stream/consumer.py (1 hunks)
  • backends/advanced/src/advanced_omi_backend/services/audio_stream/producer.py (1 hunks)
  • backends/advanced/src/advanced_omi_backend/services/transcription/deepgram.py (3 hunks)
  • backends/advanced/src/advanced_omi_backend/speaker_recognition_client.py (1 hunks)
  • backends/advanced/src/advanced_omi_backend/utils/audio_utils.py (1 hunks)
  • backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py (1 hunks)
  • backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py (9 hunks)
  • backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py (8 hunks)
  • backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py (4 hunks)
  • backends/advanced/src/advanced_omi_backend/workers/rq_worker_entry.py (1 hunks)
  • backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py (6 hunks)
  • backends/advanced/webui/src/pages/Conversations.tsx (6 hunks)
  • backends/advanced/webui/src/services/api.ts (1 hunks)
💤 Files with no reviewable changes (1)
  • backends/advanced/src/advanced_omi_backend/conversation_manager.py
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: CR
PR: AnkushMalaker/friend-lite#0
File: CLAUDE.md:0-0
Timestamp: 2025-09-18T01:42:08.562Z
Learning: Applies to backends/advanced/src/**/*.py : Use conversation_id (not audio_uuid) for all reprocessing operations and version management
📚 Learning: 2025-09-18T01:42:08.562Z
Learnt from: CR
PR: AnkushMalaker/friend-lite#0
File: CLAUDE.md:0-0
Timestamp: 2025-09-18T01:42:08.562Z
Learning: Applies to backends/advanced/src/**/*.py : Use conversation_id (not audio_uuid) for all reprocessing operations and version management

Applied to files:

  • backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py
  • backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py
  • backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py
  • backends/advanced/webui/src/pages/Conversations.tsx
  • backends/advanced/src/advanced_omi_backend/client.py
  • backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py
  • backends/advanced/src/advanced_omi_backend/models/conversation.py
  • backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py
🪛 Ruff (0.14.0)
backends/advanced/src/advanced_omi_backend/workers/rq_worker_entry.py

1-1: Shebang is present but file is not executable

(EXE001)

backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py

159-159: Avoid specifying long messages outside the exception class

(TRY003)


176-176: Avoid specifying long messages outside the exception class

(TRY003)


578-578: f-string without any placeholders

Remove extraneous f prefix

(F541)


582-582: f-string without any placeholders

Remove extraneous f prefix

(F541)


605-605: f-string without any placeholders

Remove extraneous f prefix

(F541)


610-610: f-string without any placeholders

Remove extraneous f prefix

(F541)


623-623: f-string without any placeholders

Remove extraneous f prefix

(F541)


692-692: f-string without any placeholders

Remove extraneous f prefix

(F541)

backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py

23-23: PEP 484 prohibits implicit Optional

Convert to T | None

(RUF013)


84-84: Do not catch blind exception: Exception

(BLE001)


266-266: f-string without any placeholders

Remove extraneous f prefix

(F541)

backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py

150-150: Do not catch blind exception: Exception

(BLE001)


158-158: Do not catch blind exception: Exception

(BLE001)


168-168: f-string without any placeholders

Remove extraneous f prefix

(F541)


206-206: Do not catch blind exception: Exception

(BLE001)

backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py

162-162: Do not catch blind exception: Exception

(BLE001)

backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py

237-237: Unused function argument: redis_client

(ARG001)


268-268: Abstract raise to an inner function

(TRY301)


268-268: Avoid specifying long messages outside the exception class

(TRY003)


343-343: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


350-350: Unused function argument: user_id

(ARG001)


434-434: f-string without any placeholders

Remove extraneous f prefix

(F541)


448-448: Loop control variable stream_name not used within loop body

Rename unused stream_name to _stream_name

(B007)


465-465: Do not catch blind exception: Exception

(BLE001)

backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py

101-101: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


111-111: Abstract raise to an inner function

(TRY301)


133-133: Do not catch blind exception: Exception

(BLE001)


134-134: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


135-135: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


135-135: Use explicit conversion flag

Replace with conversion flag

(RUF010)


562-562: Do not catch blind exception: Exception

(BLE001)


568-568: Do not catch blind exception: Exception

(BLE001)


574-574: Do not catch blind exception: Exception

(BLE001)


575-575: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


701-701: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


701-701: Use explicit conversion flag

Replace with conversion flag

(RUF010)


708-708: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


782-782: Do not catch blind exception: Exception

(BLE001)


786-786: Consider moving this statement to an else block

(TRY300)


787-787: Do not catch blind exception: Exception

(BLE001)


788-788: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


795-795: Do not catch blind exception: Exception

(BLE001)


796-796: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


804-804: Do not catch blind exception: Exception

(BLE001)


805-805: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


816-816: Local variable conversation_ids is assigned to but never used

Remove assignment to unused variable conversation_ids

(F841)


853-853: Loop control variable status_name not used within loop body

Rename unused status_name to _status_name

(B007)


891-891: Do not catch blind exception: Exception

(BLE001)


895-895: Consider moving this statement to an else block

(TRY300)


896-896: Do not catch blind exception: Exception

(BLE001)


897-897: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


949-949: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


949-949: Use explicit conversion flag

Replace with conversion flag

(RUF010)

🔇 Additional comments (25)
backends/advanced/src/advanced_omi_backend/services/audio_stream/consumer.py (1)

580-583: LGTM! Appropriate log level reduction.

Changing this frequent operational log from info to debug reduces production log noise while preserving the information for debugging.

backends/advanced/src/advanced_omi_backend/services/audio_stream/producer.py (1)

251-255: LGTM! Appropriate log level reduction.

Even with the existing throttling (every 10th chunk), debug level is more suitable for this operational detail in production environments.

backends/advanced/src/advanced_omi_backend/services/audio_stream/aggregator.py (2)

71-74: LGTM! Appropriate log level reduction.

Debug level is more suitable for this data retrieval operation, reducing production log noise while maintaining visibility for troubleshooting.


146-149: LGTM! Appropriate log level reduction.

Debug level is more suitable for this aggregation operation, consistent with the logging discipline applied across the audio_stream service.

backends/advanced/src/advanced_omi_backend/services/transcription/deepgram.py (2)

56-56: LGTM! Appropriate logging level for routine operation.

Downgrading this routine technical detail to debug level reduces log noise while preserving the information for debugging. The more valuable operational context (estimated duration and timeout at lines 73-75) appropriately remains at info level.


267-267: LGTM! Appropriate logging hierarchy for streaming operations.

Downgrading the connection confirmation to debug level is sensible. The streaming lifecycle remains visible at info level through the start (line 233) and completion (line 365) messages, while the technical connection detail is appropriately at debug level.

backends/advanced/webui/src/pages/Conversations.tsx (3)

214-251: Switch to conversation_id for transcript expansion looks good.

The guard, state updates, and targeted refresh via getById are coherent and minimize full reloads.

Based on learnings


564-565: Consistent gating and state checks on conversation_id.

Click handler and expanded state keyed by conversation_id are correct and avoid ambiguity.

Based on learnings

Also applies to: 574-579, 583-586


504-506: No issues found. The UI and backend are aligned on the delete operation.

The backend DELETE endpoint at conversation_routes.py:141 is defined as @router.delete("/{audio_uuid}") and expects audio_uuid as the path parameter. The UI code correctly passes conversation.audio_uuid to the delete handler. Both use the same parameter, so there is no mismatch to fix.

The learning note about using conversation_id applies specifically to reprocessing and version management operations, not to the delete endpoint.

Likely an incorrect or invalid review comment.

backends/advanced/webui/src/services/api.ts (3)

145-149: Unified dashboard endpoint: LGTM.

Parameterized expanded_sessions is clean and backward-compatible.


160-165: Keeping legacy endpoints commented is fine for now.

No action.


156-159: No mismatch found—cleanup endpoints are correctly aligned between frontend and backend.

The verification confirms the endpoints match:

  • UI (api.ts): calls /api/streaming/cleanup and /api/streaming/cleanup-sessions?max_age_seconds=...
  • Backend (system_routes.py): implements @router.post("/streaming/cleanup") and @router.post("/streaming/cleanup-sessions")

The /api/queue/sessions/clear?older_than_seconds=... route mentioned in the review does not exist. The current routes are correct and working as intended.

Likely an incorrect or invalid review comment.

backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py (3)

156-157: Including 'audio' queue is correct.

Ensures audio crop/persist jobs are discoverable.


453-454: Audio queue included in flush: LGTM.

Covers persistence/crop jobs.


532-537: Comprehensive flush-all logic looks solid.

Emptying queue first and cancelling running jobs before deletion is the right order; stale-registry cleanup is a good add.

Also applies to: 540-546, 548-576

backends/advanced/src/advanced_omi_backend/client.py (1)

135-147: LGTM! Clear migration path documented.

The V2 architecture note is helpful for understanding that conversation closure is now handled by websocket controllers with RQ jobs. The unconditional cleanup simplifies the logic and ensures state is always cleaned up.

backends/advanced/src/advanced_omi_backend/workers/rq_worker_entry.py (1)

13-50: LGTM! Clean RQ worker setup.

The logging configuration before imports ensures visibility of job logs, and the worker setup is straightforward and well-structured.

backends/advanced/src/advanced_omi_backend/speaker_recognition_client.py (1)

478-478: LGTM! Import path aligned with utility reorganization.

The updated import path correctly reflects the centralization of audio utilities under advanced_omi_backend.utils.audio_utils.

backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py (1)

16-46: LGTM! Clean wrapper for speech detection.

The is_meaningful_speech function provides a clear, well-documented interface for checking meaningful speech. The example usage is helpful.

backends/advanced/src/advanced_omi_backend/models/conversation.py (2)

33-37: LGTM! Clear conversation status tracking.

The ConversationStatus enum provides a clear way to track conversation lifecycle states (ACTIVE, COMPLETED, FAILED).


76-78: LGTM! Audio paths support conversation-centric architecture.

The audio_path and cropped_audio_path fields align well with the shift to a conversation-centric processing model, making it easier to track audio files associated with conversations.
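As a sketch, the enum plus a transition guard might look like this (the string values and the guard helper are assumptions; only the three state names come from the reviewed model):

```python
from enum import Enum


class ConversationStatus(str, Enum):
    """Lifecycle states for a conversation (state names from the reviewed model)."""
    ACTIVE = "active"
    COMPLETED = "completed"
    FAILED = "failed"


# Terminal states cannot transition further.
_TERMINAL = {ConversationStatus.COMPLETED, ConversationStatus.FAILED}


def can_transition(current: ConversationStatus, new: ConversationStatus) -> bool:
    """Only ACTIVE conversations may move to a terminal state."""
    return current not in _TERMINAL and new in _TERMINAL
```

Deriving from `str` keeps the values JSON- and Mongo-friendly without custom serialization.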

backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py (3)

24-24: LGTM! Import path aligned with utility reorganization.


130-207: LGTM! Comprehensive cleanup with job cancellation.

The expanded cleanup logic now:

  • Cancels speech detection jobs via RQ
  • Marks all client sessions as complete in Redis
  • Deletes Redis streams
  • Includes robust error handling

The broad exception catches (flagged by static analysis) are appropriate here since cleanup should fail gracefully and continue attempting other cleanup steps.


699-750: LGTM! Batch processing updated for post-conversation workflow.

The changes correctly:

  • Use the updated import path for write_audio_file
  • Switch to start_post_conversation_jobs for the new conversation-centric job workflow
backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py (1)

145-172: LGTM! Rich job metadata for UI display.

The addition of memory details in job metadata provides valuable information for monitoring and debugging, including memory IDs and text snippets for UI display.
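One way to keep such metadata compact is to cap the stored snippets; a hypothetical helper (the name and the 80-character cap are assumptions, not the PR's code):

```python
def memory_meta(memory_ids, texts, snippet_len=80):
    """Build compact RQ job metadata for UI display (a sketch).

    Long memory texts are truncated so the Redis-stored meta stays small.
    """
    return {
        "memory_count": len(memory_ids),
        "memory_ids": memory_ids,
        "snippets": [
            t if len(t) <= snippet_len else t[: snippet_len - 1] + "…"
            for t in texts
        ],
    }
```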

Comment on lines +276 to 316
```diff
+    # Validate input segments
+    validated_segments = []
+    for start_rel, end_rel in speech_segments:
         # Validate input timestamps
-        if start_abs >= end_abs:
+        if start_rel >= end_rel:
             logger.warning(
-                f"⚠️ Invalid speech segment: start={start_abs} >= end={end_abs}, skipping"
+                f"⚠️ Invalid speech segment: start={start_rel} >= end={end_rel}, skipping"
             )
             continue
 
-        start_rel = start_abs - file_start_timestamp
-        end_rel = end_abs - file_start_timestamp
-
-        # Ensure relative timestamps are positive (sanity check)
+        # Ensure timestamps are positive (sanity check)
         if start_rel < 0:
             logger.warning(
-                f"⚠️ Negative start timestamp: {start_rel} (absolute: {start_abs}, file_start: {file_start_timestamp}), clamping to 0.0"
+                f"⚠️ Negative start timestamp: {start_rel}, clamping to 0.0"
             )
             start_rel = 0.0
         if end_rel < 0:
             logger.warning(
-                f"⚠️ Negative end timestamp: {end_rel} (absolute: {end_abs}, file_start: {file_start_timestamp}), skipping segment"
+                f"⚠️ Negative end timestamp: {end_rel}, skipping segment"
             )
             continue
 
-        relative_segments.append((start_rel, end_rel))
+        validated_segments.append((start_rel, end_rel))
 
-    logger.info(f"🕐 Converting timestamps for {audio_uuid}: file_start={file_start_timestamp}")
-    logger.info(f"🕐 Absolute segments: {speech_segments}")
-    logger.info(f"🕐 Relative segments: {relative_segments}")
+    logger.info(f"🕐 Processing cropping for {audio_uuid}")
+    logger.info(f"🕐 Input segments (relative timestamps): {speech_segments}")
+    logger.info(f"🕐 Validated segments: {validated_segments}")
 
-    # Validate that we have valid relative segments after conversion
-    if not relative_segments:
+    # Validate that we have valid segments
+    if not validated_segments:
         logger.warning(
-            f"No valid relative segments after timestamp conversion for {audio_uuid}"
+            f"No valid segments for cropping {audio_uuid}"
         )
         return False
 
-    success = await _crop_audio_with_ffmpeg(original_path, relative_segments, output_path)
+    success = await _crop_audio_with_ffmpeg(original_path, validated_segments, output_path)
     if success:
-        # Update database with cropped file info (keep original absolute timestamps for reference)
+        # Update database with cropped file info
         cropped_filename = output_path.split("/")[-1]
         if chunk_repo is not None:
             await chunk_repo.update_cropped_audio(audio_uuid, cropped_filename, speech_segments)
```
Copy link
Contributor


⚠️ Potential issue | 🟠 Major

Inconsistency: validated vs original segments.

The function validates speech segments (lines 276-303) and uses validated_segments for cropping (line 311), but passes the original speech_segments to update_cropped_audio (line 316). This means the database stores unvalidated segments while cropping uses validated ones.

Consider this fix to ensure consistency:

             success = await _crop_audio_with_ffmpeg(original_path, validated_segments, output_path)
         if success:
             # Update database with cropped file info
             cropped_filename = output_path.split("/")[-1]
             if chunk_repo is not None:
-                await chunk_repo.update_cropped_audio(audio_uuid, cropped_filename, speech_segments)
+                await chunk_repo.update_cropped_audio(audio_uuid, cropped_filename, validated_segments)
             logger.info(f"Successfully processed cropped audio: {cropped_filename}")
             return True
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

```python
    # Validate input segments
    validated_segments = []
    for start_rel, end_rel in speech_segments:
        # Validate input timestamps
        if start_rel >= end_rel:
            logger.warning(
                f"⚠️ Invalid speech segment: start={start_rel} >= end={end_rel}, skipping"
            )
            continue

        # Ensure timestamps are positive (sanity check)
        if start_rel < 0:
            logger.warning(
                f"⚠️ Negative start timestamp: {start_rel}, clamping to 0.0"
            )
            start_rel = 0.0
        if end_rel < 0:
            logger.warning(
                f"⚠️ Negative end timestamp: {end_rel}, skipping segment"
            )
            continue

        validated_segments.append((start_rel, end_rel))

    logger.info(f"🕐 Processing cropping for {audio_uuid}")
    logger.info(f"🕐 Input segments (relative timestamps): {speech_segments}")
    logger.info(f"🕐 Validated segments: {validated_segments}")

    # Validate that we have valid segments
    if not validated_segments:
        logger.warning(
            f"No valid segments for cropping {audio_uuid}"
        )
        return False

    success = await _crop_audio_with_ffmpeg(original_path, validated_segments, output_path)
    if success:
        # Update database with cropped file info
        cropped_filename = output_path.split("/")[-1]
        if chunk_repo is not None:
            await chunk_repo.update_cropped_audio(audio_uuid, cropped_filename, validated_segments)
```
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/utils/audio_utils.py around lines
276 to 316, the code validates speech_segments into validated_segments and uses
validated_segments for cropping but then calls chunk_repo.update_cropped_audio
with the original (unvalidated) speech_segments; change the call to pass
validated_segments (ensure the same data shape expected by
update_cropped_audio), so the stored segments match what was actually cropped;
also keep the existing checks (only call update when chunk_repo is not None and
success is True).
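Isolated as a pure helper, the validation rules under discussion become easy to unit-test; a sketch (the real code in audio_utils.py also logs warnings and updates the database, both omitted here):

```python
def validate_segments(segments):
    """Clamp negative starts to 0.0 and drop invalid segments.

    Mirrors the reviewed loop: the returned list is what should be passed
    both to ffmpeg cropping *and* to update_cropped_audio, so the stored
    segments match what was actually cropped.
    """
    validated = []
    for start_rel, end_rel in segments:
        if start_rel >= end_rel:
            continue  # zero-length or inverted segment
        if end_rel < 0:
            continue  # segment lies entirely before the file start
        if start_rel < 0:
            start_rel = 0.0  # clamp partial overlap to the file start
        validated.append((start_rel, end_rel))
    return validated
```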

Comment on lines +188 to +199
```python
                cropping_job = default_queue.enqueue(
                    process_cropping_job,
                    conversation_id,
                    result["file_path"],
                    user_id,
                    depends_on=speaker_job,
                    job_timeout=300,
                    result_ttl=JOB_RESULT_TTL,
                    job_id=f"crop_{conversation_id[:8]}",
                    description=f"Crop audio for {conversation_id[:8]}",
                    meta={'audio_uuid': result["audio_uuid"], 'conversation_id': conversation_id}
                )
```
Copy link
Contributor


⚠️ Potential issue | 🔴 Critical

Critical: enqueue args don’t match process_cropping_job signature.

process_cropping_job(conversation_id, audio_path, redis_client=None) — passing user_id as a positional arg conflicts with decorator-injected redis_client.

                 cropping_job = default_queue.enqueue(
                     process_cropping_job,
                     conversation_id,
-                    result["file_path"],
-                    user_id,
+                    result["file_path"],
                     depends_on=speaker_job,
                     job_timeout=300,
                     result_ttl=JOB_RESULT_TTL,
                     job_id=f"crop_{conversation_id[:8]}",
                     description=f"Crop audio for {conversation_id[:8]}",
                     meta={'audio_uuid': result["audio_uuid"], 'conversation_id': conversation_id}
                 )
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py around lines
188 to 199, the enqueue call passes user_id as a third positional argument which
conflicts with the decorator-injected redis_client in
process_cropping_job(conversation_id, audio_path, redis_client=None); remove the
positional user_id from the enqueue call so the arguments are (conversation_id,
result["file_path"], depends_on=..., job_timeout=..., ...), and if the job truly
needs user_id either (A) include it in the job meta (meta={'audio_uuid': ...,
'conversation_id': conversation_id, 'user_id': user_id}) or (B) change
process_cropping_job signature to accept user_id before redis_client (def
process_cropping_job(conversation_id, audio_path, user_id=None,
redis_client=None)) and update any callers accordingly so redis_client remains
the injected third parameter.
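The collision is easy to see with `inspect.signature` against a stand-in for the worker function (the body here is a stub; only the parameter list comes from the comment above):

```python
import inspect


def process_cropping_job(conversation_id, audio_path, redis_client=None):
    """Stand-in with the reviewed signature."""
    return conversation_id, audio_path, redis_client


sig = inspect.signature(process_cropping_job)

# Three positionals: the third silently lands in redis_client, so the
# decorator-injected Redis connection would be shadowed by user_id.
bound = sig.bind("conv-123", "/audio/conv-123.wav", "user-42")

# Two positionals: redis_client is left for the decorator to inject.
ok = sig.bind("conv-123", "/audio/conv-123.wav")
```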

Comment on lines 702 to 712
```diff
     job = default_queue.enqueue(
         process_cropping_job,
-        client_id,
+        conversation_id,
+        audio_path,
+        user_id,
-        audio_uuid,
-        original_path,
-        speech_segments,
-        output_path,
         job_timeout=timeout_mapping.get(priority, 180),
         result_ttl=JOB_RESULT_TTL,
-        job_id=f"cropping_{audio_uuid[:8]}",
-        description=f"Crop audio for {audio_uuid[:8]}",
-        meta={'audio_uuid': audio_uuid}
+        job_id=f"crop_{conversation_id[:12]}",
+        description=f"Crop audio for conversation {conversation_id[:12]}",
+        meta={'conversation_id': conversation_id}
     )
```
Copy link
Contributor


🛠️ Refactor suggestion | 🟠 Major

enqueue_cropping passes extra arg; align with job signature.

Keep API if you want, but don’t pass user_id through to the job.

     job = default_queue.enqueue(
         process_cropping_job,
-        conversation_id,
-        audio_path,
-        user_id,
+        conversation_id,
+        audio_path,
         job_timeout=timeout_mapping.get(priority, 180),
         result_ttl=JOB_RESULT_TTL,
         job_id=f"crop_{conversation_id[:12]}",
         description=f"Crop audio for conversation {conversation_id[:12]}",
         meta={'conversation_id': conversation_id}
     )
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py around lines
702 to 712, the enqueue call is passing user_id to process_cropping_job even
though the job signature does not accept it; remove user_id from the argument
list when calling default_queue.enqueue (keep conversation_id and audio_path
only), and if you intend to preserve an API that needs user_id for metadata,
move it into the meta dict (e.g., meta={'conversation_id': conversation_id,
'user_id': user_id}) instead of as a positional argument so the job signature
remains unchanged.

Comment on lines +71 to +86
```python
    # Update speech detection job metadata with conversation_id
    if speech_job_id:
        try:
            from rq.job import Job
            from advanced_omi_backend.controllers.queue_controller import redis_conn

            speech_job = Job.fetch(speech_job_id, connection=redis_conn)
            if speech_job and speech_job.meta:
                speech_job.meta['conversation_id'] = conversation_id
                # Remove session_level flag - now linked to conversation
                speech_job.meta.pop('session_level', None)
                speech_job.save_meta()
                logger.info(f"🔗 Updated speech job {speech_job_id[:12]} with conversation_id")
        except Exception as e:
            logger.warning(f"⚠️ Failed to update speech job metadata: {e}")
```

Copy link
Contributor


⚠️ Potential issue | 🟡 Minor

Narrow exception scope and keep traceback.

Catching bare Exception masks real failures. Catch specific RQ/Redis exceptions and log with traceback.

-    if speech_job_id:
-        try:
+    if speech_job_id:
+        try:
             from rq.job import Job
             from advanced_omi_backend.controllers.queue_controller import redis_conn
+            from rq.exceptions import NoSuchJobError
+            from redis.exceptions import RedisError
@@
-        except Exception as e:
-            logger.warning(f"⚠️ Failed to update speech job metadata: {e}")
+        except (NoSuchJobError, RedisError) as e:
+            logger.exception("⚠️ Failed to update speech job metadata")
🧰 Tools
🪛 Ruff (0.14.0)

84-84: Do not catch blind exception: Exception

(BLE001)
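The suggested narrowing can be sketched with stand-in exception classes, so it runs without rq or redis installed (the class names mirror `rq.exceptions.NoSuchJobError` and `redis.exceptions.RedisError`, but these are local stubs, not their API):

```python
import logging

logger = logging.getLogger("speech_meta")


# Local stand-ins for the rq/redis exception types.
class NoSuchJobError(Exception): ...
class RedisError(Exception): ...


def update_job_meta(fetch_job):
    """Catch only expected lookup/connection failures; let real bugs propagate."""
    try:
        meta = fetch_job()
        meta["conversation_id"] = "conv-123"
        meta.pop("session_level", None)
        return True
    except (NoSuchJobError, RedisError):
        # logger.exception records the traceback, unlike logger.warning(f"... {e}")
        logger.exception("Failed to update speech job metadata")
        return False


def fetch_missing():
    raise NoSuchJobError("no such job in registry")


def fetch_buggy():
    raise TypeError("a programming error that should surface")
```

A `TypeError` from `fetch_buggy` escapes the handler, which is the point: only the anticipated infrastructure failures are swallowed.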

Comment on lines +285 to 299
```diff
+    # Enqueue post-conversation processing pipeline
+    from advanced_omi_backend.controllers.queue_controller import start_post_conversation_jobs
+
+    job_ids = start_post_conversation_jobs(
+        conversation_id=conversation_id,
+        audio_uuid=session_id,
+        user_id=user_id,
+        user_email=user_email,
+        audio_file_path=file_path
+    )
+
     logger.info(
-        f"📥 RQ: Enqueued batch processing chain: "
-        f"{job_ids['transcription']} → {job_ids['speaker_recognition']} → {job_ids['memory']}"
+        f"📥 Pipeline: transcribe({job_ids['transcription']}) → "
+        f"speaker({job_ids['speaker_recognition']}) → "
+        f"crop({job_ids['cropping']}) → "
+        f"memory({job_ids['memory']})"
     )
```
Copy link
Contributor


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Double-check: passing session_id as audio_uuid to post-conversation pipeline.

start_post_conversation_jobs(..., audio_uuid=session_id, ...) looks inconsistent with the shift to conversation-centric processing. If any downstream reprocessing still uses audio_uuid semantics, passing a session_id could mis-associate artifacts.

  • Prefer passing conversation_id and audio_file_path only, or pass the conversation’s actual audio_uuid if still required.
  • Please confirm start_post_conversation_jobs signature and expectations.

To verify usages:


🏁 Script executed:

#!/bin/bash
fd -t f 'queue_controller.py' | xargs -I{} sed -n '1,250p' {}
rg -nP 'start_post_conversation_jobs\s*\(' -C3
rg -nP 'audio_uuid\s*=' -g '!**/node_modules/**'

Length of output: 14867




---

🏁 Script executed:

```shell
#!/bin/bash
# Find start_post_conversation_jobs function definition in queue_controller.py
rg -nP 'def start_post_conversation_jobs' -A 50 backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py

# Get context around conversation_jobs.py:288 to understand what session_id is
sed -n '270,310p' backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py

# Check what start_post_conversation_jobs does with the audio_uuid parameter
rg -nP 'audio_uuid' backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py | head -20

```

Length of output: 2967



---

🏁 Script executed:

```shell
#!/bin/bash
# Search for start_post_conversation_jobs definition anywhere in queue_controller
rg -n 'def start_post_conversation_jobs' backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py

# Get the function definition with more context (search for it and show 80 lines)
rg -nP 'def start_post_conversation_jobs' -A 80 backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py

# Get the calling function signature in conversation_jobs.py to see where session_id comes from
sed -n '200,310p' backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py | head -100

Length of output: 4587


🏁 Script executed:

#!/bin/bash
# Search ALL Python files for start_post_conversation_jobs definition
rg -n 'def start_post_conversation_jobs' backends/advanced/src/ --type py

# Search for the function in queue_controller more carefully (might be multiline)
rg -n 'start_post_conversation_jobs' backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py | head -20

# Get the function that contains line 288 (to see full signature and where session_id comes from)
sed -n '150,200p' backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py

Length of output: 2498


🏁 Script executed:

#!/bin/bash
# Search for start_post_conversation_jobs across entire repository
find backends/advanced/src -name "*.py" -exec grep -l "def start_post_conversation_jobs" {} \;

# Also try searching without 'def' to see all references
rg 'start_post_conversation_jobs' backends/advanced/src --type py -n

# Get the function signature from conversation_jobs.py that calls it (find the function header)
sed -n '1,150p' backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py | grep -n "^async def\|^def" | tail -5

Length of output: 647


🏁 Script executed:

#!/bin/bash
# List all function definitions in queue_controller.py
rg -nP '^(async )?def ' backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py

# Get the full signature and first 30 lines of open_conversation_job function
sed -n '18,100p' backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py

Length of output: 3758


🏁 Script executed:

#!/bin/bash
# Get start_batch_processing_jobs function signature and implementation
sed -n '392,450p' backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py

# Check websocket_controller to see what it passes to start_post_conversation_jobs
sed -n '740,760p' backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py

# Search for any TODO or FIXME comments about start_post_conversation_jobs
rg -n 'TODO|FIXME|start_post' backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py

Length of output: 3149


🏁 Script executed:

#!/bin/bash
# Check if start_post_conversation_jobs is mentioned in any comments or docs
rg -n 'start_post_conversation_jobs' backends/advanced/src backends/advanced/Docs backends/advanced/memory --type py --type md

# Get more context from websocket_controller around the call
sed -n '700,760p' backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py

# Check if there's a recent commit that renamed or removed this function
git log --oneline --all -- '*queue_controller.py' 2>/dev/null | head -5

Length of output: 3063


Critical: start_post_conversation_jobs function does not exist in queue_controller.py—code will fail on import.

The function being imported and called at conversation_jobs.py:286 and websocket_controller.py:744 does not exist. Both call sites will raise an ImportError at runtime.

The codebase contains start_batch_processing_jobs (line 392 of queue_controller.py) with the correct job chain logic, but the calls are trying to invoke a non-existent function. Additionally, both call sites are missing required parameters (user_id, user_email) that start_batch_processing_jobs expects.

Both call sites need either:

  • Implementation of start_post_conversation_jobs as a wrapper around start_batch_processing_jobs, or
  • Refactoring to call start_batch_processing_jobs directly with all required parameters (user_id and user_email are available in scope at both locations)
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py
around lines 285–299, the code imports and calls a non-existent
start_post_conversation_jobs; replace that import and call with the existing
start_batch_processing_jobs from
advanced_omi_backend.controllers.queue_controller and pass the required user_id
and user_email (available in scope) along with conversation_id,
audio_uuid=session_id, and audio_file_path=file_path, then update the job_ids
reference usage accordingly; alternatively, if you prefer a thin compatibility
layer, add a start_post_conversation_jobs wrapper in queue_controller.py that
forwards all required parameters to start_batch_processing_jobs and update
imports to use that wrapper.

Comment on lines 22 to 24
async def process_memory_job(
    client_id: str,
    user_id: str,
    user_email: str,
    conversation_id: str,
    redis_client=None

⚠️ Potential issue | 🔴 Critical

Critical: Function signature mismatch with enqueue function.

The function signature only accepts conversation_id and redis_client, but the enqueue_memory_processing function (lines 209-214) still passes client_id, user_id, and user_email as separate arguments. This will cause runtime errors when the job is executed.

Update the enqueue function to match:

 def enqueue_memory_processing(
-    client_id: str,
-    user_id: str,
-    user_email: str,
     conversation_id: str,
     priority: JobPriority = JobPriority.NORMAL
 ):
     """
     Enqueue a memory processing job.

     Returns RQ Job object for tracking.
     """
     timeout_mapping = {
         JobPriority.URGENT: 3600,  # 60 minutes
         JobPriority.HIGH: 2400,    # 40 minutes
         JobPriority.NORMAL: 1800,  # 30 minutes
         JobPriority.LOW: 900       # 15 minutes
     }

     job = memory_queue.enqueue(
         process_memory_job,
-        client_id,
-        user_id,
-        user_email,
         conversation_id,
         job_timeout=timeout_mapping.get(priority, 1800),
         result_ttl=JOB_RESULT_TTL,
         job_id=f"memory_{conversation_id[:8]}",
         description=f"Process memory for conversation {conversation_id[:8]}"
     )

     logger.info(f"📥 RQ: Enqueued memory job {job.id} for conversation {conversation_id}")
     return job

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff (0.14.0)

24-24: Unused function argument: redis_client

(ARG001)

🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py around
lines 22 to 24, the async function process_memory_job currently only accepts
(conversation_id: str, redis_client=None) but enqueue_memory_processing passes
client_id, user_id, user_email (plus conversation_id/redis_client); update the
function signature to accept the same parameters the enqueue call supplies
(e.g., async def process_memory_job(client_id: str, user_id: str, user_email:
str, conversation_id: str, redis_client=None):) and then adjust the function
body to use the new parameters where needed so the runtime argument order
matches the enqueue invocation. Ensure redis_client remains optional with a
default of None.
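Whichever direction is chosen, the invariant is that the arguments the enqueue side passes must match the job signature exactly. A minimal, self-contained sketch of the slimmed-down variant (RQ is stubbed out here, and the detail that user context is looked up from the conversation record is an assumption from the review, not verified against the codebase):

```python
# Sketch: keep the job function and its enqueue helper in sync.
import asyncio


async def process_memory_job(conversation_id: str, redis_client=None):
    # Only conversation_id is required; client/user details would be looked
    # up from storage inside the job rather than passed as extra arguments.
    return f"processed {conversation_id}"


def enqueue_memory_processing(conversation_id: str):
    # Stub for memory_queue.enqueue: records exactly the args the job accepts.
    job_id = f"memory_{conversation_id[:8]}"
    args = (conversation_id,)
    return job_id, args


job_id, args = enqueue_memory_processing("abcd1234-ef56")
result = asyncio.run(process_memory_job(*args))
print(job_id, result)  # memory_abcd1234 processed abcd1234-ef56
```

A mismatch here only surfaces when a worker picks up the job, which is why it is easy to miss in review.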

@@ -0,0 +1,54 @@
#!/usr/bin/env python3

⚠️ Potential issue | 🟡 Minor

Make the script executable.

The shebang is present but the file is not executable. If this script is intended to be run directly, add execute permissions.

Apply this fix:

chmod +x backends/advanced/src/advanced_omi_backend/workers/rq_worker_entry.py
🧰 Tools
🪛 Ruff (0.14.0)

1-1: Shebang is present but file is not executable

(EXE001)

🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/workers/rq_worker_entry.py around
line 1 the shebang is present but the file lacks execute permissions; make the
script executable by updating its file mode (e.g., run chmod +x on that file in
the repository or set equivalent executable bit in your commit) so it can be
invoked directly.

Comment on lines +104 to 109
  // If the string doesn't include timezone info, append 'Z' to treat as UTC
  const isoString = timestamp.endsWith('Z') || timestamp.includes('+') || timestamp.includes('T') && timestamp.split('T')[1].includes('-')
    ? timestamp
    : timestamp + 'Z'
  return new Date(isoString).toLocaleString()
}

⚠️ Potential issue | 🟠 Major

Fix timestamp normalization to avoid invalid dates (space vs 'T', missing TZ).

Current logic can yield invalid strings like "YYYY-MM-DD HH:mm:ssZ" and may mis-detect timezones. Normalize space to 'T' first and append 'Z' only when no TZ is present.

Apply:

-      // If the string doesn't include timezone info, append 'Z' to treat as UTC
-      const isoString = timestamp.endsWith('Z') || timestamp.includes('+') || timestamp.includes('T') && timestamp.split('T')[1].includes('-')
-        ? timestamp
-        : timestamp + 'Z'
-      return new Date(isoString).toLocaleString()
+      // Normalize "YYYY-MM-DD HH:mm:ss" -> "YYYY-MM-DDTHH:mm:ss" and add 'Z' if no timezone
+      const normalized = timestamp.includes('T') ? timestamp : timestamp.replace(' ', 'T')
+      const isoString = /(?:Z|[+-]\d{2}:\d{2})$/.test(normalized) ? normalized : `${normalized}Z`
+      return new Date(isoString).toLocaleString()

…lite into close-convo

# Conflicts:
#	backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py
#	backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py
#	backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py
#	backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py
#	backends/advanced/webui/src/pages/Queue.tsx
@thestumonkey thestumonkey closed this by deleting the head repository Oct 22, 2025
