
Better audio processing#111

Merged
AnkushMalaker merged 4 commits into main from better-audio-processing
Sep 19, 2025

Conversation

@AnkushMalaker
Collaborator

@AnkushMalaker AnkushMalaker commented Sep 19, 2025

Summary by CodeRabbit

  • New Features
    • Memories page now displays the detected memory provider (e.g., Friend-Lite, OpenMemory MCP).
    • Optional diarization enabled when using Deepgram-based transcription.
  • Changes
    • File uploads now require 16 kHz audio; mismatched sample rates return 400 errors.
  • Bug Fixes
    • More reliable client/session handling and unified audio processing for uploads and streams.
    • Simplified cropping and transcription flows to reduce errors.
  • Chores
    • Runtime now includes Deepgram extras by default.
    • Reduced CI/Docker logs; removed speaker-recognition compute-mode flag.

- Removed compute mode handling from service setup in `init.py`.
- Introduced `audio_utils.py` for audio processing functions, including `process_audio_chunk` and audio cropping utilities.
- Updated `ClientManager` to be self-initializing and added atomic client creation and removal methods.
- Refactored audio chunk processing in various controllers to utilize the new unified audio processing pipeline.
- Enhanced transcription status update functionality in `database.py` to manage active transcript versions.
- Cleaned up imports and improved logging across multiple files for better clarity and maintainability.
…rovider state in Memories component

- Modified the CMD in the Dockerfile to include an extra argument for deepgram.
- Simplified subprocess calls in integration tests by removing stdout and stderr captures.
- Added memory provider state management in the Memories component, displaying the provider name conditionally in the UI.
@coderabbitai
Contributor

coderabbitai bot commented Sep 19, 2025

Important

Review skipped

Auto reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

Centralizes client lifecycle under ClientManager, unifies audio ingestion via process_audio_chunk, updates cropping to accept repository and speech segments, adjusts transcription API (no duration arg) with diarization gating, adds transcription status updates in DB, tightens sample-rate validation, modifies startup/shutdown flows, and minor Docker, memory UI, tests, and service tweaks.

Changes

Cohort / File(s) Summary
Runtime/Docker
backends/advanced/Dockerfile
CMD updated to use uv run with --extra deepgram.
Audio utilities & cropping
backends/advanced/src/advanced_omi_backend/audio_utils.py
Added env-driven constants; new async process_audio_chunk; timestamp fallback; refactored cropping helper to accept speech_segments, output_path, and optional chunk_repo; conditional repo update; TYPE_CHECKING hints.
Client state and manager
backends/advanced/src/advanced_omi_backend/client.py, .../client_manager.py
Minor log text in client; major ClientManager refactor: internal store, atomic create/remove, cleanup variant, getters (IDs/count), add_existing_client; legacy init path logs warning.
Controllers wiring
.../controllers/conversation_controller.py, .../controllers/system_controller.py
Conversation: cropping now imports from audio_utils, supplies speech_segments/output_path/chunk_repo; 400 if no segments. System: replaced manual queueing with process_audio_chunk for files; stricter 16000 Hz check returns 400.
Main app lifecycle
.../main.py
Switched to ClientManager for state; WebSocket paths call process_audio_chunk; new getters for managers; cleanup via remove_client_with_cleanup; startup/shutdown logs streamlined; health uses client_manager counts.
Processors & transcription
.../processors.py, .../transcription.py
Cropping calls updated helper with repository; memory paths key by conversation_id; removed duration arg from transcription flush; transcription methods now no-arg, unified transcript retrieval, diarization gating based on provider/source; logging simplified.
Database repository
.../database.py
Added update_transcription_status to manage versioned transcript status; mirrors top-level fields; handles errors/providers; appears duplicated once.
Memory service factory/init
.../memory/__init__.py, .../memory/service_factory.py
Removed a success log; lazy memory service init (no background task); service info reads _initialized directly; added debug log.
Tests
backends/advanced/tests/test_integration.py
Stopped capturing/logging stdout/stderr for docker compose build/up; behavior otherwise unchanged.
Web UI
backends/advanced/webui/src/pages/Memories.tsx
Display memory provider in header; reads provider from system config; friendly names; logs and fallback to 'unknown'.
Service launcher
init.py
Removed compute-mode flag passthrough for speaker-recognition; other env handling unchanged.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Client as Client (WS/File)
  participant Main as FastAPI App
  participant CM as ClientManager
  participant AU as audio_utils.process_audio_chunk
  participant PM as ProcessorManager
  participant DB as AudioChunksRepository

  Note over Client,Main: Ingestion (WS OMI/PCM or File Upload)
  Client->>Main: Send audio bytes + metadata
  Main->>CM: ensure/create client_state
  Main->>AU: process_audio_chunk(audio_data, client_id, user/user_email, audio_format)
  AU->>PM: queue AudioProcessingItem
  PM->>PM: process (transcribe, crop, memory, etc.)
  PM->>DB: update statuses (e.g., transcription status)
  PM-->>Main: processing updates/events
sequenceDiagram
  autonumber
  participant Main as FastAPI Shutdown
  participant CM as ClientManager
  participant PM as ProcessorManager
  participant TM as TaskManager
  participant Mem as MemoryService

  Note over Main,CM: Graceful shutdown
  Main->>CM: get_all_client_ids()
  loop for each client_id
    Main->>CM: remove_client_with_cleanup(client_id)
  end
  Main->>PM: shutdown processors
  Main->>TM: shutdown tasks
  Main->>Mem: shutdown memory/speaker services
  Main-->>Main: App terminated
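The graceful-shutdown sequence in the diagram above can be sketched as follows. The `ClientManager` here is a minimal stand-in mirroring the method names in the diagram; the real signatures and internals are assumptions:

```python
import asyncio

class ClientManager:
    # Minimal stand-in for the real manager; method names match the diagram,
    # but the storage and cleanup details are illustrative assumptions.
    def __init__(self) -> None:
        self._active_clients: dict[str, object] = {}

    def get_all_client_ids(self) -> list[str]:
        # Return a snapshot so callers can remove clients while iterating.
        return list(self._active_clients)

    async def remove_client_with_cleanup(self, client_id: str) -> bool:
        state = self._active_clients.pop(client_id, None)
        # Real implementation would also disconnect the client and free resources.
        return state is not None

async def shutdown(cm: ClientManager) -> None:
    # Iterate over the snapshot; removal doesn't mutate the list mid-loop.
    for client_id in cm.get_all_client_ids():
        await cm.remove_client_with_cleanup(client_id)
```

After the loop, the app would shut down processors, tasks, and memory/speaker services as shown in the diagram.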

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

  • Reprocessing WIP #96 — Introduces reprocessing/versioned conversations touching controllers, DB repository APIs, and endpoints similar to this PR’s database and controller changes.
  • Openmemory mcp #69 — Refactors/introduces memory subsystem (OpenMemory MCP), aligning with this PR’s memory service factory and UI provider wiring.

Poem

I thump my paw: new streams now flow,
One pipe for bytes, where clients go.
Crops are trimmed with segments neat,
Transcripts march in versioned beat.
Managers herd, services rest—
Deepgram packed, the queue’s impressed.
Hop, commit; ship the best! 🐇✨

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title "Better audio processing" is concise and directly related to the primary changes in the branch, which center on unifying and improving audio ingestion/processing (e.g., process_audio_chunk), cropping and transcription flows, and related client/processor refactors; it clearly signals the main intention without being misleading.
Docstring Coverage ✅ Passed Docstring coverage is 93.75% which is sufficient. The required threshold is 80.00%.

Tip

👮 Agentic pre-merge checks are now available in preview!

Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.

  • Built-in checks – Quickly apply ready-made checks to enforce title conventions, require pull request descriptions that follow templates, validate linked issues for compliance, and more.
  • Custom agentic checks – Define your own rules using CodeRabbit’s advanced agentic capabilities to enforce organization-specific policies and workflows. For example, you can instruct CodeRabbit’s agent to verify that API documentation is updated whenever API schema files are modified in a PR. Note: Up to 5 custom checks are currently allowed during the preview period. Pricing for this feature will be announced in a few weeks.

Please see the documentation for more information.

Example:

reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).

Please share your feedback with us on this Discord post.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@AnkushMalaker
Collaborator Author

@coderabbitai review

@coderabbitai
Contributor

coderabbitai bot commented Sep 19, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (12)
backends/advanced/Dockerfile (1)

43-43: Avoid --extra at runtime; rely on baked deps

You already run `uv sync --extra deepgram` during build. Keeping `--extra deepgram` in CMD can trigger dependency resolution/network access at startup and bypass lock semantics. Prefer running the app with the already-synced environment.

-CMD ["uv", "run", "--extra", "deepgram", "python3", "src/advanced_omi_backend/main.py"]
+CMD ["uv", "run", "python3", "src/advanced_omi_backend/main.py"]

If there’s a concrete need for runtime extras, note it in a comment and consider UV_NO_SYNC=1 to prevent implicit syncs.

backends/advanced/src/advanced_omi_backend/memory/service_factory.py (1)

132-134: Don’t assume _initialized exists; use getattr for safety

Direct access can raise AttributeError if an implementation deviates from the base. Use a safe read.

-        # All memory services should have _initialized attribute per the base class
-        info["service_initialized"] = _memory_service._initialized
+        # All memory services should expose `_initialized`; fall back safely if not
+        info["service_initialized"] = getattr(_memory_service, "_initialized", False)
backends/advanced/webui/src/pages/Memories.tsx (2)

37-38: Don’t display “unknown” provider; use null/undefined to hide

Defaulting to 'unknown' renders an unhelpful provider label. Keep it empty and render only when a real provider is known.

-  const [memoryProvider, setMemoryProvider] = useState<string>('')
+  const [memoryProvider, setMemoryProvider] = useState<string | null>(null)
@@
-      const provider = response.data.memory_provider || 'unknown'
+      const provider = response.data.memory_provider ?? null
@@
-      setMemoryProvider(provider)
+      setMemoryProvider(provider)
@@
-      setMemoryProvider('unknown')
+      setMemoryProvider(null)

Also applies to: 45-49, 53-53


250-263: Hide provider line when not resolved; map known values

Render the provider line only for known values to avoid “Provider: unknown”.

-            {memoryProvider && (
+            {memoryProvider && memoryProvider !== 'unknown' && (
               <p className="text-xs text-gray-500 dark:text-gray-400 mt-0.5">
                 Provider: {memoryProvider === 'friend_lite' ? 'Friend-Lite' : memoryProvider === 'openmemory_mcp' ? 'OpenMemory MCP' : memoryProvider}
               </p>
             )}
backends/advanced/tests/test_integration.py (2)

505-507: Add a timeout to the CI build step to prevent hangs

docker compose build can hang in CI. Other steps have timeouts; this one should too.

-                    build_result = subprocess.run(["docker", "compose", "-f", "docker-compose-test.yml", "build"], env=env)
+                    build_result = subprocess.run(
+                        ["docker", "compose", "-f", "docker-compose-test.yml", "build"],
+                        env=env,
+                        timeout=900  # allow up to 15 minutes for cold CI builds
+                    )

Static analysis S607 about partial paths is acceptable here (test harness), but if needed we can guard with shutil.which("docker").
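The `shutil.which` guard mentioned above could look like this sketch. The helper name and timeout value are illustrative, not from the test suite:

```python
import shutil
import subprocess

def run_compose_build(compose_file: str = "docker-compose-test.yml", timeout: int = 900):
    """Build the test stack, failing fast when docker isn't installed (sketch)."""
    # Resolving the full path up front also addresses Ruff's S607 partial-path warning.
    docker = shutil.which("docker")
    if docker is None:
        raise RuntimeError("docker not found on PATH")
    return subprocess.run(
        [docker, "compose", "-f", compose_file, "build"],
        timeout=timeout,
    )
```

This turns a silent CI hang or cryptic `FileNotFoundError` into an immediate, readable failure.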


522-523: OK to stream output; consider check=True for clearer failure

You’ve removed capture to stream logs live (good). You can also let subprocess.run(..., check=True) raise with context.

-            result = subprocess.run(cmd, env=env, timeout=300)
+            result = subprocess.run(cmd, env=env, timeout=300, check=False)

If you switch to check=True, remove the manual returncode checks below. Verify CI log visibility remains adequate with pytest’s -s.

backends/advanced/src/advanced_omi_backend/controllers/system_controller.py (2)

1016-1016: Clean up unnecessary blank line.

There's an unnecessary blank line after the import statement.

        import yaml
        from advanced_omi_backend.memory_config_loader import get_config_loader
-

1066-1066: Clean up unnecessary blank line.

There's an unnecessary blank line after the import statement.

        import yaml
        from advanced_omi_backend.memory_config_loader import MemoryConfigLoader
-
backends/advanced/src/advanced_omi_backend/client_manager.py (3)

38-47: Consider removing the legacy initialize method.

Since the ClientManager is now self-initializing and the method only logs a warning when called with an external dictionary, consider marking this method as deprecated or removing it entirely if it's not needed for backward compatibility.

@deprecated("ClientManager is now self-initializing and doesn't need external initialization")
def initialize(self, active_clients_dict: Optional[Dict[str, "ClientState"]] = None):
    """Legacy initialization method kept for backward compatibility."""
    if active_clients_dict is not None:
        logger.warning("ClientManager no longer uses external dictionaries - ignoring active_clients_dict")
    logger.info("ClientManager initialization (legacy compatibility mode)")

160-160: Remove unused variable assignment.

The client_state variable is assigned but never used in the remove_client method.

-        # Get client state for cleanup
-        client_state = self._active_clients[client_id]
-
         # Atomically remove from storage and deregister mapping
         del self._active_clients[client_id]
         unregister_client_user_mapping(client_id)

128-128: Consider using a custom exception class.

Instead of raising a generic ValueError, consider creating a custom exception class for better error handling and debugging.

class ClientAlreadyExistsError(Exception):
    """Raised when attempting to create a client with an ID that already exists."""
    pass

# Then in the method:
if client_id in self._active_clients:
    raise ClientAlreadyExistsError(f"Client {client_id} already exists")
backends/advanced/src/advanced_omi_backend/transcription.py (1)

234-245: Diarization configuration loaded on every transcript request.

Loading the diarization settings from a file on every transcript request could impact performance. Consider caching the configuration or loading it once during initialization.

 # Check if we should request diarization based on configuration
-config = load_diarization_settings_from_file()
-diarization_source = config.get("diarization_source", "pyannote")
+# Consider caching this at the class level if the config doesn't change frequently
+if not hasattr(self, '_diarization_config'):
+    self._diarization_config = load_diarization_settings_from_file()
+diarization_source = self._diarization_config.get("diarization_source", "pyannote")
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fd06f1f and 334d489.

📒 Files selected for processing (15)
  • backends/advanced/Dockerfile (1 hunks)
  • backends/advanced/src/advanced_omi_backend/audio_utils.py (2 hunks)
  • backends/advanced/src/advanced_omi_backend/client.py (2 hunks)
  • backends/advanced/src/advanced_omi_backend/client_manager.py (2 hunks)
  • backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (2 hunks)
  • backends/advanced/src/advanced_omi_backend/controllers/system_controller.py (8 hunks)
  • backends/advanced/src/advanced_omi_backend/database.py (1 hunks)
  • backends/advanced/src/advanced_omi_backend/main.py (11 hunks)
  • backends/advanced/src/advanced_omi_backend/memory/__init__.py (0 hunks)
  • backends/advanced/src/advanced_omi_backend/memory/service_factory.py (2 hunks)
  • backends/advanced/src/advanced_omi_backend/processors.py (5 hunks)
  • backends/advanced/src/advanced_omi_backend/transcription.py (5 hunks)
  • backends/advanced/tests/test_integration.py (2 hunks)
  • backends/advanced/webui/src/pages/Memories.tsx (2 hunks)
  • init.py (0 hunks)
💤 Files with no reviewable changes (2)
  • backends/advanced/src/advanced_omi_backend/memory/init.py
  • init.py
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-18T01:42:08.551Z
Learnt from: CR
PR: AnkushMalaker/friend-lite#0
File: CLAUDE.md:0-0
Timestamp: 2025-09-18T01:42:08.551Z
Learning: Applies to backends/advanced/src/**/*.py : Use conversation_id (not audio_uuid) for all reprocessing operations and version management

Applied to files:

  • backends/advanced/src/advanced_omi_backend/controllers/system_controller.py
  • backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
  • backends/advanced/src/advanced_omi_backend/processors.py
🪛 Ruff (0.12.2)
backends/advanced/tests/test_integration.py

504-504: Starting a process with a partial executable path

(S607)


522-522: subprocess call: check for execution of untrusted input

(S603)

backends/advanced/src/advanced_omi_backend/client_manager.py

128-128: Avoid specifying long messages outside the exception class

(TRY003)


160-160: Local variable client_state is assigned to but never used

Remove assignment to unused variable client_state

(F841)

backends/advanced/src/advanced_omi_backend/controllers/system_controller.py

240-240: Abstract raise to an inner function

(TRY301)

backends/advanced/src/advanced_omi_backend/database.py

390-390: PEP 484 prohibits implicit Optional

Convert to Optional[T]

(RUF013)


390-390: PEP 484 prohibits implicit Optional

Convert to Optional[T]

(RUF013)


432-432: f-string without any placeholders

Remove extraneous f prefix

(F541)


433-433: f-string without any placeholders

Remove extraneous f prefix

(F541)


440-440: f-string without any placeholders

Remove extraneous f prefix

(F541)


443-443: f-string without any placeholders

Remove extraneous f prefix

(F541)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: integration-tests
🔇 Additional comments (42)
backends/advanced/src/advanced_omi_backend/client.py (2)

18-18: LGTM!

The reorganization of imports looks good, placing all wyoming imports together for better organization.


68-68: LGTM!

The log message change from "simplified client state" to "client state" is appropriate and makes the messaging consistent with the broader architectural changes.

backends/advanced/src/advanced_omi_backend/controllers/system_controller.py (7)

18-21: LGTM!

The multi-line formatting of imports improves readability and makes it easier to track changes in version control.


25-25: LGTM! Unified audio processing is a good architectural improvement.

Importing process_audio_chunk from audio_utils centralizes the audio processing logic and eliminates code duplication.


236-241: Verify that the 16kHz sample rate requirement is intended.

The error handling is appropriate, but the hard requirement for 16kHz sample rate may limit usability. The error message mentions "I'll implement this at some point sorry" which suggests this is a known limitation.

Consider documenting this limitation in the API documentation and tracking the resampling feature implementation:

  1. Add a note in the API documentation that only 16kHz WAV files are currently supported
  2. Consider creating an issue to track the resampling feature implementation
  3. The error message could be more professional: "Audio resampling is not yet supported. Please provide audio files with a 16kHz sample rate."
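Put together, the stricter validation might read like the following sketch. A plain function stands in for the FastAPI handler, and the constant name and message wording are assumptions:

```python
import io
import wave

REQUIRED_SAMPLE_RATE = 16000  # assumed constant mirroring the 16 kHz requirement

def validate_wav_sample_rate(payload: bytes) -> tuple[int, str]:
    """Return an (http_status, message) pair for an uploaded WAV payload."""
    with wave.open(io.BytesIO(payload), "rb") as wav:
        rate = wav.getframerate()
    if rate != REQUIRED_SAMPLE_RATE:
        return 400, (
            f"Audio resampling is not yet supported. "
            f"Please provide {REQUIRED_SAMPLE_RATE} Hz audio (got {rate} Hz)."
        )
    return 200, "ok"
```

In the real endpoint the 400 branch would raise an `HTTPException` rather than return a tuple.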

257-270: LGTM! Unified audio processing architecture.

Excellent refactoring to use the centralized process_audio_chunk function. The approach properly packages all audio metadata into the audio_format dict and correctly sets client_state=None for file uploads where no WebSocket connection exists.


624-637: LGTM! Consistent with the unified audio processing pattern.

The implementation correctly uses the centralized process_audio_chunk function with the same pattern as in process_audio_files.


897-900: LGTM!

The multi-line formatting of the import statement improves readability.


935-938: LGTM!

The multi-line formatting of the import statement maintains consistency with the previous speaker recognition import.

backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (3)

12-14: LGTM! Import path updated correctly.

The import has been correctly updated to reflect the relocation of _process_audio_cropping_with_relative_timestamps from audio_cropping_utils to audio_utils.


273-280: LGTM! Improved error handling for cropping prerequisites.

Good addition of validation to ensure speech segments exist before attempting to crop audio. The error response is clear and returns an appropriate 400 status code.


282-296: LGTM! Well-structured cropping implementation with database updates.

The implementation correctly:

  1. Generates the output path for the cropped audio file
  2. Creates a repository instance for database updates
  3. Calls the relocated cropping function with all necessary parameters including the repository

The function signature change to include the repository parameter enables atomic database updates during the cropping process.

backends/advanced/src/advanced_omi_backend/client_manager.py (4)

29-36: LGTM! Self-initializing design simplifies the architecture.

The ClientManager now acts as the single source of truth for client state without requiring external dictionary references. This design eliminates potential synchronization issues between multiple data structures.


107-142: LGTM! Atomic client creation improves concurrency safety.

The create_client method provides atomic client creation and registration, which is essential for preventing race conditions in a concurrent environment. The method properly validates that the client doesn't already exist and atomically registers both the client state and user mapping.

One potential enhancement to consider: Should we validate that the user exists before creating the client? This would prevent orphaned clients if an invalid user_id is provided.


169-194: LGTM! Comprehensive cleanup with disconnect.

The remove_client_with_cleanup method properly handles the full lifecycle by calling the client's disconnect method before removal. This ensures proper resource cleanup.


196-217: LGTM! Useful utility methods for client management.

The addition of get_all_client_ids and add_existing_client methods provides necessary functionality for client enumeration and migration scenarios.

backends/advanced/src/advanced_omi_backend/audio_utils.py (4)

8-24: LGTM! Well-structured imports and configuration.

The use of TYPE_CHECKING for avoiding circular imports is a best practice. The environment-configurable constants for speech processing provide good flexibility.


26-89: Excellent centralization of audio processing logic!

The process_audio_chunk function successfully unifies the audio ingestion pattern across all input sources (WebSocket, file upload). This eliminates code duplication and ensures consistent processing. The function properly handles:

  • Audio format extraction with sensible defaults
  • Timestamp generation when not provided
  • Unified queuing through the processor manager
  • Optional client state updates for WebSocket connections

This is a significant architectural improvement that makes the codebase more maintainable.


91-97: LGTM! Repository parameter enables atomic updates.

The addition of the optional chunk_repo parameter allows the cropping function to update the database atomically when needed, while remaining backward compatible when database updates aren't required.


162-163: LGTM! Conditional database update pattern.

The conditional update only when chunk_repo is not None provides flexibility for different calling contexts - some may need database updates while others may not.
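The optional-repository pattern described above can be sketched like this. The protocol and method name are assumptions standing in for `AudioChunksRepository`, and the cropping itself is elided:

```python
import asyncio
from typing import Optional, Protocol

class ChunkRepo(Protocol):
    # Minimal protocol standing in for AudioChunksRepository (method name assumed).
    async def update_cropped_audio_path(self, audio_uuid: str, path: str) -> None: ...

async def crop_audio(
    audio_uuid: str,
    speech_segments: list[tuple[float, float]],
    output_path: str,
    chunk_repo: Optional[ChunkRepo] = None,
) -> str:
    """Crop audio to speech segments; persist the result only when a repo is given."""
    # ... actual cropping of the file to the speech segments elided ...
    if chunk_repo is not None:
        await chunk_repo.update_cropped_audio_path(audio_uuid, output_path)
    return output_path
```

Callers that only need the cropped file omit `chunk_repo`; callers that must record the new path pass their repository instance.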

backends/advanced/src/advanced_omi_backend/transcription.py (6)

10-14: LGTM! Clean import organization.

The consolidation of multi-line imports for config functions and explicit AudioChunk import improves code clarity.


17-21: LGTM! Well-structured imports for processors.

The processor imports are properly organized with clear separation of types and the manager function.


287-292: LGTM! Cleaner logging without DEBUG tags.

The simplified logging format improves readability while maintaining essential debugging information.


512-512: Missing parameter in process_collected_audio call.

The call to manager.process_collected_audio() on line 512 is correct (no duration parameter), confirming the signature change is properly implemented in the processor module.


177-178: Verified — callers updated to the new no-arg signature. All discovered usages call process_collected_audio() without arguments; no action required.


219-220: _get_transcript: internal callers verified — signature change is safe.
Only internal call found at backends/advanced/src/advanced_omi_backend/transcription.py:201 (await self._get_transcript()) and the definition at line 219; no other matches detected.

backends/advanced/src/advanced_omi_backend/processors.py (6)

1169-1176: Repository parameter added to cropping function call.

The addition of self.repository as the 5th parameter to _process_audio_cropping_with_relative_timestamps suggests the function now needs database access during cropping operations. This is a good architectural change that allows the cropping function to update database records directly.


889-892: Excellent! Using conversation_id as the primary identifier.

The memory processing flow now correctly uses conversation_id instead of audio_uuid throughout, which aligns with the learning about using conversation_id for all reprocessing operations and version management. This is the correct architectural approach.

Also applies to: 901-903, 907-908


512-512: LGTM! Process collected audio call updated.

The call to manager.process_collected_audio() correctly uses the new signature without the duration parameter.


139-140: Good addition: Duplicate close prevention.

Adding closing_clients set to track clients currently being closed prevents race conditions and duplicate close operations. This is a solid defensive programming practice.


484-562: Well-designed close operation with proper cleanup.

The close_client_audio method properly handles:

  1. Duplicate close prevention
  2. ASR flushing before closing
  3. Proper error handling and logging
  4. Cleanup of resources

This is a robust implementation.


19-21: Resolved — function exists in audio_utils.py with expected signature. Found in backends/advanced/src/advanced_omi_backend/audio_utils.py (lines 91–94): async def _process_audio_cropping_with_relative_timestamps(original_path: str, speech_segments: list[tuple[float, float]], output_path: str, ...).

backends/advanced/src/advanced_omi_backend/main.py (10)

45-46: Good architectural improvements with centralized management.

The imports show a shift to centralized management with the new ClientManager and TaskManager, plus the unified process_audio_chunk function. This improves code organization and maintainability.

Also applies to: 52-53


227-229: LGTM! ClientManager initialization.

The ClientManager is properly initialized as a singleton, providing centralized client state management.


254-257: Excellent use of ClientManager for atomic operations.

Using client_manager.create_client() provides atomic client creation and registration, which is much cleaner than managing state in multiple places.


272-282: Robust cleanup with proper error handling.

The cleanup function properly uses remove_client_with_cleanup() and handles the return value to determine if additional cleanup is needed. Good defensive programming.


589-601: LGTM! Unified audio processing pipeline.

The replacement of direct AudioChunk construction and queuing with process_audio_chunk() unifies the audio ingestion path. This is a significant architectural improvement that reduces code duplication.


823-830: Consistent use of unified pipeline for PCM audio.

The PCM WebSocket handler also uses process_audio_chunk(), maintaining consistency across different audio formats. Good architectural decision.


853-865: Good handling of raw audio with sensible defaults.

The code properly handles raw audio chunks by assuming standard PCM format (16kHz, 16-bit, mono) when no format information is available. This provides backward compatibility.


973-973: Client count now uses ClientManager.

The health endpoint properly uses client_manager.get_client_count() instead of the old len(active_clients) approach. This ensures accurate client counting.


331-331: Proper shutdown sequence with ClientManager.

The shutdown flow correctly iterates over clients using client_manager.get_all_client_ids() and properly shuts down the task manager. The sequence ensures clean resource cleanup.

Also applies to: 340-342


736-749: Proactive transcription manager creation on audio-start.

Creating the transcription manager early when audio-start is received (rather than waiting for the first chunk) is a good optimization that reduces latency for the first transcription.

Comment on lines +389 to +455
async def update_transcription_status(
self, audio_uuid: str, status: str, error_message: str = None, provider: str = None
):
"""Update transcription processing status and completion timestamp.

Interface compatibility method - updates active transcript version.
"""
chunk = await self.get_chunk(audio_uuid)
if not chunk:
return False

active_version = chunk.get("active_transcript_version")
if not active_version:
# Create initial transcript version if none exists
version_id = str(uuid.uuid4())
version_data = {
"version_id": version_id,
"transcript": "",
"segments": [],
"status": status,
"provider": provider,
"created_at": datetime.now(UTC).isoformat(),
"processing_run_id": None,
"raw_data": {},
"speakers_identified": []
}
if error_message:
version_data["error_message"] = error_message

result = await self.col.update_one(
{"audio_uuid": audio_uuid},
{
"$push": {"transcript_versions": version_data},
"$set": {
"active_transcript_version": version_id,
"transcription_status": status,
"transcription_updated_at": datetime.now(UTC).isoformat(),
}
}
)
else:
# Update existing active version
update_doc = {
f"transcript_versions.$[version].status": status,
f"transcript_versions.$[version].updated_at": datetime.now(UTC),
"transcription_status": status,
"transcription_updated_at": datetime.now(UTC).isoformat(),
}
if status == "COMPLETED":
update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat()
if error_message:
update_doc[f"transcript_versions.$[version].error_message"] = error_message
update_doc["transcription_error"] = error_message
if provider:
update_doc[f"transcript_versions.$[version].provider"] = provider
update_doc["transcript_provider"] = provider

result = await self.col.update_one(
{"audio_uuid": audio_uuid},
{"$set": update_doc},
array_filters=[{"version.version_id": active_version}]
)

if result.modified_count > 0:
logger.info(f"Updated transcription status to {status} for {audio_uuid}")
return result.modified_count > 0
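
For readers unfamiliar with `array_filters`: the `$[version]` placeholders above are scoped by the filter `{"version.version_id": active_version}`, so only the matching array element is updated server-side. In plain-Python terms (an illustrative mimic, not how MongoDB is invoked), the update behaves roughly like:

```python
def apply_versioned_update(chunk: dict, active_version: str, status: str) -> dict:
    """Pure-Python mimic of the array-filtered update above (illustration only;
    MongoDB applies this server-side via array_filters)."""
    for version in chunk["transcript_versions"]:
        if version["version_id"] == active_version:  # the array_filters condition
            version["status"] = status               # transcript_versions.$[version].status
    chunk["transcription_status"] = status           # top-level $set field
    return chunk

doc = {
    "transcript_versions": [
        {"version_id": "v1", "status": "PENDING"},
        {"version_id": "v2", "status": "PENDING"},
    ],
    "transcription_status": "PENDING",
}
apply_versioned_update(doc, "v2", "COMPLETED")
print(doc["transcript_versions"][1]["status"])  # → COMPLETED
```

Note that only the `v2` element changes; the `v1` element keeps its original status.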


🛠️ Refactor suggestion

🧩 Analysis chain

Fix Optional typing and extraneous f-strings; set completion timestamp on first-version path

  • Use Optional[str] for error_message and provider (RUF013).
  • Remove f prefixes where there are no placeholders (F541).
  • When creating the initial transcript version, also set transcription_completed_at if status is COMPLETED (parity with update path).
-    async def update_transcription_status(
-        self, audio_uuid: str, status: str, error_message: str = None, provider: str = None
-    ):
+    async def update_transcription_status(
+        self, audio_uuid: str, status: str, error_message: Optional[str] = None, provider: Optional[str] = None
+    ):
@@
-            result = await self.col.update_one(
+            update_set = {
+                "active_transcript_version": version_id,
+                "transcription_status": status,
+                "transcription_updated_at": datetime.now(UTC).isoformat(),
+            }
+            if status == "COMPLETED":
+                update_set["transcription_completed_at"] = datetime.now(UTC).isoformat()
+
+            result = await self.col.update_one(
                 {"audio_uuid": audio_uuid},
                 {
                     "$push": {"transcript_versions": version_data},
-                    "$set": {
-                        "active_transcript_version": version_id,
-                        "transcription_status": status,
-                        "transcription_updated_at": datetime.now(UTC).isoformat(),
-                    }
+                    "$set": update_set
                 }
             )
@@
-            update_doc = {
-                f"transcript_versions.$[version].status": status,
-                f"transcript_versions.$[version].updated_at": datetime.now(UTC),
+            update_doc = {
+                "transcript_versions.$[version].status": status,
+                "transcript_versions.$[version].updated_at": datetime.now(UTC),
                 "transcription_status": status,
                 "transcription_updated_at": datetime.now(UTC).isoformat(),
             }
@@
-                update_doc[f"transcript_versions.$[version].error_message"] = error_message
+                update_doc["transcript_versions.$[version].error_message"] = error_message
                 update_doc["transcription_error"] = error_message
             if provider:
-                update_doc[f"transcript_versions.$[version].provider"] = provider
+                update_doc["transcript_versions.$[version].provider"] = provider
                 update_doc["transcript_provider"] = provider

Optionally, standardize timestamp types (all isoformat vs datetime objects) for consistency across fields.

To ensure we didn’t accidentally introduce duplicate definitions of this method in the file:


🏁 Script executed:

#!/bin/bash
# Verify there is only one definition of update_transcription_status in the repo
rg -nP '^\s*async\s+def\s+update_transcription_status\b' -C2

Length of output: 478


Fix Optional typing, remove extraneous f-strings, and set completion timestamp on initial-version path

Use Optional[str] for optional params, remove unnecessary f-prefixes on literal dict keys, and when creating the initial transcript version set transcription_completed_at if status == "COMPLETED". Add from typing import Optional to imports if missing.

-    async def update_transcription_status(
-        self, audio_uuid: str, status: str, error_message: str = None, provider: str = None
-    ):
+    async def update_transcription_status(
+        self, audio_uuid: str, status: str, error_message: Optional[str] = None, provider: Optional[str] = None
+    ):
@@
-            result = await self.col.update_one(
+            update_set = {
+                "active_transcript_version": version_id,
+                "transcription_status": status,
+                "transcription_updated_at": datetime.now(UTC).isoformat(),
+            }
+            if status == "COMPLETED":
+                update_set["transcription_completed_at"] = datetime.now(UTC).isoformat()
+
+            result = await self.col.update_one(
                 {"audio_uuid": audio_uuid},
                 {
                     "$push": {"transcript_versions": version_data},
-                    "$set": {
-                        "active_transcript_version": version_id,
-                        "transcription_status": status,
-                        "transcription_updated_at": datetime.now(UTC).isoformat(),
-                    }
+                    "$set": update_set
                 }
             )
@@
-            update_doc = {
-                f"transcript_versions.$[version].status": status,
-                f"transcript_versions.$[version].updated_at": datetime.now(UTC),
+            update_doc = {
+                "transcript_versions.$[version].status": status,
+                "transcript_versions.$[version].updated_at": datetime.now(UTC),
                 "transcription_status": status,
                 "transcription_updated_at": datetime.now(UTC).isoformat(),
             }
@@
-                update_doc[f"transcript_versions.$[version].error_message"] = error_message
+                update_doc["transcript_versions.$[version].error_message"] = error_message
                 update_doc["transcription_error"] = error_message
             if provider:
-                update_doc[f"transcript_versions.$[version].provider"] = provider
+                update_doc["transcript_versions.$[version].provider"] = provider
                 update_doc["transcript_provider"] = provider
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async def update_transcription_status(
self, audio_uuid: str, status: str, error_message: str = None, provider: str = None
):
"""Update transcription processing status and completion timestamp.
Interface compatibility method - updates active transcript version.
"""
chunk = await self.get_chunk(audio_uuid)
if not chunk:
return False
active_version = chunk.get("active_transcript_version")
if not active_version:
# Create initial transcript version if none exists
version_id = str(uuid.uuid4())
version_data = {
"version_id": version_id,
"transcript": "",
"segments": [],
"status": status,
"provider": provider,
"created_at": datetime.now(UTC).isoformat(),
"processing_run_id": None,
"raw_data": {},
"speakers_identified": []
}
if error_message:
version_data["error_message"] = error_message
result = await self.col.update_one(
{"audio_uuid": audio_uuid},
{
"$push": {"transcript_versions": version_data},
"$set": {
"active_transcript_version": version_id,
"transcription_status": status,
"transcription_updated_at": datetime.now(UTC).isoformat(),
}
}
)
else:
# Update existing active version
update_doc = {
f"transcript_versions.$[version].status": status,
f"transcript_versions.$[version].updated_at": datetime.now(UTC),
"transcription_status": status,
"transcription_updated_at": datetime.now(UTC).isoformat(),
}
if status == "COMPLETED":
update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat()
if error_message:
update_doc[f"transcript_versions.$[version].error_message"] = error_message
update_doc["transcription_error"] = error_message
if provider:
update_doc[f"transcript_versions.$[version].provider"] = provider
update_doc["transcript_provider"] = provider
result = await self.col.update_one(
{"audio_uuid": audio_uuid},
{"$set": update_doc},
array_filters=[{"version.version_id": active_version}]
)
if result.modified_count > 0:
logger.info(f"Updated transcription status to {status} for {audio_uuid}")
return result.modified_count > 0
async def update_transcription_status(
self, audio_uuid: str, status: str, error_message: Optional[str] = None, provider: Optional[str] = None
):
"""Update transcription processing status and completion timestamp.
Interface compatibility method - updates active transcript version.
"""
chunk = await self.get_chunk(audio_uuid)
if not chunk:
return False
active_version = chunk.get("active_transcript_version")
if not active_version:
# Create initial transcript version if none exists
version_id = str(uuid.uuid4())
version_data = {
"version_id": version_id,
"transcript": "",
"segments": [],
"status": status,
"provider": provider,
"created_at": datetime.now(UTC).isoformat(),
"processing_run_id": None,
"raw_data": {},
"speakers_identified": []
}
if error_message:
version_data["error_message"] = error_message
update_set = {
"active_transcript_version": version_id,
"transcription_status": status,
"transcription_updated_at": datetime.now(UTC).isoformat(),
}
if status == "COMPLETED":
update_set["transcription_completed_at"] = datetime.now(UTC).isoformat()
result = await self.col.update_one(
{"audio_uuid": audio_uuid},
{
"$push": {"transcript_versions": version_data},
"$set": update_set
}
)
else:
# Update existing active version
update_doc = {
"transcript_versions.$[version].status": status,
"transcript_versions.$[version].updated_at": datetime.now(UTC),
"transcription_status": status,
"transcription_updated_at": datetime.now(UTC).isoformat(),
}
if status == "COMPLETED":
update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat()
if error_message:
update_doc["transcript_versions.$[version].error_message"] = error_message
update_doc["transcription_error"] = error_message
if provider:
update_doc["transcript_versions.$[version].provider"] = provider
update_doc["transcript_provider"] = provider
result = await self.col.update_one(
{"audio_uuid": audio_uuid},
{"$set": update_doc},
array_filters=[{"version.version_id": active_version}]
)
if result.modified_count > 0:
logger.info(f"Updated transcription status to {status} for {audio_uuid}")
return result.modified_count > 0
🧰 Tools
🪛 Ruff (0.12.2)

390-390: PEP 484 prohibits implicit Optional

Convert to Optional[T]

(RUF013)


390-390: PEP 484 prohibits implicit Optional

Convert to Optional[T]

(RUF013)


432-432: f-string without any placeholders

Remove extraneous f prefix

(F541)


433-433: f-string without any placeholders

Remove extraneous f prefix

(F541)


440-440: f-string without any placeholders

Remove extraneous f prefix

(F541)


443-443: f-string without any placeholders

Remove extraneous f prefix

(F541)

🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/database.py around lines 389-455:
the method signature should use Optional[str] for error_message and provider and
you must add "from typing import Optional" to imports; in the initial-version
branch remove extraneous f-strings from literal dict keys (use plain string keys
like "transcription_completed_at") and when status == "COMPLETED" set
transcription_completed_at on the document (using datetime.now(UTC).isoformat()
to match other timestamps); keep the rest of the update logic unchanged and
ensure the updated keys/types mirror the existing update branch.

Comment on lines 79 to 82
# Don't initialize here - let it happen lazily on first use
# This prevents orphaned tasks that cause "Task was destroyed but it is pending" errors
memory_logger.debug(f"Memory service created but not initialized: {type(_memory_service).__name__}")


🛠️ Refactor suggestion

🧩 Analysis chain

Lazy init is good, but guard singleton creation and fix docstring

  • Possible race: two callers could concurrently create separate instances before assignment. Add a module-level lock with double-checked creation.
  • Docstring still claims “Initialized” instance; adjust to reflect lazy semantics.
-    Returns:
-        Initialized memory service instance
+    Returns:
+        Memory service singleton (may be uninitialized; implementations should lazy‑init on first use)
@@
-    if _memory_service is None:
+    if _memory_service is None:
         try:
             # Build configuration from environment
             config = build_memory_config_from_env()
@@
-            _memory_service = create_memory_service(config)
+            # Double-checked locking to avoid duplicate creation under concurrency
+            import threading  # local import if you prefer to avoid top-level dependency
+            _lock = globals().setdefault("_memory_service_lock", threading.Lock())
+            with _lock:
+                if _memory_service is None:
+                    _memory_service = create_memory_service(config)

Outside this hunk, add (once at module top):

import threading
_memory_service_lock = threading.Lock()

Would you like me to generate a quick search script to check that all call sites expect lazy init?


🏁 Script executed:

#!/bin/bash
# First, let's examine the current implementation of the memory service factory
cat backends/advanced/src/advanced_omi_backend/memory/service_factory.py

Length of output: 5020


🏁 Script executed:

#!/bin/bash
# Check if there are any existing threading imports or locks in the file
rg -n "threading|Lock|lock" backends/advanced/src/advanced_omi_backend/memory/service_factory.py

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Look for all call sites of get_memory_service to understand usage patterns
rg -n -C3 "get_memory_service" --type=py

Length of output: 19605


🏁 Script executed:

#!/bin/bash
# Check if this is actually used in a multi-threaded context
rg -n -C3 "threading|Thread|concurrent|asyncio" backends/advanced/src/advanced_omi_backend/ --type=py

Length of output: 79512


🏁 Script executed:

#!/bin/bash
# Look for existing singleton patterns or locks in the codebase
rg -n -C2 "_lock|threading\.Lock|singleton" backends/advanced/src/advanced_omi_backend/ --type=py

Length of output: 9206


Guard singleton creation and fix docstring

  • File: backends/advanced/src/advanced_omi_backend/memory/service_factory.py — add a module-level threading.Lock and apply double-checked locking in get_memory_service; update the Returns docstring to reflect lazy-init.
-    Returns:
-        Initialized memory service instance
+    Returns:
+        Memory service singleton (may be uninitialized; implementations should lazy‑init on first use)
@@
-    if _memory_service is None:
+    if _memory_service is None:
         try:
             # Build configuration from environment
             config = build_memory_config_from_env()
@@
-            _memory_service = create_memory_service(config)
+            # Double-checked locking to avoid duplicate creation under concurrency
+            import threading  # local import if you prefer to avoid top-level dependency
+            _lock = globals().setdefault("_memory_service_lock", threading.Lock())
+            with _lock:
+                if _memory_service is None:
+                    _memory_service = create_memory_service(config)

Outside this hunk, add (once at module top):

import threading
_memory_service_lock = threading.Lock()
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/memory/service_factory.py around
lines 79 to 82, the singleton memory service creation is not guarded for
concurrent access and the Returns docstring is inaccurate; add a module-level
threading.Lock (import threading and create _memory_service_lock at top of
module) and apply double-checked locking inside get_memory_service so the
function checks if _memory_service is set, acquires the lock only if needed,
re-checks and initializes lazily under the lock, then releases it; also update
the Returns section of the function docstring to state that the MemoryService is
created lazily on first use and may return an already-initialized singleton.

…ry service

- Updated the `update_transcription_status` method in `database.py` to use `Optional` for error message and provider parameters, and refactored the update logic for better clarity.
- Implemented double-checked locking in `get_memory_service` to ensure thread-safe singleton creation in `service_factory.py`, preventing potential race conditions.
@AnkushMalaker AnkushMalaker merged commit 641b7a6 into main Sep 19, 2025
2 checks passed
@AnkushMalaker AnkushMalaker deleted the better-audio-processing branch September 19, 2025 15:24
This was referenced Sep 19, 2025
@coderabbitai coderabbitai bot mentioned this pull request Oct 1, 2025
@coderabbitai coderabbitai bot mentioned this pull request Nov 17, 2025
@coderabbitai coderabbitai bot mentioned this pull request Dec 7, 2025