Conversation
- Removed compute mode handling from service setup in `init.py`.
- Introduced `audio_utils.py` for audio processing functions, including `process_audio_chunk` and audio cropping utilities.
- Updated `ClientManager` to be self-initializing and added atomic client creation and removal methods.
- Refactored audio chunk processing in various controllers to utilize the new unified audio processing pipeline.
- Enhanced transcription status update functionality in `database.py` to manage active transcript versions.
- Cleaned up imports and improved logging across multiple files for better clarity and maintainability.
- …rovider state in Memories component
- Modified the CMD in the Dockerfile to include an extra argument for deepgram.
- Simplified subprocess calls in integration tests by removing stdout and stderr captures.
- Added memory provider state management in the Memories component, displaying the provider name conditionally in the UI.
Walkthrough

Centralizes client lifecycle under ClientManager, unifies audio ingestion via process_audio_chunk, updates cropping to accept repository and speech segments, adjusts transcription API (no duration arg) with diarization gating, adds transcription status updates in DB, tightens sample-rate validation, modifies startup/shutdown flows, and minor Docker, memory UI, tests, and service tweaks.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Client as Client (WS/File)
    participant Main as FastAPI App
    participant CM as ClientManager
    participant AU as audio_utils.process_audio_chunk
    participant PM as ProcessorManager
    participant DB as AudioChunksRepository
    Note over Client,Main: Ingestion (WS OMI/PCM or File Upload)
    Client->>Main: Send audio bytes + metadata
    Main->>CM: ensure/create client_state
    Main->>AU: process_audio_chunk(audio_data, client_id, user/user_email, audio_format)
    AU->>PM: queue AudioProcessingItem
    PM->>PM: process (transcribe, crop, memory, etc.)
    PM->>DB: update statuses (e.g., transcription status)
    PM-->>Main: processing updates/events
```
```mermaid
sequenceDiagram
    autonumber
    participant Main as FastAPI Shutdown
    participant CM as ClientManager
    participant PM as ProcessorManager
    participant TM as TaskManager
    participant Mem as MemoryService
    Note over Main,CM: Graceful shutdown
    Main->>CM: get_all_client_ids()
    loop for each client_id
        Main->>CM: remove_client_with_cleanup(client_id)
    end
    Main->>PM: shutdown processors
    Main->>TM: shutdown tasks
    Main->>Mem: shutdown memory/speaker services
    Main-->>Main: App terminated
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
@coderabbitai review
✅ Actions performed: Review triggered.
Actionable comments posted: 2
🧹 Nitpick comments (12)
backends/advanced/Dockerfile (1)

43-43: Avoid `--extra` at runtime; rely on baked deps

You already run `uv sync --extra deepgram` during build. Keeping `--extra deepgram` in `CMD` can trigger resolution/network access at startup and bypass lock semantics. Prefer running the app with the already-synced env.

```diff
-CMD ["uv", "run", "--extra", "deepgram", "python3", "src/advanced_omi_backend/main.py"]
+CMD ["uv", "run", "python3", "src/advanced_omi_backend/main.py"]
```

If there's a concrete need for runtime extras, note it in a comment and consider `UV_NO_SYNC=1` to prevent implicit syncs.

backends/advanced/src/advanced_omi_backend/memory/service_factory.py (1)
132-134: Don't assume `_initialized` exists; use `getattr` for safety

Direct access can raise AttributeError if an implementation deviates from the base. Use a safe read.

```diff
-    # All memory services should have _initialized attribute per the base class
-    info["service_initialized"] = _memory_service._initialized
+    # All memory services should expose `_initialized`; fall back safely if not
+    info["service_initialized"] = getattr(_memory_service, "_initialized", False)
```

backends/advanced/webui/src/pages/Memories.tsx (2)
37-38: Don't display "unknown" provider; use null/undefined to hide

Defaulting to `'unknown'` renders an unhelpful provider label. Keep it empty and render only when a real provider is known.

```diff
-  const [memoryProvider, setMemoryProvider] = useState<string>('')
+  const [memoryProvider, setMemoryProvider] = useState<string | null>(null)
@@
-      const provider = response.data.memory_provider || 'unknown'
+      const provider = response.data.memory_provider ?? null
@@
-      setMemoryProvider(provider)
+      setMemoryProvider(provider)
@@
-      setMemoryProvider('unknown')
+      setMemoryProvider(null)
```

Also applies to: 45-49, 53-53
250-263: Hide provider line when not resolved; map known values

Render the provider line only for known values to avoid "Provider: unknown".

```diff
-          {memoryProvider && (
+          {memoryProvider && memoryProvider !== 'unknown' && (
             <p className="text-xs text-gray-500 dark:text-gray-400 mt-0.5">
               Provider: {memoryProvider === 'friend_lite' ? 'Friend-Lite' :
                          memoryProvider === 'openmemory_mcp' ? 'OpenMemory MCP' :
                          memoryProvider}
             </p>
           )}
```

backends/advanced/tests/test_integration.py (2)
505-507: Add a timeout to the CI build step to prevent hangs

`docker compose build` can hang in CI. Other steps have timeouts; this one should too.

```diff
-    build_result = subprocess.run(["docker", "compose", "-f", "docker-compose-test.yml", "build"], env=env)
+    build_result = subprocess.run(
+        ["docker", "compose", "-f", "docker-compose-test.yml", "build"],
+        env=env,
+        timeout=900,  # allow up to 15 minutes for cold CI builds
+    )
```

Static analysis S607 about partial paths is acceptable here (test harness), but if needed we can guard with `shutil.which("docker")`.
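If we do want the `shutil.which` guard, a small helper for the test harness could look like this (a sketch only; the function names are hypothetical, not part of the repo):

```python
import shutil
import subprocess


def compose_build_cmd(compose_file: str = "docker-compose-test.yml") -> list[str]:
    """Build the docker compose command with an absolute executable path."""
    docker = shutil.which("docker")
    if docker is None:
        # Fail fast with a clear message instead of a cryptic FileNotFoundError
        raise RuntimeError("docker executable not found on PATH")
    return [docker, "compose", "-f", compose_file, "build"]


def run_compose_build(timeout: int = 900) -> subprocess.CompletedProcess:
    # check=True raises CalledProcessError with the exit code on failure;
    # timeout bounds a hung CI build.
    return subprocess.run(compose_build_cmd(), timeout=timeout, check=True)
```

This also satisfies S607, since the resolved absolute path is passed to `subprocess.run`.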
522-523: OK to stream output; consider `check=True` for clearer failure

You've removed capture to stream logs live (good). You can also let `subprocess.run(..., check=True)` raise with context.

```diff
-    result = subprocess.run(cmd, env=env, timeout=300)
+    result = subprocess.run(cmd, env=env, timeout=300, check=False)
```

If you switch to `check=True`, remove the manual `returncode` checks below. Verify CI log visibility remains adequate with pytest's `-s`.

backends/advanced/src/advanced_omi_backend/controllers/system_controller.py (2)
1016-1016: Clean up unnecessary blank line.

There's an unnecessary blank line after the import statement.

```diff
         import yaml
         from advanced_omi_backend.memory_config_loader import get_config_loader
-
```

1066-1066: Clean up unnecessary blank line.

There's an unnecessary blank line after the import statement.

```diff
         import yaml
         from advanced_omi_backend.memory_config_loader import MemoryConfigLoader
-
```

backends/advanced/src/advanced_omi_backend/client_manager.py (3)
38-47: Consider removing the legacy initialize method.

Since the ClientManager is now self-initializing and the method only logs a warning when called with an external dictionary, consider marking this method as deprecated or removing it entirely if it's not needed for backward compatibility.

```python
@deprecated("ClientManager is now self-initializing and doesn't need external initialization")
def initialize(self, active_clients_dict: Optional[Dict[str, "ClientState"]] = None):
    """Legacy initialization method kept for backward compatibility."""
    if active_clients_dict is not None:
        logger.warning("ClientManager no longer uses external dictionaries - ignoring active_clients_dict")
    logger.info("ClientManager initialization (legacy compatibility mode)")
```
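Note that `@deprecated` comes from `typing_extensions` (standardized only in Python 3.13+); a version-agnostic alternative sketch using the stdlib `warnings` module (the class here is a simplified stand-in, not the repo's actual `ClientManager`):

```python
import warnings
from typing import Dict, Optional


class ClientManager:
    """Simplified stand-in to illustrate the deprecation pattern."""

    def __init__(self) -> None:
        # Self-initializing: the manager owns its own client registry.
        self._active_clients: Dict[str, object] = {}

    def initialize(self, active_clients_dict: Optional[Dict[str, object]] = None) -> None:
        """Legacy no-op kept for backward compatibility."""
        warnings.warn(
            "ClientManager is self-initializing; initialize() is a no-op "
            "and will be removed in a future release.",
            DeprecationWarning,
            stacklevel=2,  # point the warning at the caller, not this frame
        )
        # External dictionaries are ignored; ClientManager owns its state.
```

`stacklevel=2` makes the warning point at the deprecated call site, which is what callers need to find and remove it.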
160-160: Remove unused variable assignment.

The `client_state` variable is assigned but never used in the `remove_client` method.

```diff
-        # Get client state for cleanup
-        client_state = self._active_clients[client_id]
-
         # Atomically remove from storage and deregister mapping
         del self._active_clients[client_id]
         unregister_client_user_mapping(client_id)
```
128-128: Consider using a custom exception class.

Instead of raising a generic `ValueError`, consider creating a custom exception class for better error handling and debugging.

```python
class ClientAlreadyExistsError(Exception):
    """Raised when attempting to create a client with an ID that already exists."""
    pass

# Then in the method:
if client_id in self._active_clients:
    raise ClientAlreadyExistsError(f"Client {client_id} already exists")
```

backends/advanced/src/advanced_omi_backend/transcription.py (1)
234-245: Diarization configuration loaded on every transcript request.

Loading the diarization settings from a file on every transcript request could impact performance. Consider caching the configuration or loading it once during initialization.

```diff
 # Check if we should request diarization based on configuration
-config = load_diarization_settings_from_file()
-diarization_source = config.get("diarization_source", "pyannote")
+# Consider caching this at the class level if the config doesn't change frequently
+if not hasattr(self, '_diarization_config'):
+    self._diarization_config = load_diarization_settings_from_file()
+diarization_source = self._diarization_config.get("diarization_source", "pyannote")
```
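Alternatively, the load could be memoized at module level with `functools.lru_cache`; a sketch with a stand-in loader (the real `load_diarization_settings_from_file` reads a file, which is stubbed here):

```python
from functools import lru_cache


def load_diarization_settings_from_file() -> dict:
    """Stand-in for the real loader, which parses a settings file."""
    return {"diarization_source": "pyannote"}


@lru_cache(maxsize=1)
def get_diarization_settings() -> dict:
    """Load once; every later call returns the cached dict."""
    # Caveat: the cached dict is shared and mutable -- treat it as read-only,
    # and call get_diarization_settings.cache_clear() if the file can change.
    return load_diarization_settings_from_file()
```

This keeps call sites simple (`get_diarization_settings().get("diarization_source", "pyannote")`) without threading a cache attribute through `self`.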
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (15)
- backends/advanced/Dockerfile (1 hunks)
- backends/advanced/src/advanced_omi_backend/audio_utils.py (2 hunks)
- backends/advanced/src/advanced_omi_backend/client.py (2 hunks)
- backends/advanced/src/advanced_omi_backend/client_manager.py (2 hunks)
- backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (2 hunks)
- backends/advanced/src/advanced_omi_backend/controllers/system_controller.py (8 hunks)
- backends/advanced/src/advanced_omi_backend/database.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/main.py (11 hunks)
- backends/advanced/src/advanced_omi_backend/memory/__init__.py (0 hunks)
- backends/advanced/src/advanced_omi_backend/memory/service_factory.py (2 hunks)
- backends/advanced/src/advanced_omi_backend/processors.py (5 hunks)
- backends/advanced/src/advanced_omi_backend/transcription.py (5 hunks)
- backends/advanced/tests/test_integration.py (2 hunks)
- backends/advanced/webui/src/pages/Memories.tsx (2 hunks)
- init.py (0 hunks)
💤 Files with no reviewable changes (2)
- backends/advanced/src/advanced_omi_backend/memory/__init__.py
- init.py
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-18T01:42:08.551Z
Learnt from: CR
PR: AnkushMalaker/friend-lite#0
File: CLAUDE.md:0-0
Timestamp: 2025-09-18T01:42:08.551Z
Learning: Applies to backends/advanced/src/**/*.py : Use conversation_id (not audio_uuid) for all reprocessing operations and version management
Applied to files:
- backends/advanced/src/advanced_omi_backend/controllers/system_controller.py
- backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
- backends/advanced/src/advanced_omi_backend/processors.py
🪛 Ruff (0.12.2)
backends/advanced/tests/test_integration.py
504-504: Starting a process with a partial executable path
(S607)
522-522: subprocess call: check for execution of untrusted input
(S603)
backends/advanced/src/advanced_omi_backend/client_manager.py
128-128: Avoid specifying long messages outside the exception class
(TRY003)
160-160: Local variable client_state is assigned to but never used
Remove assignment to unused variable client_state
(F841)
backends/advanced/src/advanced_omi_backend/controllers/system_controller.py
240-240: Abstract raise to an inner function
(TRY301)
backends/advanced/src/advanced_omi_backend/database.py
390-390: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
390-390: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
432-432: f-string without any placeholders
Remove extraneous f prefix
(F541)
433-433: f-string without any placeholders
Remove extraneous f prefix
(F541)
440-440: f-string without any placeholders
Remove extraneous f prefix
(F541)
443-443: f-string without any placeholders
Remove extraneous f prefix
(F541)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: integration-tests
🔇 Additional comments (42)
backends/advanced/src/advanced_omi_backend/client.py (2)

18-18: LGTM!

The reorganization of imports looks good, placing all wyoming imports together for better organization.

68-68: LGTM!

The log message change from "simplified client state" to "client state" is appropriate and makes the messaging consistent with the broader architectural changes.
backends/advanced/src/advanced_omi_backend/controllers/system_controller.py (7)

18-21: LGTM!

The multi-line formatting of imports improves readability and makes it easier to track changes in version control.

25-25: LGTM! Unified audio processing is a good architectural improvement.

Importing `process_audio_chunk` from `audio_utils` centralizes the audio processing logic and eliminates code duplication.

236-241: Verify that the 16kHz sample rate requirement is intended.

The error handling is appropriate, but the hard requirement for a 16kHz sample rate may limit usability. The error message mentions "I'll implement this at some point sorry", which suggests this is a known limitation.

Consider documenting this limitation in the API documentation and tracking the resampling feature implementation:

- Add a note in the API documentation that only 16kHz WAV files are currently supported
- Consider creating an issue to track the resampling feature implementation
- The error message could be more professional: "Audio resampling is not yet supported. Please provide audio files with a 16kHz sample rate."

257-270: LGTM! Unified audio processing architecture.

Excellent refactoring to use the centralized `process_audio_chunk` function. The approach properly packages all audio metadata into the `audio_format` dict and correctly sets `client_state=None` for file uploads where no WebSocket connection exists.

624-637: LGTM! Consistent with the unified audio processing pattern.

The implementation correctly uses the centralized `process_audio_chunk` function with the same pattern as in `process_audio_files`.

897-900: LGTM!

The multi-line formatting of the import statement improves readability.

935-938: LGTM!

The multi-line formatting of the import statement maintains consistency with the previous speaker recognition import.
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (3)

12-14: LGTM! Import path updated correctly.

The import has been correctly updated to reflect the relocation of `_process_audio_cropping_with_relative_timestamps` from `audio_cropping_utils` to `audio_utils`.

273-280: LGTM! Improved error handling for cropping prerequisites.

Good addition of validation to ensure speech segments exist before attempting to crop audio. The error response is clear and returns an appropriate 400 status code.

282-296: LGTM! Well-structured cropping implementation with database updates.

The implementation correctly:
- Generates the output path for the cropped audio file
- Creates a repository instance for database updates
- Calls the relocated cropping function with all necessary parameters including the repository

The function signature change to include the repository parameter enables atomic database updates during the cropping process.
backends/advanced/src/advanced_omi_backend/client_manager.py (4)

29-36: LGTM! Self-initializing design simplifies the architecture.

The ClientManager now acts as the single source of truth for client state without requiring external dictionary references. This design eliminates potential synchronization issues between multiple data structures.

107-142: LGTM! Atomic client creation improves concurrency safety.

The `create_client` method provides atomic client creation and registration, which is essential for preventing race conditions in a concurrent environment. The method properly validates that the client doesn't already exist and atomically registers both the client state and user mapping.

One potential enhancement to consider: should we validate that the user exists before creating the client? This would prevent orphaned clients if an invalid user_id is provided.
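For illustration, the check-and-insert pattern described above can be sketched with an `asyncio.Lock`; the class and names below are simplified stand-ins, not the repo's actual implementation:

```python
import asyncio


class ClientAlreadyExistsError(Exception):
    """Raised when attempting to create a client with an ID that already exists."""


class MiniClientManager:
    """Illustrative sketch only; the real ClientManager carries more state."""

    def __init__(self) -> None:
        self._active_clients: dict[str, dict] = {}
        self._lock = asyncio.Lock()

    async def create_client(self, client_id: str, user_id: str) -> dict:
        async with self._lock:
            # Check-and-insert under one lock: two concurrent creates for
            # the same id cannot both succeed.
            if client_id in self._active_clients:
                raise ClientAlreadyExistsError(client_id)
            state = {"client_id": client_id, "user_id": user_id}
            self._active_clients[client_id] = state
            return state
```

The lock is what makes the existence check and the registration a single atomic step from the callers' point of view.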
169-194: LGTM! Comprehensive cleanup with disconnect.

The `remove_client_with_cleanup` method properly handles the full lifecycle by calling the client's disconnect method before removal. This ensures proper resource cleanup.

196-217: LGTM! Useful utility methods for client management.

The addition of `get_all_client_ids` and `add_existing_client` methods provides necessary functionality for client enumeration and migration scenarios.

backends/advanced/src/advanced_omi_backend/audio_utils.py (4)
8-24: LGTM! Well-structured imports and configuration.

The use of `TYPE_CHECKING` for avoiding circular imports is a best practice. The environment-configurable constants for speech processing provide good flexibility.

26-89: Excellent centralization of audio processing logic!

The `process_audio_chunk` function successfully unifies the audio ingestion pattern across all input sources (WebSocket, file upload). This eliminates code duplication and ensures consistent processing. The function properly handles:

- Audio format extraction with sensible defaults
- Timestamp generation when not provided
- Unified queuing through the processor manager
- Optional client state updates for WebSocket connections

This is a significant architectural improvement that makes the codebase more maintainable.
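As a rough illustration of such a unified entry point (the signature, queue type, and defaults below are assumptions for the sketch, not the repo's actual code):

```python
import asyncio
import time
from typing import Optional

# Assumed PCM defaults for the sketch: 16 kHz, 16-bit, mono
DEFAULT_FORMAT = {"rate": 16000, "width": 2, "channels": 1}


async def process_audio_chunk(
    queue: asyncio.Queue,
    audio_data: bytes,
    client_id: str,
    user_id: str,
    audio_format: Optional[dict] = None,
    timestamp: Optional[float] = None,
) -> dict:
    """Single ingestion path shared by WebSocket and file-upload sources."""
    # Merge caller-supplied format over defaults
    fmt = {**DEFAULT_FORMAT, **(audio_format or {})}
    item = {
        "client_id": client_id,
        "user_id": user_id,
        "audio": audio_data,
        "format": fmt,
        # Generate a timestamp when the source did not provide one
        "timestamp": timestamp if timestamp is not None else time.time(),
    }
    await queue.put(item)  # downstream processors consume from here
    return item
```

Every ingestion path funneling through one function is what guarantees the defaults, timestamping, and queuing behavior stay consistent.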
91-97: LGTM! Repository parameter enables atomic updates.

The addition of the optional `chunk_repo` parameter allows the cropping function to update the database atomically when needed, while remaining backward compatible when database updates aren't required.

162-163: LGTM! Conditional database update pattern.

The conditional update only when `chunk_repo is not None` provides flexibility for different calling contexts - some may need database updates while others may not.

backends/advanced/src/advanced_omi_backend/transcription.py (6)
10-14: LGTM! Clean import organization.

The consolidation of multi-line imports for config functions and explicit AudioChunk import improves code clarity.

17-21: LGTM! Well-structured imports for processors.

The processor imports are properly organized with clear separation of types and the manager function.

287-292: LGTM! Cleaner logging without DEBUG tags.

The simplified logging format improves readability while maintaining essential debugging information.

512-512: `process_collected_audio` call matches the new signature.

The call to `manager.process_collected_audio()` on line 512 is correct (no duration parameter), confirming the signature change is properly implemented in the processor module.

177-178: Verified: callers updated to the new no-arg signature. All discovered usages call `process_collected_audio()` without arguments; no action required.

219-220: `_get_transcript`: internal callers verified; signature change is safe.

Only internal call found at backends/advanced/src/advanced_omi_backend/transcription.py:201 (`await self._get_transcript()`) and the definition at line 219; no other matches detected.

backends/advanced/src/advanced_omi_backend/processors.py (6)
1169-1176: Repository parameter added to cropping function call.

The addition of `self.repository` as the 5th parameter to `_process_audio_cropping_with_relative_timestamps` suggests the function now needs database access during cropping operations. This is a good architectural change that allows the cropping function to update database records directly.

889-892: Excellent! Using conversation_id as the primary identifier.

The memory processing flow now correctly uses `conversation_id` instead of `audio_uuid` throughout, which aligns with the learning about using conversation_id for all reprocessing operations and version management. This is the correct architectural approach.

Also applies to: 901-903, 907-908

512-512: LGTM! Process collected audio call updated.

The call to `manager.process_collected_audio()` correctly uses the new signature without the duration parameter.

139-140: Good addition: duplicate close prevention.

Adding a `closing_clients` set to track clients currently being closed prevents race conditions and duplicate close operations. This is a solid defensive programming practice.
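A minimal sketch of the duplicate-close guard described above (heavily simplified; the real method also flushes ASR and cleans up resources):

```python
import asyncio


class CloseGuard:
    """Sketch of the closing_clients idea: at most one close per client at a time."""

    def __init__(self) -> None:
        self.closing_clients: set[str] = set()
        self.closed: list[str] = []

    async def close_client_audio(self, client_id: str) -> bool:
        if client_id in self.closing_clients:
            return False  # a close is already in progress; skip the duplicate
        self.closing_clients.add(client_id)
        try:
            await asyncio.sleep(0)  # stands in for flush/cleanup work that yields
            self.closed.append(client_id)
            return True
        finally:
            # Always release the guard, even if cleanup raised
            self.closing_clients.discard(client_id)
```

Because single-threaded asyncio only interleaves at `await` points, the membership check and `add` run without interruption, so concurrent close attempts for the same client collapse into one.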
484-562: Well-designed close operation with proper cleanup.

The `close_client_audio` method properly handles:

- Duplicate close prevention
- ASR flushing before closing
- Proper error handling and logging
- Cleanup of resources

This is a robust implementation.

19-21: Resolved: function exists in audio_utils.py with expected signature. Found in backends/advanced/src/advanced_omi_backend/audio_utils.py (lines 91-94): `async def _process_audio_cropping_with_relative_timestamps(original_path: str, speech_segments: list[tuple[float, float]], output_path: str, ...)`.
45-46: Good architectural improvements with centralized management.The imports show a shift to centralized management with the new
ClientManagerandTaskManager, plus the unifiedprocess_audio_chunkfunction. This improves code organization and maintainability.Also applies to: 52-53
227-229: LGTM! ClientManager initialization.The ClientManager is properly initialized as a singleton, providing centralized client state management.
254-257: Excellent use of ClientManager for atomic operations.Using
client_manager.create_client()provides atomic client creation and registration, which is much cleaner than managing state in multiple places.
272-282: Robust cleanup with proper error handling.The cleanup function properly uses
remove_client_with_cleanup()and handles the return value to determine if additional cleanup is needed. Good defensive programming.
589-601: LGTM! Unified audio processing pipeline.The replacement of direct AudioChunk construction and queuing with
process_audio_chunk()unifies the audio ingestion path. This is a significant architectural improvement that reduces code duplication.
823-830: Consistent use of unified pipeline for PCM audio.The PCM WebSocket handler also uses
process_audio_chunk(), maintaining consistency across different audio formats. Good architectural decision.
853-865: Good handling of raw audio with sensible defaults.The code properly handles raw audio chunks by assuming standard PCM format (16kHz, 16-bit, mono) when no format information is available. This provides backward compatibility.
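The fallback described above amounts to merging any partial metadata over PCM defaults; a small sketch (the helper name and dict keys are hypothetical, not the repo's actual API):

```python
from typing import Optional

def resolve_audio_format(metadata: Optional[dict]) -> dict:
    """Assume 16 kHz, 16-bit, mono PCM when a raw chunk has no format info."""
    defaults = {"rate": 16000, "width": 2, "channels": 1}
    if not metadata:
        return defaults
    # Partial metadata overrides only the fields it specifies
    return {**defaults, **metadata}
```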
973-973: Client count now uses ClientManager.

The health endpoint properly uses `client_manager.get_client_count()` instead of the old `len(active_clients)` approach. This ensures accurate client counting.

331-331: Proper shutdown sequence with ClientManager.

The shutdown flow correctly iterates over clients using `client_manager.get_all_client_ids()` and properly shuts down the task manager. The sequence ensures clean resource cleanup.

Also applies to: 340-342

736-749: Proactive transcription manager creation on audio-start.

Creating the transcription manager early when audio-start is received (rather than waiting for the first chunk) is a good optimization that reduces latency for the first transcription.
```python
    async def update_transcription_status(
        self, audio_uuid: str, status: str, error_message: str = None, provider: str = None
    ):
        """Update transcription processing status and completion timestamp.

        Interface compatibility method - updates active transcript version.
        """
        chunk = await self.get_chunk(audio_uuid)
        if not chunk:
            return False

        active_version = chunk.get("active_transcript_version")
        if not active_version:
            # Create initial transcript version if none exists
            version_id = str(uuid.uuid4())
            version_data = {
                "version_id": version_id,
                "transcript": "",
                "segments": [],
                "status": status,
                "provider": provider,
                "created_at": datetime.now(UTC).isoformat(),
                "processing_run_id": None,
                "raw_data": {},
                "speakers_identified": []
            }
            if error_message:
                version_data["error_message"] = error_message

            result = await self.col.update_one(
                {"audio_uuid": audio_uuid},
                {
                    "$push": {"transcript_versions": version_data},
                    "$set": {
                        "active_transcript_version": version_id,
                        "transcription_status": status,
                        "transcription_updated_at": datetime.now(UTC).isoformat(),
                    }
                }
            )
        else:
            # Update existing active version
            update_doc = {
                f"transcript_versions.$[version].status": status,
                f"transcript_versions.$[version].updated_at": datetime.now(UTC),
                "transcription_status": status,
                "transcription_updated_at": datetime.now(UTC).isoformat(),
            }
            if status == "COMPLETED":
                update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat()
            if error_message:
                update_doc[f"transcript_versions.$[version].error_message"] = error_message
                update_doc["transcription_error"] = error_message
            if provider:
                update_doc[f"transcript_versions.$[version].provider"] = provider
                update_doc["transcript_provider"] = provider

            result = await self.col.update_one(
                {"audio_uuid": audio_uuid},
                {"$set": update_doc},
                array_filters=[{"version.version_id": active_version}]
            )

        if result.modified_count > 0:
            logger.info(f"Updated transcription status to {status} for {audio_uuid}")
        return result.modified_count > 0
```
🛠️ Refactor suggestion
🧩 Analysis chain
Fix Optional typing and extraneous f-strings; set completion timestamp on first-version path

- Use `Optional[str]` for `error_message` and `provider` (RUF013).
- Remove `f` prefixes where there are no placeholders (F541).
- When creating the initial transcript version, also set `transcription_completed_at` if status is `COMPLETED` (parity with update path).
```diff
-    async def update_transcription_status(
-        self, audio_uuid: str, status: str, error_message: str = None, provider: str = None
-    ):
+    async def update_transcription_status(
+        self, audio_uuid: str, status: str, error_message: Optional[str] = None, provider: Optional[str] = None
+    ):
@@
-            result = await self.col.update_one(
+            update_set = {
+                "active_transcript_version": version_id,
+                "transcription_status": status,
+                "transcription_updated_at": datetime.now(UTC).isoformat(),
+            }
+            if status == "COMPLETED":
+                update_set["transcription_completed_at"] = datetime.now(UTC).isoformat()
+
+            result = await self.col.update_one(
                 {"audio_uuid": audio_uuid},
                 {
                     "$push": {"transcript_versions": version_data},
-                    "$set": {
-                        "active_transcript_version": version_id,
-                        "transcription_status": status,
-                        "transcription_updated_at": datetime.now(UTC).isoformat(),
-                    }
+                    "$set": update_set
                 }
             )
@@
-            update_doc = {
-                f"transcript_versions.$[version].status": status,
-                f"transcript_versions.$[version].updated_at": datetime.now(UTC),
+            update_doc = {
+                "transcript_versions.$[version].status": status,
+                "transcript_versions.$[version].updated_at": datetime.now(UTC),
                 "transcription_status": status,
                 "transcription_updated_at": datetime.now(UTC).isoformat(),
             }
@@
-                update_doc[f"transcript_versions.$[version].error_message"] = error_message
+                update_doc["transcript_versions.$[version].error_message"] = error_message
                 update_doc["transcription_error"] = error_message
             if provider:
-                update_doc[f"transcript_versions.$[version].provider"] = provider
+                update_doc["transcript_versions.$[version].provider"] = provider
                 update_doc["transcript_provider"] = provider
```

Optionally, standardize timestamp types (all isoformat vs datetime objects) for consistency across fields.
To ensure we didn't accidentally introduce duplicate definitions of this method in the file:

🏁 Script executed:

```shell
#!/bin/bash
# Verify there is only one definition of update_transcription_status in the repo
rg -nP '^\s*async\s+def\s+update_transcription_status\b' -C2
```

Length of output: 478
Fix Optional typing, remove extraneous f-strings, and set completion timestamp on initial-version path

Use `Optional[str]` for optional params, remove unnecessary f-prefixes on literal dict keys, and when creating the initial transcript version set `transcription_completed_at` if `status == "COMPLETED"`. Add `from typing import Optional` to imports if missing. The suggested diff is the same as in the analysis above.
update_doc["transcript_provider"] = provider📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Current implementation (before the fix):

```python
    async def update_transcription_status(
        self, audio_uuid: str, status: str, error_message: str = None, provider: str = None
    ):
        """Update transcription processing status and completion timestamp.

        Interface compatibility method - updates active transcript version.
        """
        chunk = await self.get_chunk(audio_uuid)
        if not chunk:
            return False

        active_version = chunk.get("active_transcript_version")
        if not active_version:
            # Create initial transcript version if none exists
            version_id = str(uuid.uuid4())
            version_data = {
                "version_id": version_id,
                "transcript": "",
                "segments": [],
                "status": status,
                "provider": provider,
                "created_at": datetime.now(UTC).isoformat(),
                "processing_run_id": None,
                "raw_data": {},
                "speakers_identified": []
            }
            if error_message:
                version_data["error_message"] = error_message
            result = await self.col.update_one(
                {"audio_uuid": audio_uuid},
                {
                    "$push": {"transcript_versions": version_data},
                    "$set": {
                        "active_transcript_version": version_id,
                        "transcription_status": status,
                        "transcription_updated_at": datetime.now(UTC).isoformat(),
                    }
                }
            )
        else:
            # Update existing active version
            update_doc = {
                f"transcript_versions.$[version].status": status,
                f"transcript_versions.$[version].updated_at": datetime.now(UTC),
                "transcription_status": status,
                "transcription_updated_at": datetime.now(UTC).isoformat(),
            }
            if status == "COMPLETED":
                update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat()
            if error_message:
                update_doc[f"transcript_versions.$[version].error_message"] = error_message
                update_doc["transcription_error"] = error_message
            if provider:
                update_doc[f"transcript_versions.$[version].provider"] = provider
                update_doc["transcript_provider"] = provider
            result = await self.col.update_one(
                {"audio_uuid": audio_uuid},
                {"$set": update_doc},
                array_filters=[{"version.version_id": active_version}]
            )

        if result.modified_count > 0:
            logger.info(f"Updated transcription status to {status} for {audio_uuid}")
        return result.modified_count > 0
```
Suggested replacement:

```python
    async def update_transcription_status(
        self, audio_uuid: str, status: str, error_message: Optional[str] = None, provider: Optional[str] = None
    ):
        """Update transcription processing status and completion timestamp.

        Interface compatibility method - updates active transcript version.
        """
        chunk = await self.get_chunk(audio_uuid)
        if not chunk:
            return False

        active_version = chunk.get("active_transcript_version")
        if not active_version:
            # Create initial transcript version if none exists
            version_id = str(uuid.uuid4())
            version_data = {
                "version_id": version_id,
                "transcript": "",
                "segments": [],
                "status": status,
                "provider": provider,
                "created_at": datetime.now(UTC).isoformat(),
                "processing_run_id": None,
                "raw_data": {},
                "speakers_identified": []
            }
            if error_message:
                version_data["error_message"] = error_message
            update_set = {
                "active_transcript_version": version_id,
                "transcription_status": status,
                "transcription_updated_at": datetime.now(UTC).isoformat(),
            }
            if status == "COMPLETED":
                update_set["transcription_completed_at"] = datetime.now(UTC).isoformat()
            result = await self.col.update_one(
                {"audio_uuid": audio_uuid},
                {
                    "$push": {"transcript_versions": version_data},
                    "$set": update_set
                }
            )
        else:
            # Update existing active version
            update_doc = {
                "transcript_versions.$[version].status": status,
                "transcript_versions.$[version].updated_at": datetime.now(UTC),
                "transcription_status": status,
                "transcription_updated_at": datetime.now(UTC).isoformat(),
            }
            if status == "COMPLETED":
                update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat()
            if error_message:
                update_doc["transcript_versions.$[version].error_message"] = error_message
                update_doc["transcription_error"] = error_message
            if provider:
                update_doc["transcript_versions.$[version].provider"] = provider
                update_doc["transcript_provider"] = provider
            result = await self.col.update_one(
                {"audio_uuid": audio_uuid},
                {"$set": update_doc},
                array_filters=[{"version.version_id": active_version}]
            )

        if result.modified_count > 0:
            logger.info(f"Updated transcription status to {status} for {audio_uuid}")
        return result.modified_count > 0
```
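The else-branch above relies on MongoDB's filtered positional operator: keys like `transcript_versions.$[version].status` combined with `array_filters=[{"version.version_id": active_version}]` update only the matching array element. As an illustrative sketch (plain Python, not the driver API; `apply_filtered_set` is a hypothetical helper simulating those semantics in memory):

```python
# Simulate MongoDB's {"$set": ...} with an arrayFilters-style "$[name]" placeholder.
# Illustrative only; real updates go through the driver's update_one().

def apply_filtered_set(doc, update_set, array_filters):
    """Apply update_set to doc; keys may contain one
    "<array_field>.$[<name>].<sub_field>" placeholder segment."""
    # Index filters by placeholder name, e.g. {"version": ("version_id", "v2")}
    filters = {}
    for f in array_filters:
        for key, value in f.items():
            name, _, field = key.partition(".")
            filters[name] = (field, value)

    for dotted_key, value in update_set.items():
        parts = dotted_key.split(".")
        if len(parts) >= 3 and parts[1].startswith("$[") and parts[1].endswith("]"):
            array_field, placeholder, sub_field = parts[0], parts[1][2:-1], parts[2]
            match_field, match_value = filters[placeholder]
            for element in doc.get(array_field, []):
                if element.get(match_field) == match_value:
                    element[sub_field] = value  # only matching elements change
        else:
            doc[dotted_key] = value  # plain top-level field
    return doc


doc = {
    "audio_uuid": "abc",
    "active_transcript_version": "v2",
    "transcript_versions": [
        {"version_id": "v1", "status": "FAILED"},
        {"version_id": "v2", "status": "PROCESSING"},
    ],
}
apply_filtered_set(
    doc,
    {"transcript_versions.$[version].status": "COMPLETED", "transcription_status": "COMPLETED"},
    array_filters=[{"version.version_id": "v2"}],
)
print(doc["transcript_versions"][1]["status"])  # COMPLETED
print(doc["transcript_versions"][0]["status"])  # FAILED (untouched)
```

This is why the suggestion can keep the version-level and document-level status fields in one `$set`: the filtered key touches only the active version while the flat keys update the top-level mirror fields.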
🧰 Tools

🪛 Ruff (0.12.2)

390-390: PEP 484 prohibits implicit Optional; convert to `Optional[T]` (RUF013)
390-390: PEP 484 prohibits implicit Optional; convert to `Optional[T]` (RUF013)
432-432: f-string without any placeholders; remove extraneous `f` prefix (F541)
433-433: f-string without any placeholders; remove extraneous `f` prefix (F541)
440-440: f-string without any placeholders; remove extraneous `f` prefix (F541)
443-443: f-string without any placeholders; remove extraneous `f` prefix (F541)
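The RUF013 findings come down to one point: a `None` default does not make the annotation optional by itself. A minimal before/after sketch (function names are illustrative):

```python
from typing import Optional

# Flagged by Ruff RUF013: the `str` annotation with a None default
# is an implicit Optional. Runs fine, but the type is misleading.
def bad(provider: str = None):
    return provider

# Fix: declare the Optional explicitly (or `str | None` on Python 3.10+).
def good(provider: Optional[str] = None) -> Optional[str]:
    return provider

print(good())            # None
print(good("deepgram"))  # deepgram
```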
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/database.py around lines 389-455:
the method signature should use Optional[str] for error_message and provider and
you must add "from typing import Optional" to imports; in the initial-version
branch remove extraneous f-strings from literal dict keys (use plain string keys
like "transcription_completed_at") and when status == "COMPLETED" set
transcription_completed_at on the document (using datetime.now(UTC).isoformat()
to match other timestamps); keep the rest of the update logic unchanged and
ensure the updated keys/types mirror the existing update branch.
```python
        # Don't initialize here - let it happen lazily on first use
        # This prevents orphaned tasks that cause "Task was destroyed but it is pending" errors
        memory_logger.debug(f"Memory service created but not initialized: {type(_memory_service).__name__}")
```
🧩 Analysis chain
Lazy init is good, but guard singleton creation and fix docstring
- Possible race: two callers could concurrently create separate instances before assignment. Add a module-level lock with double-checked creation.
- Docstring still claims “Initialized” instance; adjust to reflect lazy semantics.
```diff
-    Returns:
-        Initialized memory service instance
+    Returns:
+        Memory service singleton (may be uninitialized; implementations should lazy-init on first use)
@@
     if _memory_service is None:
         try:
             # Build configuration from environment
             config = build_memory_config_from_env()
@@
-            _memory_service = create_memory_service(config)
+            # Double-checked locking to avoid duplicate creation under concurrency
+            import threading  # local import if you prefer to avoid top-level dependency
+            _lock = globals().setdefault("_memory_service_lock", threading.Lock())
+            with _lock:
+                if _memory_service is None:
+                    _memory_service = create_memory_service(config)
```

Outside this hunk, add (once at module top):
```python
import threading

_memory_service_lock = threading.Lock()
```

Would you like me to generate a quick search script to check all call sites expect lazy init?
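As a standalone sketch of the double-checked pattern proposed above (names here are illustrative; `object()` stands in for `create_memory_service(config)`):

```python
import threading

_service = None
_service_lock = threading.Lock()

def get_service():
    """Return the module singleton, creating it at most once even when
    called concurrently (double-checked locking)."""
    global _service
    if _service is None:               # fast path: no lock once created
        with _service_lock:
            if _service is None:       # re-check: another thread may have won
                _service = object()    # stand-in for create_memory_service(config)
    return _service

# Hammer it from many threads; every caller must see the same instance.
seen = set()

def worker():
    seen.add(id(get_service()))

threads = [threading.Thread(target=worker) for _ in range(16)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(seen))  # 1
```

The second `if _service is None` inside the lock is the essential part: two threads can both pass the unlocked check, but only the first to acquire the lock creates the instance; the other sees it and returns the same object.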
🏁 Script executed:

```shell
#!/bin/bash
# First, let's examine the current implementation of the memory service factory
cat backends/advanced/src/advanced_omi_backend/memory/service_factory.py
```

Length of output: 5020

🏁 Script executed:

```shell
#!/bin/bash
# Check if there are any existing threading imports or locks in the file
rg -n "threading|Lock|lock" backends/advanced/src/advanced_omi_backend/memory/service_factory.py
```

Length of output: 0

🏁 Script executed:

```shell
#!/bin/bash
# Look for all call sites of get_memory_service to understand usage patterns
rg -n -C3 "get_memory_service" --type=py
```

Length of output: 19605

🏁 Script executed:

```shell
#!/bin/bash
# Check if this is actually used in a multi-threaded context
rg -n -C3 "threading|Thread|concurrent|asyncio" backends/advanced/src/advanced_omi_backend/ --type=py
```

Length of output: 79512

🏁 Script executed:

```shell
#!/bin/bash
# Look for existing singleton patterns or locks in the codebase
rg -n -C2 "_lock|threading\.Lock|singleton" backends/advanced/src/advanced_omi_backend/ --type=py
```

Length of output: 9206
Guard singleton creation and fix docstring
- File: backends/advanced/src/advanced_omi_backend/memory/service_factory.py — add a module-level threading.Lock and apply double-checked locking in get_memory_service; update the Returns docstring to reflect lazy-init.
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/memory/service_factory.py around
lines 79 to 82, the singleton memory service creation is not guarded for
concurrent access and the Returns docstring is inaccurate; add a module-level
threading.Lock (import threading and create _memory_service_lock at top of
module) and apply double-checked locking inside get_memory_service so the
function checks if _memory_service is set, acquires the lock only if needed,
re-checks and initializes lazily under the lock, then releases it; also update
the Returns section of the function docstring to state that the MemoryService is
created lazily on first use and may return an already-initialized singleton.
…ry service

- Updated the `update_transcription_status` method in `database.py` to use `Optional` for the error message and provider parameters, and refactored the update logic for better clarity.
- Implemented double-checked locking in `get_memory_service` to ensure thread-safe singleton creation in `service_factory.py`, preventing potential race conditions.
Summary by CodeRabbit