
feat: Adds "closing conversation", beefs up and fixes Queue management page #144

Merged
AnkushMalaker merged 15 commits into SimpleOpenSoftware:main from Ushadow-io:close-convo2
Nov 3, 2025

Conversation

@thestumonkey (Contributor) commented Oct 27, 2025

Please let this be the one

Summary by CodeRabbit

  • New Features

    • Added audio cropping and re-processing capabilities for recorded audio
    • Introduced session-level conversation tracking and status monitoring
    • Added consolidated dashboard for job and queue monitoring across audio, transcription, and memory processing
  • Improvements

    • Enhanced conversation lifecycle management with automatic inactivity timeout handling
    • Improved audio session persistence with per-conversation file rotation
    • Strengthened WebSocket authentication logging with detailed error context
  • Infrastructure

    • Refactored worker orchestration with dedicated audio processing queue
    • Added session management controller for Redis-backed streaming status

coderabbitai bot (Contributor) commented Oct 27, 2025

Important

Review skipped

Auto reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

This pull request restructures the backend's conversation and audio processing architecture by removing the ConversationManager singleton, refactoring job queuing into modular post-conversation pipelines, introducing Redis-backed session management, consolidating audio operations into dedicated controllers, and adding infrastructure changes for worker deployment and session cleanup orchestration.

Changes

  • Conversation Lifecycle & State Management
    backends/advanced/src/advanced_omi_backend/client.py, backends/advanced/src/advanced_omi_backend/conversation_manager.py
    Removed the ConversationManager class and get_conversation_manager function; simplified close_current_conversation to perform direct cleanup without delegating to a manager; closure and cleanup operations are now logged unconditionally.
  • Session Management (New Module)
    backends/advanced/src/advanced_omi_backend/controllers/session_controller.py
    New module introducing Redis-backed session info retrieval, bulk session listing, conversation count tracking, streaming status aggregation, and cleanup of old/stuck sessions with comprehensive error handling.
  • Audio Controller Expansion
    backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py
    Added get_cropped_audio_info and reprocess_audio_cropping endpoints; modified the upload flow to use the post-conversation job pipeline and an auto-generated conversation_id, replacing the batch processing path.
  • Conversation Controller Refactoring
    backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
    Removed legacy endpoints (get_conversation_by_id, get_cropped_audio_info, reprocess_audio_cropping, add_speaker_to_conversation, update_transcript_segment); updated reprocess_transcript to include audio cropping in the job chain; removed audio_path/cropped_audio_path from response fields.
  • Queue/Job Orchestration
    backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py
    Added an AUDIO_QUEUE constant and queue instance; replaced start_batch_processing_jobs with start_post_conversation_jobs supporting cropping, transcription, speaker recognition, memory, and title/summary jobs; updated job completion/stats logic; added a cleanup_stuck_stream_workers function.
  • System Controller Endpoint Migration
    backends/advanced/src/advanced_omi_backend/controllers/system_controller.py
    Removed three endpoints (get_streaming_status, cleanup_stuck_stream_workers, cleanup_old_sessions) now delegated to the new session_controller and queue_controller.
  • WebSocket & Batch Processing
    backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py
    Enhanced cleanup_client_state with Redis stream/session cleanup; updated _initialize_streaming_session to remove the user_email parameter; refactored _process_batch_audio_complete to use post_conversation_jobs and an auto-generated conversation_id.
  • Worker Job Infrastructure
    backends/advanced/src/advanced_omi_backend/workers/__init__.py
    Added _ensure_beanie_initialized to public exports; added check_enrolled_speakers_job and recognise_speakers_job; removed process_audio_job and enqueue_audio_processing.
  • Audio Jobs Restructuring
    backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py
    Refactored process_cropping_job (formerly process_audio_job) for conversation-driven cropping; redesigned audio_streaming_persistence_job with per-conversation file rotation and Redis coordination; simplified the enqueue_cropping API.
  • Conversation Jobs with Lifecycle
    backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py
    Updated open_conversation_job to remove user_email, add a speech_job_id parameter, auto-generate conversation_id, implement inactivity timeout logic, and enqueue the post_conversation pipeline; added a new generate_title_summary_job.
  • Memory Jobs Simplification
    backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py
    Updated the process_memory_job signature to derive user/client info from the conversation model instead of accepting parameters; added memory metadata collection for the UI; added primary_speakers filtering.
  • Transcription Jobs & Speaker Recognition
    backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py
    Removed user_id from transcribe_full_audio_job; replaced recognise_speakers_job and the old stream_speech_detection_job with a new stream_speech_detection_job (session_id/client_id based); updated conversation orchestration logic.
  • Speaker Recognition Jobs (New Module)
    backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py
    New module defining check_enrolled_speakers_job and recognise_speakers_job for speaker identification and enrollment status checks with metadata storage.
  • Worker Entry Point (New Module)
    backends/advanced/src/advanced_omi_backend/workers/rq_worker_entry.py
    New entry-point script for RQ workers with upfront logging configuration, queue selection via CLI args, and worker startup with logging support.
  • Worker Startup Scripts
    backends/advanced/start-k8s.sh, backends/advanced/start-workers.sh
    Updated to use the new rq_worker_entry module instead of direct rq worker invocation; added a dedicated audio persistence worker in K8s; expanded to 6 parallel RQ workers in start-workers.sh.
  • Database & Model Changes
    backends/advanced/src/advanced_omi_backend/database.py, backends/advanced/src/advanced_omi_backend/models/conversation.py, backends/advanced/src/advanced_omi_backend/models/job.py
    AudioChunksRepository: removed the speakers_identified parameter and add_speaker method. Conversation: added a UUID-based conversation_id default and a ConversationStatus enum; MemoryVersion: added audio_path/cropped_audio_path fields; create_conversation: made conversation_id optional with auto-generation. Job: added a shared Beanie initialization guard.
  • LLM Client Simplification
    backends/advanced/src/advanced_omi_backend/llm_client.py
    Removed the provider parameter from OpenAILLMClient.__init__; changed health_check from async to sync; unified openai/ollama handling without a separate branch.
  • Authentication & Utilities
    backends/advanced/src/advanced_omi_backend/auth.py, backends/advanced/src/advanced_omi_backend/utils/audio_utils.py, backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py
    Enhanced token auth logging with token preview and error details; refactored audio cropping to assume relative timestamps and validate segments; added an is_meaningful_speech helper.
  • Service & Import Updates
    backends/advanced/src/advanced_omi_backend/services/audio_service.py, backends/advanced/src/advanced_omi_backend/services/audio_stream/*, backends/advanced/src/advanced_omi_backend/speaker_recognition_client.py
    Removed the AudioStreamMessage dataclass; lowered log level for routine operations from info to debug; updated exception handling for asyncio.TimeoutError; moved audio_utils imports to the utils.audio_utils path.
  • Route/API Layer
    backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py, backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py, backends/advanced/src/advanced_omi_backend/routers/modules/health_routes.py, backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py
    Migrated cropping endpoints to audio_controller; removed add_speaker and update_transcript endpoints; added cancel_job and get_dashboard_data endpoints; removed embedder fields from audioai health; re-routed system endpoints to the new controllers.
  • Frontend Components
    backends/advanced/webui/src/pages/Conversations.tsx, backends/advanced/webui/src/pages/System.tsx, backends/advanced/webui/src/services/api.ts
    Updated Conversations to use conversation_id instead of audio_uuid for expansion; removed embedder fields from the System health display; consolidated queue endpoints into getDashboard and removed per-session/stats endpoints.
  • Kubernetes & Helm
    backends/charts/advanced-backend/templates/deployment.yaml, backends/charts/advanced-backend/templates/workers-deployment.yaml, backends/charts/advanced-backend/values.yaml
    Added an optional workers sidecar to the main deployment; introduced a new workers-deployment template with configurable replicas, DNS/IPv6 setup, env/secret injection, and volume mounts; added a workers configuration block to values.yaml.
  • External Services
    Makefile, backends/advanced/src/advanced_omi_backend/memory/providers/mcp_client.py, extras/speaker-recognition/docker-compose.yml, extras/speaker-recognition/pyproject.toml, extras/speaker-recognition/src/simple_speaker_recognition/core/audio_backend.py
    Updated Kubernetes config generation to skip Docker setup; changed the MCP endpoint from /api/v1/memories/ to /mcp/{client_name}/sse/{user_id}; removed the GPU speaker-service; pinned pyannote.audio to 3.1.1 with compatible torch/torchaudio/numpy versions; updated the PyAnnote auth token parameter name.
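The UUID-based conversation_id default mentioned for the Conversation model can be sketched with a default factory so each instance gets a fresh id. This is a minimal stdlib-only illustration; the real model is a Beanie document and the actual status values and field names may differ:

```python
import uuid
from dataclasses import dataclass, field
from enum import Enum

class ConversationStatus(str, Enum):
    # Illustrative values; the PR's actual enum members are not shown here.
    OPEN = "open"
    CLOSED = "closed"

@dataclass
class Conversation:
    # default_factory runs per instance, so each conversation
    # is created with its own UUID when none is supplied.
    conversation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    status: ConversationStatus = ConversationStatus.OPEN
```

This matches the "made conversation_id optional with auto-generation" note: callers may pass an explicit id, and omitted ids are generated rather than rejected.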

Sequence Diagram(s)

sequenceDiagram
    participant WebSocket as WebSocket Client
    participant WS_Ctrl as websocket_controller
    participant Conv_Ctrl as conversation_controller
    participant Queue_Ctrl as queue_controller
    participant Audio_Jobs as audio_jobs
    participant Trans_Jobs as transcription_jobs
    participant Speaker_Jobs as speaker_jobs
    participant Memory_Jobs as memory_jobs

    Note over WebSocket,Memory_Jobs: Old Flow (Removed)
    WebSocket->>WS_Ctrl: close_current_conversation()
    WS_Ctrl->>Conv_Mgr: get_conversation_manager().close_conversation()
    Conv_Mgr->>Queue_Ctrl: enqueue transcription job directly

    Note over WebSocket,Memory_Jobs: New Flow
    WebSocket->>WS_Ctrl: Connection closes (cleanup_client_state)
    WS_Ctrl->>Client: cleanup Redis sessions/streams
    
    WebSocket->>WS_Ctrl: Audio chunks received (streaming)
    WS_Ctrl->>Conv_Ctrl: create/update conversation
    
    Conv_Ctrl->>Client: close_current_conversation (no manager)
    
    WebSocket->>WS_Ctrl: Session ends
    WS_Ctrl->>Queue_Ctrl: start_post_conversation_jobs()
    
    Queue_Ctrl->>Audio_Jobs: enqueue cropping job (audio queue)
    Queue_Ctrl->>Trans_Jobs: enqueue transcription job (depends on cropping)
    Queue_Ctrl->>Speaker_Jobs: enqueue speaker recognition (depends on transcription)
    Queue_Ctrl->>Memory_Jobs: enqueue memory extraction (depends on speaker)
    Queue_Ctrl->>Trans_Jobs: enqueue title/summary (depends on speaker, parallel with memory)
    
    Audio_Jobs-->>Trans_Jobs: returns cropped audio path
    Trans_Jobs-->>Speaker_Jobs: returns transcript
    Speaker_Jobs-->>Memory_Jobs: returns speaker metadata
    Memory_Jobs-->>WebSocket: final results
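The new-flow fan-out in the diagram maps naturally onto RQ-style job dependencies: each step names the job(s) it waits on, and memory extraction and title/summary both hang off speaker recognition in parallel. A dependency-free sketch of that ordering (job and id names are assumptions based on the walkthrough, not the PR's identifiers; with RQ each dict would become a `queue.enqueue(..., depends_on=...)` call):

```python
def build_post_conversation_pipeline(conversation_id: str) -> list[dict]:
    """Return job specs in enqueue order, each naming the jobs it depends on."""
    crop = f"crop_{conversation_id}"
    trans = f"trans_{conversation_id}"
    speak = f"speak_{conversation_id}"
    return [
        {"job": "crop_audio",         "id": crop,  "depends_on": []},
        {"job": "transcribe",         "id": trans, "depends_on": [crop]},
        {"job": "recognise_speakers", "id": speak, "depends_on": [trans]},
        # memory and title/summary both depend only on speaker recognition,
        # so workers can run them in parallel
        {"job": "extract_memory", "id": f"mem_{conversation_id}",   "depends_on": [speak]},
        {"job": "title_summary",  "id": f"title_{conversation_id}", "depends_on": [speak]},
    ]
```

The key property to verify in review is exactly what this sketch encodes: no job lists a dependency that is enqueued after it, and the two leaf jobs share a single upstream dependency.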

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Multi-layer architectural changes: Removal of ConversationManager, introduction of post-conversation job pipeline, and migration of session management require careful verification of backward compatibility and correct job chaining across multiple worker files.
  • Signature changes across worker jobs: transcribe_full_audio_job, process_memory_job, open_conversation_job, and new stream_speech_detection_job have parameter and behavior changes that propagate through multiple call sites; ensure all enqueue points pass correct arguments.
  • New session management layer: session_controller introduces Redis-based session tracking with complex cleanup logic; verify Redis key naming conventions, TTL handling, and stream cleanup logic are correct.
  • Queue infrastructure changes: New AUDIO_QUEUE, post_conversation_jobs pipeline structure, and dependency chains between jobs must be verified for correctness and for potential race conditions or stuck jobs.
  • Database/model changes: UUID-based conversation_id auto-generation, removal of speakers_identified tracking, and MemoryVersion field additions need careful verification for migration safety and impact on existing data queries.
  • Frontend/API consolidation: Dashboard endpoint and endpoint re-routing to new controllers should be validated for complete functionality coverage and permission checks.
  • Kubernetes/Helm infrastructure: New workers deployment template, sidecar configuration, and updated start scripts require validation for correct resource allocation, environment variable passing, and worker process lifecycle.

Areas requiring extra attention:

  • Job chaining logic in start_post_conversation_jobs and dependency metadata propagation
  • Inactivity timeout implementation in open_conversation_job and timeout triggering edge cases
  • Redis session/stream cleanup operations (XPENDING, XCLAIM, consumer group handling)
  • conversation_id auto-generation and retrieval after database insert
  • Speaker recognition job failure modes and fallback behavior
  • WebSocket disconnect cleanup and permission scoping in dashboard endpoint

Possibly related PRs

  • PR Queue combo #121: Both modify Beanie model usage, RQ worker/queue orchestration, and conversation/audio job processing pipelines with overlapping structural changes to job dependency chains and conversation lifecycle.
  • PR Reprocessing WIP #96: Both refactor conversation reprocessing endpoints, versioning surface, and controller/router logic for transcript handling.
  • PR Better audio processing #111: Both modify audio_utils functions (process_audio_chunk, _process_audio_cropping_with_relative_timestamps) and audio cropping signature changes.

Suggested reviewers

  • thestumonkey

🐰 Hop along this path of refactored dreams,
Where ConversationManager no longer beams;
Post-conversation pipelines now chain jobs with care, 🔗
Sessions persist in Redis, floating in air!
A modular dance of workers so neat—
Architecture evolved, now architecturally sweet!

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
  • Description Check — ✅ Passed: Check skipped; CodeRabbit’s high-level summary is enabled.
  • Docstring Coverage — ✅ Passed: Docstring coverage is 90.53%, which is sufficient; the required threshold is 80.00%.
  • Title Check — ✅ Passed: The pull request title "feat: Adds "closing conversation", beefs up and fixes Queue management page" clearly references real and significant aspects of the changeset. The PR does introduce conversation closure enhancements (removing ConversationManager, updating client.py, adding logging) and substantially improves queue management (new audio queue, dashboard endpoints, job orchestration). While this large, architecture-wide refactoring also includes session management, audio cropping, and other interconnected changes beyond the title's scope, the title adequately captures two major focal points of the changeset.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@thestumonkey (Contributor, Author) commented:

@coderabbitai

coderabbitai bot (Contributor) commented Oct 27, 2025

Tip

For best results, initiate chat on the files or code changes.

@thestumonkey, Hello! How can I assist you with this pull request? I can help with:

  • Reviewing changes - I can analyze the code changes in this PR
  • Answering questions - About the code, best practices, or implementation details
  • Generating code - Tests, documentation, or implementation suggestions
  • Analyzing the codebase - Finding patterns, usage examples, or related code
  • Configuring CodeRabbit - Adjusting review settings or behaviors

Just let me know what you'd like me to do! 🐰

@thestumonkey (Contributor, Author) commented:

There is a lot of cleaning that could happen on front end, but need to get this damn merged

…-lite into close-convo2

# Conflicts:
#	backends/advanced/src/advanced_omi_backend/memory/providers/mcp_client.py
@AnkushMalaker (Collaborator) commented:

@coderabbitai review

coderabbitai bot (Contributor) commented Oct 28, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 13

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (15)
backends/advanced/webui/src/pages/System.tsx (1)

269-278: Remove dead code or update HealthData interface.

The rendering logic attempts to access status and provider properties that are not defined in the HealthData interface (lines 7-14). According to the PR's AI summary, these fields were removed from the health payload. The type assertions (status as any) bypass TypeScript's type safety and likely render nothing if the backend no longer sends these properties.

Apply this diff to remove the dead code:

                    {status.message && (
                      <span className="text-sm text-gray-600 dark:text-gray-400 block">
                        {status.message}
                      </span>
                    )}
-                    {(status as any).status && (
-                      <span className="text-xs text-gray-500 dark:text-gray-500">
-                        {(status as any).status}
-                      </span>
-                    )}
-                    {(status as any).provider && (
-                      <span className="text-xs text-blue-600 dark:text-blue-400">
-                        ({(status as any).provider})
-                      </span>
-                    )}
                    {service === 'redis' && (status as any).worker_count !== undefined && (

If these properties should still be displayed, update the HealthData interface to include them as optional fields instead of using type assertions.

extras/speaker-recognition/pyproject.toml (1)

44-47: Inconsistent version pinning between CPU and CUDA variants.

The CPU variant (lines 44-47) does not specify upper bounds for torch and torchaudio, while all CUDA variants (cu121, cu126, cu128) pin to <2.3.0. This inconsistency could allow CPU installations to pull incompatible versions later when torch ≥2.3.0 is released.

For consistency and predictability, apply the same version constraints to the CPU variant:

 cpu = [
-    "torch>=2.0.0",
-    "torchaudio>=2.0.0",
+    "torch>=2.0.0,<2.3.0",
+    "torchaudio>=2.0.0,<2.3.0",
 ]
extras/speaker-recognition/src/simple_speaker_recognition/core/audio_backend.py (1)

53-54: Line length exceeds 100-character limit.

These method signatures exceed the 100-character line length specified in the coding guidelines. Black formatting should be applied to wrap these lines appropriately.

As per coding guidelines.

Also applies to: 109-110, 123-123

backends/advanced/src/advanced_omi_backend/database.py (1)

517-548: Duplicate method name overrides the versioned implementation above. Remove or rename.

The second async def update_transcription_status shadows the earlier one, breaking versioned transcript updates at runtime.

Apply this removal:

@@
-    async def update_transcription_status(
-        self, audio_uuid: str, status: str, error_message: str = None, provider: str = None
-    ):
-        """Update transcription status and completion timestamp.
-        
-        Args:
-            audio_uuid: UUID of the audio chunk
-            status: New status ('PENDING', 'PROCESSING', 'COMPLETED', 'FAILED', 'EMPTY')
-            error_message: Optional error message if status is 'FAILED'
-            provider: Optional provider name for successful transcriptions
-        """
-        update_doc = {
-            "transcription_status": status,
-            "updated_at": datetime.now(UTC).isoformat()
-        }
-        
-        if status == "COMPLETED":
-            update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat()
-            if provider:
-                update_doc["transcription_provider"] = provider
-        elif status == "FAILED" and error_message:
-            update_doc["transcription_error"] = error_message
-        elif status == "EMPTY":
-            update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat()
-            if provider:
-                update_doc["transcription_provider"] = provider
-            
-        result = await self.col.update_one(
-            {"audio_uuid": audio_uuid}, {"$set": update_doc}
-        )
-        return result.modified_count > 0

Follow‑up: ensure callers use the versioned method semantics.

Also standardize timestamps (strings vs datetime objects) across methods (e.g., cropped_at).

backends/advanced/src/advanced_omi_backend/llm_client.py (2)

60-71: Import error message references the wrong package.

You import langfuse.openai; the error suggests installing openai. Adjust message.

Apply:

-        except ImportError:
-            self.logger.error("OpenAI library not installed. Install with: pip install openai")
+        except ImportError:
+            self.logger.error("OpenAI-compatible client not installed. Install with: pip install langfuse")
             raise

46-72: Relax API key requirement to support local Ollama deployments.

The __init__ currently raises ValueError if OPENAI_API_KEY is missing, blocking Ollama (which doesn't use an API key). The OpenAI client library accepts api_key=None and only errors on actual requests if no key exists—so validation here is premature.

Change the validation from:

if not self.api_key or not self.base_url or not self.model:
    raise ValueError("OPENAI_API_KEY, OPENAI_BASE_URL, and OPENAI_MODEL must be set")

To:

if not self.base_url or not self.model:
    raise ValueError("OPENAI_BASE_URL and OPENAI_MODEL must be set")

Then pass api_key as-is to openai.OpenAI(api_key=self.api_key, base_url=self.base_url) (whether None or a value). This allows both OpenAI and Ollama configurations to work, since the factory method (line 136–145) instantiates OpenAILLMClient for both providers.

Makefile (1)

188-202: config-k8s no longer guarantees skaffold.env; deploy targets still source it.

Lines 159, 241, and 254 in the Makefile source skaffold.env before running skaffold, but config-k8s (which calls generate-k8s-configs.py) doesn't ensure it exists. This risks silent deployment failures with incorrect or missing environment variables. Add a fail-fast check:

 config-k8s: ## Generate Kubernetes configuration files (ConfigMap/Secret only - no .env files)
 	@echo "☸️  Generating Kubernetes configuration files..."
 	@python3 scripts/generate-k8s-configs.py
+	@if [ ! -f skaffold.env ]; then \
+		echo "❌ skaffold.env not found. Ensure generate-k8s-configs.py or setup generates it."; \
+		exit 1; \
+	fi
 	@echo "📦 Applying ConfigMap and Secret to Kubernetes..."

Consider also stripping managedFields when copying resources across namespaces to prevent apply errors:

-	@kubectl get configmap friend-lite-config -n $(APPLICATION_NAMESPACE) -o yaml | \
-		sed -e '/namespace:/d' -e '/resourceVersion:/d' -e '/uid:/d' -e '/creationTimestamp:/d' | \
+	@kubectl get configmap friend-lite-config -n $(APPLICATION_NAMESPACE) -o yaml | \
+		sed -e '/namespace:/d' -e '/resourceVersion:/d' -e '/uid:/d' -e '/creationTimestamp:/d' -e '/managedFields:/,/^ *[^- ]/d' | \
 		kubectl apply -n speech -f - 2>/dev/null || echo "⚠️  ConfigMap not copied to speech namespace"
backends/advanced/start-k8s.sh (1)

22-26: Fix misleading "matching start-workers.sh" comments for RQ worker count.

The review comment correctly identified worker consistency issues. Verification confirms:

  • start-k8s.sh workers (5 total): ✅ Count is accurate—1 audio stream + 3 RQ + 1 audio persistence
  • Cleanup code: ✅ All worker PIDs properly tracked and terminated across both shutdown paths (lines 16–27 and 203–208)
  • Comments at lines 130 and 162: ❌ Claim "matching start-workers.sh" but counts differ:
    • start-k8s.sh: 3 RQ workers
    • start-workers.sh: 6 RQ workers

Update lines 130 and 162 to accurately reflect the intentional configuration difference instead of claiming consistency.

backends/advanced/src/advanced_omi_backend/utils/audio_utils.py (1)

262-324: Critical format mismatch in audio_controller.py caller — speech_segments stored as dicts, not tuples

The audio_jobs.py caller correctly converts segments to tuples, but audio_controller.py retrieves speech_segments from the chunk document as a list of dicts and passes them directly to the function, which expects list[tuple[float, float]].

In database.py lines 310-312, segments are stored as dicts: {"start": start, "end": end}. When the function tries to unpack these at line 278 (for start_rel, end_rel in speech_segments), dict unpacking yields keys only ("start", "end"), causing incorrect behavior or runtime errors.

Fix required at backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py line 274:
Convert the dict format to tuples before passing:

speech_segments = chunk.get("speech_segments", [])
# Convert from [{"start": x, "end": y}, ...] to [(x, y), ...]
speech_segments = [(seg["start"], seg["end"]) for seg in speech_segments]
backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py (3)

46-49: Return proper HTTP errors instead of success-with-error payloads

Endpoints like list_jobs return 200 with an {"error": "..."} body. Prefer raising HTTPException(500, ...) to avoid silent failures and align with guidelines.

Apply this minimal change:

-    except Exception as e:
-        logger.error(f"Failed to list jobs: {e}")
-        return {"error": "Failed to list jobs", "jobs": [], "pagination": {"total": 0, "limit": limit, "offset": offset, "has_more": False}}
+    except Exception as e:
+        logger.exception("Failed to list jobs")
+        raise HTTPException(status_code=500, detail="Failed to list jobs") from e

Repeat similar handling for get_queue_stats_endpoint, get_queue_health_endpoint, get_stream_stats. As per coding guidelines.


79-92: Redact sensitive data in job responses

Returning args, kwargs, meta, and result can leak PII (e.g., user_email, tokens). Redact known sensitive keys or restrict full payloads to admins.

-            "args": job.args,
-            "kwargs": job.kwargs,
-            "meta": job.meta if job.meta else {},
-            "result": job.result,
+            "args": job.args if current_user.is_superuser else [],
+            "kwargs": _redact_job_mapping(job.kwargs or {}),
+            "meta": _redact_job_mapping(job.meta or {}),
+            "result": job.result if current_user.is_superuser else None,

Add helper near top:

SENSITIVE_KEYS = {"user_email", "access_token", "api_key", "authorization", "password", "token"}

def _redact_job_mapping(d: dict) -> dict:
    if not d:
        return {}
    redacted = {}
    for k, v in d.items():
        redacted[k] = "***" if str(k).lower() in SENSITIVE_KEYS else v
    return redacted

193-207: Repeat of PII risk in session job listings

Same concern as get_job: args/kwargs/meta may contain PII. Apply the same redaction and admin‑only detail policy here.

-                "result": job.result,
-                "meta": job.meta if job.meta else {},
-                "args": job.args,
-                "kwargs": job.kwargs if job.kwargs else {},
+                "result": job.result if current_user.is_superuser else None,
+                "meta": _redact_job_mapping(job.meta or {}),
+                "args": job.args if current_user.is_superuser else [],
+                "kwargs": _redact_job_mapping(job.kwargs or {}),
backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py (1)

190-221: Blocker: enqueue passes wrong arguments to process_memory_job

process_memory_job now takes (conversation_id, redis_client). enqueue_memory_processing still enqueues (client_id, user_id, user_email, conversation_id), causing a runtime error when the worker runs.

Apply:

-def enqueue_memory_processing(
-    client_id: str,
-    user_id: str,
-    user_email: str,
-    conversation_id: str,
-    priority: JobPriority = JobPriority.NORMAL
-):
+def enqueue_memory_processing(
+    conversation_id: str,
+    priority: JobPriority = JobPriority.NORMAL,
+):
@@
-    job = memory_queue.enqueue(
-        process_memory_job,
-        client_id,
-        user_id,
-        user_email,
-        conversation_id,
+    job = memory_queue.enqueue(
+        process_memory_job,
+        conversation_id,
         job_timeout=timeout_mapping.get(priority, 1800),
         result_ttl=JOB_RESULT_TTL,
-        job_id=f"memory_{conversation_id[:8]}",
+        job_id=f"memory_{conversation_id[:8]}_{int(time.time())}",
         description=f"Process memory for conversation {conversation_id[:8]}"
     )

Follow-up: update all call sites (see controller diff below).

backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (2)

69-69: Likely wrong attribute: use user.user_id

Elsewhere you use user.user_id. user.id may be absent or different. Use user.user_id for consistency.

-        logger.info(f"Manually closed conversation for client {client_id} by user {user.id}")
+        logger.info(f"Manually closed conversation for client {client_id} by user {user.user_id}")

475-481: Update reprocess_memory to new enqueue API

After changing enqueue_memory_processing, update this call.

-        job = enqueue_memory_processing(
-            client_id=conversation_model.client_id,
-            user_id=str(user.user_id),
-            user_email=user.email,
-            conversation_id=conversation_id,
-            priority=JobPriority.NORMAL
-        )
+        job = enqueue_memory_processing(
+            conversation_id=conversation_id,
+            priority=JobPriority.NORMAL
+        )
🧹 Nitpick comments (50)
backends/advanced/webui/src/pages/System.tsx (1)

279-284: Consider type-safe service-specific metadata.

The Redis worker information uses type assertions to access properties not defined in the HealthData interface. While functional, this bypasses type safety.

Consider extending the interface to support service-specific metadata:

 interface HealthData {
   status: 'healthy' | 'partial' | 'unhealthy'
   services: Record<string, {
     healthy: boolean
     message?: string
+    // Service-specific optional metadata
+    worker_count?: number
+    active_workers?: number
+    idle_workers?: number
   }>
   timestamp?: string
 }

Or create a discriminated union type for different service types if more services need custom fields.
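If the union route is taken, a minimal sketch of what it could look like (the `service` discriminant and field names are illustrative, not the current interface):

```typescript
// Sketch of a discriminated union for per-service health metadata.
// "redis_workers" / "generic" discriminants are assumptions for illustration.
type BaseHealth = { healthy: boolean; message?: string };

type RedisWorkerHealth = BaseHealth & {
  service: "redis_workers";
  worker_count: number;
  active_workers: number;
  idle_workers: number;
};

type GenericServiceHealth = BaseHealth & { service: "generic" };

type ServiceHealth = RedisWorkerHealth | GenericServiceHealth;

function describeService(s: ServiceHealth): string {
  // Narrowing on the discriminant makes the worker fields type-safe to read
  if (s.service === "redis_workers") {
    return `${s.active_workers}/${s.worker_count} workers active`;
  }
  return s.healthy ? "healthy" : "unhealthy";
}
```

This removes the need for type assertions: the compiler only allows `worker_count` access after the discriminant check.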

backends/advanced/src/advanced_omi_backend/routers/modules/health_routes.py (1)

195-195: Consider more robust health check logic.

The health check now relies on string matching for "✅" in the LLM status, which is fragile and unconventional. If the status message format changes or the checkmark emoji is omitted, the health check will incorrectly report unhealthy status.

Consider checking for explicit status fields or error indicators instead:

-            "healthy": "✅" in llm_health.get("status", ""),
+            "healthy": llm_health.get("healthy", False) or llm_health.get("error") is None,

Alternatively, if the LLM health response has a dedicated boolean field, use that directly.
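A hedged sketch of a boolean-first health payload (helper name and fields are hypothetical, not the repo's implementation):

```python
def llm_health_summary(base_url, model, api_key):
    """Build a health payload with an explicit machine-readable flag.

    Callers check `healthy`; the emoji string stays display-only.
    """
    healthy = bool(base_url and model)
    return {
        "healthy": healthy,
        "status": "✅ Configured" if healthy else "⚠️ Configuration incomplete",
        "api_key_configured": bool(api_key),
    }
```

With this shape, the health route checks `llm_health.get("healthy", False)` and never has to parse status strings.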

backends/advanced/src/advanced_omi_backend/utils/audio_utils.py (1)

276-303: Good defensive validation but consider surfacing upstream issues.

The input validation improvements are solid:

  • Explicit checks for invalid segment ordering (start_rel >= end_rel)
  • Negative timestamp detection and handling
  • Clear logging of validation failures

However, clamping negative start_rel to 0.0 (lines 287-291) may hide bugs in upstream timestamp generation. Consider whether this should be an error instead.

If negative timestamps indicate a bug rather than an expected edge case:

             if start_rel < 0:
                 logger.warning(
-                    f"⚠️ Negative start timestamp: {start_rel}, clamping to 0.0"
+                    f"⚠️ Negative start timestamp: {start_rel}, skipping segment (indicates upstream bug)"
                 )
-                start_rel = 0.0
+                continue
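The clamp-vs-skip trade-off can be isolated in a small standalone helper (hypothetical, not the repo's implementation):

```python
def validate_segments(segments, clamp_negative_start=True):
    """Filter (start_rel, end_rel) pairs: drop inverted segments, and
    either clamp negative starts to 0.0 or skip them entirely."""
    valid = []
    for start_rel, end_rel in segments:
        if start_rel >= end_rel:
            continue  # invalid ordering: skip the segment
        if start_rel < 0:
            if not clamp_negative_start:
                continue  # strict mode: treat as an upstream bug
            start_rel = 0.0  # lenient mode: mirror the current behaviour
        valid.append((start_rel, end_rel))
    return valid
```

A flag like this lets strictness be decided per call site while keeping one validation path.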
backends/advanced/start-workers.sh (2)

52-66: Use uv with python3 consistently.

Per repo guidelines for shell scripts, prefer uv run python3 ... over uv run python .... Apply to all invocations here.

As per coding guidelines

-uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default &
+uv run python3 -m advanced_omi_backend.workers.rq_worker_entry transcription memory default &
@@
-uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default &
+uv run python3 -m advanced_omi_backend.workers.rq_worker_entry transcription memory default &
@@
-uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default &
+uv run python3 -m advanced_omi_backend.workers.rq_worker_entry transcription memory default &
@@
-uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default &
+uv run python3 -m advanced_omi_backend.workers.rq_worker_entry transcription memory default &
@@
-uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default &
+uv run python3 -m advanced_omi_backend.workers.rq_worker_entry transcription memory default &
@@
-uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default &
+uv run python3 -m advanced_omi_backend.workers.rq_worker_entry transcription memory default &
@@
-uv run python -m advanced_omi_backend.workers.rq_worker_entry audio &
+uv run python3 -m advanced_omi_backend.workers.rq_worker_entry audio &
@@
-uv run python -m advanced_omi_backend.workers.audio_stream_deepgram_worker &
+uv run python3 -m advanced_omi_backend.workers.audio_stream_deepgram_worker &

Also applies to: 68-72, 76-77


83-87: Minor DRY improvement for worker PID management.

The echo/kill blocks duplicate per-worker lines. Consider a small array + loop to reduce error-prone repetition (optional).

Example (outside changed ranges, illustrative only):

# Track PIDs
RQ_PIDS=("$RQ_WORKER_1_PID" "$RQ_WORKER_2_PID" "$RQ_WORKER_3_PID" "$RQ_WORKER_4_PID" "$RQ_WORKER_5_PID" "$RQ_WORKER_6_PID")

# Echo
for i in "${!RQ_PIDS[@]}"; do
  echo "  - RQ worker $((i+1)): PID ${RQ_PIDS[$i]} (transcription, memory, default)"
done

# Kill
for pid in "${RQ_PIDS[@]}" "$AUDIO_PERSISTENCE_WORKER_PID" "$AUDIO_STREAM_WORKER_PID"; do
  kill "$pid" 2>/dev/null || true
done

Also applies to: 97-101

backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py (1)

52-58: Endpoints now in audio_controller: add response models and plan ID alignment.

  • Add response_model=... for typed FastAPI responses to prevent schema drift.
  • Long term, consider converging these audio operations to conversation-centric IDs for consistency with the rest of the router.

As per coding guidelines

Also applies to: 61-67

backends/advanced/src/advanced_omi_backend/workers/__init__.py (2)

61-63: Avoid exporting private underscored symbols.

_ensure_beanie_initialized appears internal; consider not exposing it in __all__ or rename without underscore if intended public API.


Group audio_streaming_persistence_job under Audio jobs in __all__.

It’s currently under the Conversation section, which can confuse consumers.

 __all__ = [
@@
-    # Conversation jobs
-    "open_conversation_job",
-    "audio_streaming_persistence_job",
+    # Conversation jobs
+    "open_conversation_job",
@@
     # Audio jobs
     "process_cropping_job",
+    "audio_streaming_persistence_job",
     "enqueue_cropping",
backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py (2)

105-121: Remove unused version_id.

Eliminate dead code flagged by static analysis.

-                version_id = str(uuid.uuid4())

18-23: Avoid relying on private helper _process_audio_cropping_with_relative_timestamps.

Importing underscored functions couples you to internals. Expose a public wrapper in audio_utils and depend on that instead.

As per coding guidelines

Also applies to: 275-287

backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py (6)

317-420: Stream stats: move imports to top and prefer decode_responses=True

  • Follow guideline: imports at top; avoid repeated in‑function imports.
  • Using decode_responses=True on the Redis client avoids byte/str decoding branches.
-        from advanced_omi_backend.services.audio_service import get_audio_stream_service
-        audio_service = get_audio_stream_service()
+        from advanced_omi_backend.services.audio_service import get_audio_stream_service
+        audio_service = get_audio_stream_service()
@@
-                stream_name = stream_key.decode() if isinstance(stream_key, bytes) else stream_key
+                stream_name = stream_key  # with decode_responses=True

Initialize Redis with decode_responses=True in the audio service if feasible.


456-458: flush_jobs: DRY the queue names

Same duplication of queue list. Import a single QUEUE_NAMES from queue_controller to keep consistency.


530-582: flush-all: cancellation semantics and logging

Great robustness. Two tweaks:

  • Cancel also deferred/scheduled to ensure workers don't pick them later.
  • Use logger.exception in the outer except and when individual deletions fail.
-                        if job.is_started:
+                        if job.is_started or job.is_deferred or job.is_scheduled:

603-648: Redis sessions: move imports to top and use decode_responses=True

Avoid mid-function imports; initialize aioredis with decode_responses=True to drop manual decoding and simplify logic.

-        redis_client = aioredis.from_url(REDIS_URL)
+        redis_client = aioredis.from_url(REDIS_URL, decode_responses=True)
@@
-                    session_id = key.decode().replace("audio:session:", "")
+                    session_id = key.replace("audio:session:", "")
@@
-                        "user_id": session_data.get(b"user_id", b"").decode(),
+                        "user_id": session_data.get("user_id", ""),

705-706: Exception chaining and logging

Use logger.exception and chain the HTTPException to preserve the original cause.

-        logger.error(f"Failed to clear sessions: {e}", exc_info=True)
-        raise HTTPException(status_code=500, detail=f"Failed to clear sessions: {str(e)}")
+        logger.exception("Failed to clear sessions")
+        raise HTTPException(status_code=500, detail="Failed to clear sessions") from e

708-967: get_dashboard_data: correctness, privacy, and maintainability nits

  • timestamp uses event loop time; use time.time() or datetime.now().isoformat().
  • PII: redact kwargs/meta; omit args/result for non-admins.
  • Remove unused conversation_ids and unused status_name; tighten lints.
  • Move imports to top; consider extracting nested functions to a small service for reuse with get_jobs_by_session.
-            "meta": job.meta if job.meta else {},
-            "kwargs": job.kwargs if job.kwargs else {},
+            "meta": _redact_job_mapping(job.meta or {}),
+            "kwargs": _redact_job_mapping(job.kwargs or {}),
@@
-            "timestamp": asyncio.get_event_loop().time()
+            "timestamp": __import__("time").time()

Also drop the unused conversation_ids computation in fetch_session_jobs and rename status_name to _status_name to silence lints.

backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py (1)

63-71: Avoid duplicate user lookups

You fetch the user twice. Reuse the first lookup for the primary_speakers check.

-    user = await get_user_by_id(user_id)
-    if user:
+    user = await get_user_by_id(user_id)
+    if user:
         user_email = user.email
     else:
         logger.warning(f"Could not find user {user_id}")
         user_email = ""
@@
-    user = await get_user_by_id(user_id)
-    if user and user.primary_speakers:
+    if user and user.primary_speakers:

Also applies to: 97-103

backends/advanced/src/advanced_omi_backend/models/conversation.py (1)

319-341: Preserve None for optional transcript instead of empty string

Setting transcript to "" can break downstream checks that rely on None for "not yet set".

-        "transcript": transcript or "",
+        "transcript": transcript,
backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py (3)

12-14: Remove unused import

transcription_queue is imported but not used.

-from advanced_omi_backend.controllers.queue_controller import transcription_queue

164-176: Logging/style nits: f-strings without placeholders and exception logging

  • Remove redundant f prefixes on constant strings.
  • Use logger.exception in the except to capture traceback without manual traceback.format_exc().
-        logger.info(f"🎤 Speaker recognition disabled, skipping")
+        logger.info("🎤 Speaker recognition disabled, skipping")
@@
-        logger.info(f"🎤 Calling speaker recognition service...")
+        logger.info("🎤 Calling speaker recognition service...")
@@
-            logger.warning(f"🎤 Speaker recognition returned no segments")
+            logger.warning("🎤 Speaker recognition returned no segments")
@@
-    except Exception as speaker_error:
-        logger.error(f"❌ Speaker recognition failed: {speaker_error}")
-        import traceback
-        logger.debug(traceback.format_exc())
+    except Exception as speaker_error:
+        logger.exception("❌ Speaker recognition failed")

Also applies to: 208-209, 276-279


109-110: Unused parameter: redis_client

If you don’t intend to use it, rename to _redis_client to silence linters while preserving decorator signature.

-    redis_client=None
+    _redis_client=None
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (4)

105-115: Expose a real timestamp or remove legacy field

Setting "timestamp": 0 is misleading. Either compute from created_at or drop it.

-            "timestamp": 0,  # Legacy field - using created_at instead
+            "timestamp": int(conversation.created_at.timestamp()) if conversation.created_at else None,

Apply similarly in get_conversations.


141-144: List API: same timestamp nit; good exclusion of large fields

Replace timestamp 0 as above to avoid confusion. Everything else looks good.

-                "timestamp": 0,  # Legacy field - using created_at instead
+                "timestamp": int(conv.created_at.timestamp()) if conv.created_at else None,

Also applies to: 146-157
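The replacement expression amounts to a None-safe epoch conversion, sketched here as a standalone helper (name is illustrative):

```python
from datetime import datetime, timezone

def legacy_timestamp(created_at):
    """Derive the legacy integer epoch field from created_at, or None."""
    if created_at is None:
        return None
    return int(created_at.timestamp())
```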


355-368: Job IDs: reduce collision risk on reprocessing

Using job_id like reprocess_{conversation_id[:8]} or speaker_{...} can collide across retries. Append version_id or a timestamp.

-            job_id=f"reprocess_{conversation_id[:8]}",
+            job_id=f"reprocess_{conversation_id[:8]}_{version_id}",
@@
-            job_id=f"speaker_{conversation_id[:8]}",
+            job_id=f"speaker_{conversation_id[:8]}_{version_id}",
@@
-            job_id=f"crop_{conversation_id[:8]}",
+            job_id=f"crop_{conversation_id[:8]}_{version_id}",

Also applies to: 371-387, 389-401
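A small collision-resistant ID builder in the spirit of these diffs (hypothetical helper, not in the repo):

```python
import time
import uuid

def unique_job_id(prefix, conversation_id, version_id=None):
    """Build an RQ job id that stays unique across retries/reprocessing."""
    suffix = version_id or f"{int(time.time())}_{uuid.uuid4().hex[:6]}"
    return f"{prefix}_{conversation_id[:8]}_{suffix}"
```

Falling back to a timestamp-plus-random suffix keeps IDs unique even when no `version_id` is available.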


428-431: Use logger.exception in except blocks

Prefer logger.exception to capture stack traces automatically; return HTTP 500 as you already do.

-        logger.error(f"Error starting transcript reprocessing: {e}")
+        logger.exception("Error starting transcript reprocessing")

(Apply similarly to other except blocks in this file.)

Also applies to: 562-565, 605-607, 79-85, 292-298

backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py (4)

131-208: Tighten exception handling, drop extraneous f-string, and move inner imports to the top

  • Multiple bare except Exception swallow stack traces; use logger.exception(...) to preserve context, and narrow when feasible.
  • pattern = f"audio:session:*" has no placeholders; drop the f.
  • Importing redis.asyncio and rq.Job inside the function violates the guidelines; move to module top unless there’s a proven cycle.

Suggested in-place edits:

-    from advanced_omi_backend.controllers.queue_controller import redis_conn
-    from rq.job import Job
-    import redis.asyncio as redis
+    # uses top-level imports (see module header changes below)
@@
-        job_id_key = f"speech_detection_job:{client_id}"
+        job_id_key = f"speech_detection_job:{client_id}"
@@
-            except Exception as job_error:
-                logger.warning(f"⚠️ Failed to cancel job {job_id}: {job_error}")
+            except Exception:
+                logger.exception(f"⚠️ Failed to cancel job {job_id}")
@@
-    except Exception as e:
-        logger.warning(f"⚠️ Error during job cancellation for client {client_id}: {e}")
+    except Exception:
+        logger.exception(f"⚠️ Error during job cancellation for client {client_id}")
@@
-        pattern = f"audio:session:*"
+        pattern = "audio:session:*"
@@
-    except Exception as session_error:
-        logger.warning(f"⚠️ Error marking sessions complete for client {client_id}: {session_error}")
+    except Exception:
+        logger.exception(f"⚠️ Error marking sessions complete for client {client_id}")

Add these imports at the top of the file (outside any function):

# top-level imports to satisfy cleanup_client_state
import redis.asyncio as redis  # async Redis client
from rq.job import Job

326-326: Move start_streaming_jobs import to the top of the module

Importing inside _initialize_streaming_session violates the guidelines. No clear circular dependency here; move it up.

-    from advanced_omi_backend.controllers.queue_controller import start_streaming_jobs
+    # at top of file
+from advanced_omi_backend.controllers.queue_controller import start_streaming_jobs

699-699: Avoid in-function import of write_audio_file

Per guidelines, place imports at the top unless resolving a cycle. This one is safe to hoist.

-        from advanced_omi_backend.utils.audio_utils import write_audio_file
+        # at top of file
+from advanced_omi_backend.utils.audio_utils import write_audio_file

727-741: Remove unused version_id assignment

version_id is created but never used in this block.

-        # Create conversation immediately for batch audio (conversation_id auto-generated)
-        version_id = str(uuid.uuid4())
-
         conversation = create_conversation(
             audio_uuid=audio_uuid,
             user_id=user_id,
             client_id=client_id,
             title="Batch Recording",
             summary="Processing batch audio..."
         )
         await conversation.insert()
         conversation_id = conversation.conversation_id  # Get the auto-generated ID
backends/advanced/src/advanced_omi_backend/controllers/session_controller.py (6)

65-67: Use logger.exception and avoid blind except Exception

Preserve stack traces and narrow exception scope when possible.

-    except Exception as e:
-        logger.error(f"Error getting session info for {session_id}: {e}")
+    except Exception:
+        logger.exception("Error getting session info for %s", session_id)

101-103: Consistent exception logging

Apply the same pattern across helpers to keep stack traces.

-    except Exception as e:
-        logger.error(f"Error getting all sessions: {e}")
+    except Exception:
+        logger.exception("Error getting all sessions")
@@
-    except Exception as e:
-        logger.error(f"Error getting conversation count for session {session_id}: {e}")
+    except Exception:
+        logger.exception("Error getting conversation count for session %s", session_id)
@@
-    except Exception as e:
-        logger.error(f"Error incrementing conversation count for session {session_id}: {e}")
+    except Exception:
+        logger.exception("Error incrementing conversation count for session %s", session_id)

Also applies to: 121-123, 143-145


167-219: Replace KEYS with SCAN to avoid blocking Redis on large keyspaces

await redis_client.keys("audio:session:*") can block Redis. Prefer SCAN paging:

-        session_keys = await redis_client.keys("audio:session:*")
+        session_keys = []
+        cursor = 0
+        while True:
+            cursor, keys = await redis_client.scan(cursor, match="audio:session:*", count=200)
+            session_keys.extend(keys)
+            if cursor == 0:
+                break

430-438: Consider de-duplicating stream_health vs active_streams

You return both active_streams and stream_health (alias). Keep one to reduce payload size; maintain alias only if the frontend depends on it.


448-452: Move in-function imports to the top

import time and from fastapi.responses import JSONResponse within cleanup_old_sessions violate the guidelines; hoist them to the module header.


573-586: Use explicit conversion flags and include stack traces on errors

  • Prefer {e!s} or pass args to logger.
  • Use logger.exception(...).
-    except Exception as e:
-        logger.error(f"Error cleaning up old sessions: {e}", exc_info=True)
+    except Exception:
+        logger.exception("Error cleaning up old sessions")
@@
-        return JSONResponse(
-            status_code=500, content={"error": f"Failed to cleanup old sessions: {str(e)}"}
-        )
+        return JSONResponse(
+            status_code=500, content={"error": "Failed to cleanup old sessions"}
+        )
backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py (6)

24-29: Mark unused injected params to satisfy linters without changing the API

redis_client is decorator-injected but unused. Prefix to underscore.

-async def process_cropping_job(
+async def process_cropping_job(
     conversation_id: str,
     audio_path: str,
-    redis_client=None
+    redis_client=None  # noqa: ARG001 (injected), keep signature stable
 ) -> Dict[str, Any]:

Or rename to _redis_client and update references if any.


47-52: Move inner imports to the top of the module

Per guidelines, hoist these unless a cycle is proven.

-    from pathlib import Path
-    from advanced_omi_backend.utils.audio_utils import _process_audio_cropping_with_relative_timestamps
-    from advanced_omi_backend.database import get_collections, AudioChunksRepository
-    from advanced_omi_backend.models.conversation import Conversation
-    from advanced_omi_backend.config import CHUNK_DIR
+from pathlib import Path
+from advanced_omi_backend.utils.audio_utils import _process_audio_cropping_with_relative_timestamps
+from advanced_omi_backend.database import get_collections, AudioChunksRepository
+from advanced_omi_backend.models.conversation import Conversation
+from advanced_omi_backend.config import CHUNK_DIR

94-101: Preserve stack traces on failure and simplify returns

Use logger.exception in the except block; note that logger.exception outside an active exception handler just appends "NoneType: None", so the `if not success:` branch should keep logger.error (with lazy %s formatting).

-        if not success:
-            logger.error(f"❌ RQ: Audio cropping failed for conversation {conversation_id}")
+        if not success:
+            logger.error("❌ RQ: Audio cropping failed for conversation %s", conversation_id)
             return {
                 "success": False,
                 "conversation_id": conversation_id,
                 "reason": "cropping_failed"
             }
@@
-    except Exception as e:
-        logger.error(f"❌ RQ: Audio cropping failed for conversation {conversation_id}: {e}")
+    except Exception:
+        logger.exception("❌ RQ: Audio cropping failed for conversation %s", conversation_id)
         raise

Also applies to: 133-135


338-370: Rename unused loop variable and keep logs lightweight

Minor readability nit: _stream_name makes intent clear.

-                for stream_name, msgs in audio_messages:
+                for _stream_name, msgs in audio_messages:

380-383: Avoid blind except Exception for stream reads

Use logger.exception (or handle specific Redis errors) to aid debugging while remaining non-fatal.

-        except Exception as audio_error:
-            # Stream might not exist yet or other transient errors
-            logger.debug(f"Audio stream read error (non-fatal): {audio_error}")
+        except Exception:
+            # Stream might not exist yet or other transient errors
+            logger.exception("Audio stream read error (non-fatal)")

189-199: Hoist imports and clarify unused args in streaming job

  • Move CHUNK_DIR, LocalFileSink, AudioChunk imports to module top.
  • user_id isn’t used; keep for API compatibility but annotate or prefix with _ if you prefer.

No functional change required.

Also applies to: 212-217

backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py (4)

107-156: Use logger.exception when fetching job details fails

Keeps stack for triage.

-                except Exception as e:
-                    logger.error(f"Error fetching job {job_id}: {e}")
+                except Exception:
+                    logger.exception("Error fetching job %s", job_id)

175-237: Narrow exception scope in dependency traversal

all_jobs_complete_for_session uses broad except. Prefer NoSuchJobError and add logger.exception for unexpected cases to avoid hiding issues in terminal-state checks.


308-451: user_id parameter is unused; either remove or add to job metadata for traceability

Given multiple jobs are enqueued, including user_id in meta helps filtering and avoids the unused-arg smell.

Minimal change:

-    cropping_job = default_queue.enqueue(
+    cropping_job = default_queue.enqueue(
         process_cropping_job,
         conversation_id,
         audio_file_path,
@@
-        meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id}
+        meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id, 'user_id': user_id}
     )
@@
-        transcription_job = transcription_queue.enqueue(
+        transcription_job = transcription_queue.enqueue(
             transcribe_full_audio_job,
             conversation_id,
             audio_uuid,
             audio_file_path,
             version_id,
             "batch",  # trigger
@@
-            meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id}
+            meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id, 'user_id': user_id}
         )
@@
-    speaker_job = transcription_queue.enqueue(
+    speaker_job = transcription_queue.enqueue(
         recognise_speakers_job,
         conversation_id,
         version_id,
         audio_file_path,
         "",  # transcript_text - will be read from DB
         [],  # words - will be read from DB
@@
-        meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id}
+        meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id, 'user_id': user_id}
     )
@@
-    memory_job = memory_queue.enqueue(
+    memory_job = memory_queue.enqueue(
         process_memory_job,
         conversation_id,
@@
-        meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id}
+        meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id, 'user_id': user_id}
     )
@@
-    title_summary_job = default_queue.enqueue(
+    title_summary_job = default_queue.enqueue(
         generate_title_summary_job,
         conversation_id,
@@
-        meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id}
+        meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id, 'user_id': user_id}
     )

Alternatively, update the function to user_id: Optional[str] = None.


507-716: Hoist imports, prefer SCAN, and improve exception handling in stream cleanup

  • Move time and JSONResponse imports to the top.
  • Consider SCAN instead of KEYS for audio:stream:*.
  • Replace blind except blocks with logger.exception and narrow known Redis errors.

Example adjustments:

-    import time
-    from fastapi.responses import JSONResponse
+    # use top-level imports

@@
-        stream_keys = await redis_client.keys("audio:stream:*")
+        stream_keys = []
+        cursor = 0
+        while True:
+            cursor, keys = await redis_client.scan(cursor, match="audio:stream:*", count=200)
+            stream_keys.extend(keys)
+            if cursor == 0:
+                break
@@
-            except Exception as e:
-                cleanup_results[stream_name] = {
-                    "error": str(e),
-                    "cleaned": 0
-                }
+            except Exception:
+                logger.exception("Error cleaning stream %s", stream_name)
+                cleanup_results[stream_name] = {"error": "exception", "cleaned": 0}
@@
-    except Exception as e:
-        logger.error(f"Error cleaning up stuck workers: {e}", exc_info=True)
+    except Exception:
+        logger.exception("Error cleaning up stuck workers")
backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py (2)

155-163: Minor polish: exception style and log message consistency

  • The ValueError/FileNotFoundError messages are fine; if you want to align with TRY003 guidance, consider shorter messages and rely on context logging.
  • No functional changes required.

Also applies to: 172-176


334-611: Speech detection flow reads clean; two small nits

  • Replace a few blind except cases (speaker_check_job.refresh() handling) with explicit NoSuchJobError and use logger.exception on unknown failures.
  • If record-only-enrolled gating changes later, you already log/continue—good.

No blocking issues.

backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py (3)

85-102: Consider more specific exception types.

The nested try-except blocks catch the broad Exception class. While logging metadata update failures is appropriate for job resilience, catching specific exceptions would improve clarity.

Consider catching specific exceptions:

-                    except Exception as speaker_err:
+                    except (LookupError, AttributeError, ConnectionError) as speaker_err:
                         logger.warning(f"⚠️ Failed to update speaker check job metadata: {speaker_err}")
-        except Exception as e:
+        except (LookupError, AttributeError, ConnectionError) as e:
             logger.warning(f"⚠️ Failed to update speech job metadata: {e}")

283-283: Remove unnecessary f-string prefix.

The string has no placeholders, so the f prefix is extraneous.

Apply this diff:

-        logger.warning(f"⚠️ Audio persistence job may not have rotated file yet - cannot enqueue batch transcription")
+        logger.warning("⚠️ Audio persistence job may not have rotated file yet - cannot enqueue batch transcription")

404-407: Remove or prefix unused parameter.

The redis_client parameter is injected by the decorator but never used in the function. Either use it or prefix with underscore to indicate intentional non-use.

If the parameter is required by the decorator but not used:

 async def generate_title_summary_job(
     conversation_id: str,
-    redis_client=None
+    _redis_client=None
 ) -> Dict[str, Any]:

Or if not required, remove it entirely.

Comment on lines +97 to 116
    def health_check(self) -> Dict:
        """Check OpenAI-compatible service health."""
        try:
-            if not (self.model and self.base_url):
-                return {
-                    "status": "⚠️ Configuration incomplete (missing model or base_url)",
-                    "base_url": self.base_url,
-                    "default_model": self.model,
-                    "api_key_configured": bool(self.api_key and self.api_key != "dummy"),
-                }
-
-            if self.provider == "ollama":
-                import aiohttp
-                ollama_health_url = self.base_url.replace("/v1", "") if self.base_url.endswith("/v1") else self.base_url
-
-                # Initialize response with main LLM status
-                response_data = {
-                    "status": "❌ Unknown",
-                    "base_url": self.base_url,
-                    "default_model": self.model,
-                    "api_key_configured": False,
-                    "embedder_model": os.getenv("OLLAMA_EMBEDDER_MODEL"),
-                    "embedder_status": "❌ Not Checked"
-                }
-
-                try:
-                    async with aiohttp.ClientSession() as session:
-                        # Check main Ollama server health
-                        async with session.get(f"{ollama_health_url}/api/version", timeout=aiohttp.ClientTimeout(total=5)) as response:
-                            if response.status == 200:
-                                response_data["status"] = "✅ Connected"
-                            else:
-                                response_data["status"] = f"⚠️ Ollama Unhealthy: HTTP {response.status}"
-
-                        # Check embedder model availability
-                        embedder_model_name = os.getenv("OLLAMA_EMBEDDER_MODEL")
-                        if embedder_model_name:
-                            try:
-                                # Use /api/show to check if model exists
-                                async with session.post(f"{ollama_health_url}/api/show", json={"name": embedder_model_name}, timeout=aiohttp.ClientTimeout(total=5)) as embedder_response:
-                                    if embedder_response.status == 200:
-                                        response_data["embedder_status"] = "✅ Available"
-                                    else:
-                                        response_data["embedder_status"] = "⚠️ Embedder Model Unhealthy"
-                            except aiohttp.ClientError:
-                                response_data["embedder_status"] = "❌ Embedder Model Connection Failed"
-                            except asyncio.TimeoutError:
-                                response_data["embedder_status"] = "❌ Embedder Model Timeout"
-                        else:
-                            response_data["embedder_status"] = "⚠️ Embedder Model Not Configured"
-
-                except aiohttp.ClientError:
-                    response_data["status"] = "❌ Ollama Connection Failed"
-                except asyncio.TimeoutError:
-                    response_data["status"] = "❌ Ollama Connection Timeout (5s)"
-
-                return response_data
-            else:
-                # For other OpenAI-compatible APIs, check configuration
-                if self.api_key and self.api_key != "dummy":
-                    return {
-                        "status": "✅ Connected",
-                        "base_url": self.base_url,
-                        "default_model": self.model,
-                        "api_key_configured": bool(self.api_key and self.api_key != "dummy"),
-                    }
-                else:
-                    return {
-                        "status": "⚠️ Configuration incomplete (missing API key)",
-                        "base_url": self.base_url,
-                        "default_model": self.model,
-                        "api_key_configured": bool(self.api_key and self.api_key != "dummy"),
-                    }
+            # For OpenAI API, check if we have valid configuration
+            # Avoid calling /models endpoint as it can be unreliable
+            if self.api_key and self.api_key != "dummy" and self.model:
+                return {
+                    "status": "✅ Connected",
+                    "base_url": self.base_url,
+                    "default_model": self.model,
+                    "api_key_configured": bool(self.api_key and self.api_key != "dummy"),
+                }
+            else:
+                return {
+                    "status": "⚠️ Configuration incomplete",
+                    "base_url": self.base_url,
+                    "default_model": self.model,
+                    "api_key_configured": bool(self.api_key and self.api_key != "dummy"),
+                }
        except Exception as e:
Contributor

@coderabbitai coderabbitai bot Oct 28, 2025
⚠️ Potential issue | 🟡 Minor

Health check shouldn’t depend on non‑dummy API key.

Mark “Connected” if base_url and model are configured; report api_key_configured separately.

Apply:

-            # For OpenAI API, check if we have valid configuration
-            # Avoid calling /models endpoint as it can be unreliable
-            if self.api_key and self.api_key != "dummy" and self.model:
+            # Check for sufficient configuration without making network calls
+            if self.base_url and self.model:
                 return {
-                    "status": "✅ Connected",
+                    "status": "✅ Configured",
                     "base_url": self.base_url,
                     "default_model": self.model,
-                    "api_key_configured": bool(self.api_key and self.api_key != "dummy"),
+                    "api_key_configured": bool(self.api_key),
                 }
             else:
                 return {
                     "status": "⚠️ Configuration incomplete",
                     "base_url": self.base_url,
                     "default_model": self.model,
-                    "api_key_configured": bool(self.api_key and self.api_key != "dummy"),
+                    "api_key_configured": bool(self.api_key),
                 }
🧰 Tools
🪛 Ruff (0.14.1)

116-116: Do not catch blind exception: Exception

(BLE001)
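The decoupling the bot suggests can be sketched as a standalone helper. The function name and signature below are illustrative, not the backend's actual API; the point is that status reflects configuration while key presence is reported separately:

```python
def health_summary(base_url, model, api_key):
    """Sketch: report config status without gating it on a non-dummy key."""
    configured = bool(base_url and model)
    return {
        # Status reflects configuration only; no network call is made.
        "status": "✅ Configured" if configured else "⚠️ Configuration incomplete",
        "base_url": base_url,
        "default_model": model,
        # Key presence is surfaced on its own instead of driving the status.
        "api_key_configured": bool(api_key),
    }

print(health_summary("http://localhost:11434/v1", "llama3", None)["status"])
# → ✅ Configured
```

With this shape, a keyless local Ollama setup still reports as configured, while callers that need a key can check `api_key_configured` themselves.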

Contributor Author

I cannot recall why this was changed. I suspect it was Claude optimising. Any idea what this code does, @AnkushMalaker?

Contributor

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

Comment on lines +475 to +482
except Exception as gen_error:
logger.error(f"❌ Title/summary generation failed: {gen_error}")
return {
"success": False,
"error": str(gen_error),
"conversation_id": conversation_id,
"processing_time_seconds": time.time() - start_time
}
Contributor

🛠️ Refactor suggestion | 🟠 Major

Improve exception handling.

Two issues:

  1. Catching bare Exception is overly broad - consider catching specific LLM-related exceptions
  2. Use logger.exception instead of logger.error to automatically include the traceback

Apply this diff:

     except Exception as gen_error:
-        logger.error(f"❌ Title/summary generation failed: {gen_error}")
+        logger.exception(f"❌ Title/summary generation failed: {gen_error}")

For the broad exception, consider whether specific exceptions from the LLM utility functions can be caught (e.g., OpenAIError, TimeoutError, etc.).

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
except Exception as gen_error:
logger.error(f"❌ Title/summary generation failed: {gen_error}")
return {
"success": False,
"error": str(gen_error),
"conversation_id": conversation_id,
"processing_time_seconds": time.time() - start_time
}
except Exception as gen_error:
logger.exception(f"❌ Title/summary generation failed: {gen_error}")
return {
"success": False,
"error": str(gen_error),
"conversation_id": conversation_id,
"processing_time_seconds": time.time() - start_time
}
🧰 Tools
🪛 Ruff (0.14.1)

475-475: Do not catch blind exception: Exception

(BLE001)


476-476: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py
around lines 475–482, the code currently catches a bare Exception and logs with
logger.error; change this to catch specific LLM-related exceptions first (e.g.,
OpenAIError, TimeoutError, or any custom exceptions your LLM utilities raise)
and handle them accordingly, using logger.exception(...) to include the
traceback; then add a final broad except Exception as gen_error to handle
unexpected errors but still use logger.exception and preserve the existing
return structure (include importing the specific exception classes at the top of
the file if not already imported).
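The layered handling the review asks for can be demonstrated in a self-contained sketch (not the backend's actual code; `generate_title_and_summary` is a hypothetical stand-in for the LLM call). It also shows why `logger.exception` beats `logger.error` here: the traceback is recorded automatically.

```python
import io
import logging

# Capture log output in memory so we can inspect it.
buf = io.StringIO()
logging.basicConfig(stream=buf, level=logging.ERROR, force=True)
logger = logging.getLogger("demo")

def generate_title_and_summary():
    # Hypothetical stand-in for the real LLM call.
    raise TimeoutError("LLM call timed out")

try:
    generate_title_and_summary()
except (TimeoutError, ConnectionError) as gen_error:
    # Expected, narrower failures handled first; traceback captured automatically.
    logger.exception("Title/summary generation failed: %s", gen_error)
except Exception as gen_error:
    # Last-resort guard so the worker can still return a structured error dict.
    logger.exception("Unexpected title/summary failure: %s", gen_error)

print("Traceback captured:", "Traceback (most recent call last)" in buf.getvalue())
# → Traceback captured: True
```

If the LLM utilities raise library-specific errors (e.g. `openai.OpenAIError`), those would be caught in the first, narrower clause instead of the builtin exceptions used here.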

-      dockerfile: Dockerfile
       args:
         PYTORCH_CUDA_VERSION: ${PYTORCH_CUDA_VERSION:-cpu}
+      dockerfile: Dockerfile-M2
Contributor

@coderabbitai coderabbitai bot Oct 28, 2025

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify that Dockerfile-M2 exists and is compatible with deployment targets.

The change from a generic Dockerfile to a platform-specific Dockerfile-M2 suggests an M2/Apple Silicon optimization. Ensure:

  1. The file extras/speaker-recognition/Dockerfile-M2 exists and is properly configured.
  2. The M2-specific image is compatible with all deployment and CI/CD environments.
  3. If the original Dockerfile supported multiple platforms (e.g., x86_64), confirm that Dockerfile-M2 maintains sufficient compatibility or that platform-specific logic is handled elsewhere.

🏁 Script executed:

#!/bin/bash
# Description: Verify Dockerfile-M2 exists and inspect its contents

# Check if Dockerfile-M2 exists in the speaker-recognition directory
if [ -f "extras/speaker-recognition/Dockerfile-M2" ]; then
  echo "✓ Dockerfile-M2 found"
  echo "---"
  head -20 "extras/speaker-recognition/Dockerfile-M2"
else
  echo "✗ Dockerfile-M2 NOT found"
fi

# Also check for the original Dockerfile
echo ""
if [ -f "extras/speaker-recognition/Dockerfile" ]; then
  echo "✓ Original Dockerfile still exists"
else
  echo "ℹ Original Dockerfile does not exist (may be intentional)"
fi

Length of output: 61


🏁 Script executed:

#!/bin/bash
# List all files in extras/speaker-recognition to find any Dockerfile variants

echo "=== Files in extras/speaker-recognition/ ==="
ls -la extras/speaker-recognition/

echo ""
echo "=== All Dockerfile-related files ==="
find extras/speaker-recognition -name "Dockerfile*" -type f

echo ""
echo "=== Full docker-compose.yml content ==="
cat extras/speaker-recognition/docker-compose.yml

Length of output: 4851


Dockerfile-M2 does not exist—this change breaks the build.

The docker-compose.yml references Dockerfile-M2 on line 9, but this file does not exist in extras/speaker-recognition/. Existing Dockerfile variants are: Dockerfile, Dockerfile-gtx1070, Dockerfile-k8s, and Dockerfile.blackwell. Running docker-compose build will fail immediately.

Correct the reference to point to an existing Dockerfile (likely Dockerfile for the generic CPU profile), or create Dockerfile-M2 if this platform-specific variant is intended.

🤖 Prompt for AI Agents
In extras/speaker-recognition/docker-compose.yml around line 9, the service
references a non-existent Dockerfile named Dockerfile-M2 which breaks builds;
change the dockerfile key to point to an existing file (e.g., Dockerfile for the
generic CPU image or one of Dockerfile-gtx1070, Dockerfile-k8s,
Dockerfile.blackwell) or add a new Dockerfile-M2 to the directory if a
platform-specific variant is required; update the reference accordingly and
ensure the chosen Dockerfile is committed to extras/speaker-recognition/.
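A minimal sketch of the simplest fix, assuming the generic CPU Dockerfile is the intended build (service name and context below are illustrative, not copied from the repo's compose file):

```yaml
services:
  speaker-recognition:
    build:
      context: .
      # Point back at a committed file; Dockerfile-M2 does not exist in the repo.
      dockerfile: Dockerfile
      args:
        PYTORCH_CUDA_VERSION: ${PYTORCH_CUDA_VERSION:-cpu}
```

Alternatively, commit a real Dockerfile-M2 alongside the existing Dockerfile-gtx1070 and Dockerfile.blackwell variants if an Apple Silicon image is genuinely needed.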

Collaborator

@thestumonkey this looks legit, did you forget to commit that?

Contributor

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

@AnkushMalaker AnkushMalaker changed the title Close convo2 Adds "closing conversation", beefs up and fixes Queue management page Oct 28, 2025
@AnkushMalaker AnkushMalaker changed the title Adds "closing conversation", beefs up and fixes Queue management page feat: Adds "closing conversation", beefs up and fixes Queue management page Oct 28, 2025

# Initialize repositories (legacy collections only)
chunk_repo = AudioChunksRepository(chunks_col)
# ProcessingRunsRepository removed - using RQ job tracking instead
# Legacy audio_chunks collection is still used by some endpoints (speaker assignment, segment updates)
Collaborator

I would really like to not leave dead code with this PR. Let's see how we can clean this up.

Legacy audio_chunks collection

Contributor Author

Every time I get involved in wrapping my head around the audio chunks, it spirals into another refactor. I'm happy to keep working on this and do it subsequently; can we do it as a different PR?
Shipping > perfect?

Collaborator

> around the audio chunks, it spirals into another refactor

Hehe, that's cuz audio chunks was the central component spanning all systems. That is hard: one coherent system across so many. This PR tries to change that component.

> shipping > perfect?

Yeah, sure - let me see how big the refactor is. If I can quickly add a commit on top then great, otherwise down to merge this.

thestumonkey and others added 11 commits October 28, 2025 07:54
…nd improve error handling in Deepgram API calls. Update Caddyfile generation logic to handle directory conflicts and provide clearer error messages. Adjust datetime handling in queue routes for timezone awareness.
…t, QUEUE_NAMES, for improved maintainability. Update relevant functions in queue_controller and queue_routes to utilize this new constant.
…e endpoint from `/api/process-audio-files` to `/api/audio/upload` across multiple files, including Docker Compose, documentation, and integration tests. Add a new worker service in Docker Compose for handling test audio uploads.
@AnkushMalaker
Copy link
Collaborator

LGTM! <3

@AnkushMalaker AnkushMalaker merged commit 14f5242 into SimpleOpenSoftware:main Nov 3, 2025
1 of 3 checks passed
@coderabbitai coderabbitai bot mentioned this pull request Nov 17, 2025
thestumonkey pushed a commit to Ushadow-io/chronicle that referenced this pull request Nov 28, 2025
feat: Adds "closing conversation", beefs up and fixes Queue management page
This was referenced Dec 7, 2025