Queue combo #121

AnkushMalaker merged 16 commits into SimpleOpenSoftware:main from thestumonkey:queue-combo
Conversation
Added version selection from the conversation page, version deletion, and the queue management page.

# Conflicts:
#	backends/advanced/src/advanced_omi_backend/main.py
commit 3ba5506
Merge: dbf37c7 8ffd97a
Author: Stuart Alexander <stu@theawesome.co.uk>
Date:   Wed Sep 24 20:03:29 2025 +0100

    Merge branch 'AnkushMalaker:main' into pytest

commit 8ffd97a
Author: Ankush Malaker <43288948+AnkushMalaker@users.noreply.github.com>
Date:   Wed Sep 24 09:25:11 2025 +0530

    fix faiss issue in speaker recognition cpu

commit dbf37c7
Author: stu alexander <thestumonkey@gmail.com>
Date:   Sat Sep 20 19:02:22 2025 +0100

    Fixed update user

commit 14845a6
Author: stu alexander <thestumonkey@gmail.com>
Date:   Sat Sep 20 19:02:03 2025 +0100

    Added robot test suite
Important: Review skipped. Auto reviews are disabled on this repository; check the settings in the CodeRabbit UI.

Walkthrough

The PR introduces RQ-based background processing and Beanie models for conversations and audio sessions, adds audio upload and queue monitoring APIs, refactors controllers and processors to enqueue jobs, enhances health checks with Redis/RQ, adds startup scripts, updates the Makefile and configs, expands the web UI for queue management and versioning, and adds extensive Robot tests.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    actor Admin as Admin (API)
    participant FE as Web UI
    participant API as FastAPI (/api/audio/upload)
    participant AC as audio_controller
    participant FS as Filesystem (CHUNK_DIR)
    participant RQ as RQ Enqueue
    participant W as RQ Worker
    participant DB as Beanie (AudioSession, Conversation)
    Admin->>FE: Upload WAV files
    FE->>API: POST /api/audio/upload (files, device_name)
    API->>AC: Delegate upload_and_process_audio_files
    AC->>AC: Validate files, WAV, sample rate
    AC->>FS: Save processed mono 16k WAVs
    AC->>RQ: enqueue_audio_processing(client_id, audio_meta, path)
    RQ-->>API: Return Job IDs
    API-->>FE: JSON {files[], enqueued_count, client_id, job_ids}
    Note over W,DB: Worker execution
    RQ->>W: process_audio_job
    W->>DB: Create AudioSession / link conversation (if any)
    W->>RQ: enqueue_initial_transcription(...)
```
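The upload flow above returns job IDs from the enqueue step. A minimal in-memory stand-in sketches that enqueue/response contract; `FakeQueue` and `upload_and_enqueue` are illustrative names, not the project's actual RQ wiring, though the response fields mirror the diagram's JSON payload:

```python
import uuid
from dataclasses import dataclass, field

@dataclass
class FakeQueue:
    """In-memory stand-in for an RQ queue, for illustration only."""
    name: str
    jobs: list = field(default_factory=list)

    def enqueue(self, func_name: str, *args) -> str:
        # Real RQ generates job IDs too; here we just prefix with the queue name.
        job_id = f"{self.name}_{uuid.uuid4().hex[:8]}"
        self.jobs.append((job_id, func_name, args))
        return job_id

def upload_and_enqueue(queue: FakeQueue, client_id: str, paths: list) -> dict:
    # Mirror the API response shape from the diagram: files, count, client, job ids.
    job_ids = [queue.enqueue("process_audio_job", client_id, p) for p in paths]
    return {
        "files": paths,
        "enqueued_count": len(job_ids),
        "client_id": client_id,
        "job_ids": job_ids,
    }

result = upload_and_enqueue(FakeQueue("transcription"), "client-1", ["a.wav", "b.wav"])
print(result["enqueued_count"])  # 2
```

The point of the shape is that the HTTP handler returns immediately with job IDs; the worker-side steps in the diagram happen asynchronously.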
```mermaid
sequenceDiagram
    autonumber
    actor User as User
    participant FE as Web UI (Conversations)
    participant API as FastAPI (/api/conversations/{id})
    participant CC as conversation_controller
    participant DB as Beanie (Conversation)
    participant RQ as RQ Enqueue
    participant W as RQ Worker
    User->>FE: Open conversation detail
    FE->>API: GET /api/conversations/{id}
    API->>CC: get_conversation
    CC->>DB: Load Conversation, active versions, segments
    DB-->>CC: Conversation + version_info
    CC-->>FE: Details + version_info
    User->>FE: Reprocess transcript
    FE->>API: POST /api/conversations/{id}/reprocess/transcript
    API->>RQ: enqueue_transcript_processing(conversation_id, new_version_id)
    RQ->>W: process_transcript_job
    W->>DB: Add TranscriptVersion, set active
    W-->>FE: Status via queue endpoints
```
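The reprocess flow above ends with a new TranscriptVersion being appended and marked active. A dict-based sketch of that version bookkeeping; the field names are assumptions for illustration (the real models are Beanie documents):

```python
import uuid
from datetime import datetime, timezone

def add_transcript_version(conversation: dict, segments: list) -> str:
    """Append a TranscriptVersion-like record and mark it active (illustrative shape)."""
    version_id = uuid.uuid4().hex
    conversation.setdefault("transcript_versions", []).append({
        "version_id": version_id,
        "segments": segments,
        "created_at": datetime.now(timezone.utc).isoformat(),
    })
    # Reprocessing never mutates old versions; it only switches the active pointer.
    conversation["active_transcript_version"] = version_id
    return version_id

conv = {"conversation_id": "c1"}
vid = add_transcript_version(conv, [{"text": "hello", "start": 0.0}])
assert conv["active_transcript_version"] == vid
```

Keeping old versions immutable and only moving an "active" pointer is what makes the version-selection dropdown in the web UI cheap to implement.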
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
Pre-merge checks and finishing touches

❌ Failed checks (1 inconclusive)
✅ Passed checks (2 passed)
@coderabbitai review

✅ Actions performed: Review triggered.
Actionable comments posted: 56
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (8)
backends/advanced/src/advanced_omi_backend/processors.py.backup (1)
906-1270: Remove the stray legacy block

Starting at line 906 there's a legacy `_memory_processor` fragment sitting at module scope with a leading indent. If Python ever tries to import this `.backup` module (e.g., `python -m compileall`), it fails immediately with `IndentationError: unexpected indent`, and the block references attributes (`memory_queue`, `track_processing_task`) that no longer exist. Please delete this trailing block or drop the `.backup` module entirely so the tree remains syntactically valid.

backends/advanced/src/advanced_omi_backend/memory/utils.py (1)
74-83: Regex now matches literal "\s" instead of whitespace; JSON extraction will fail.

In a raw string, `\\s` matches a backslash followed by `s`. Use `\s` to match whitespace. Same for the array colon segment. Fix both patterns:

```diff
- r'\{"memory"\\s*:\\s*\[.*?\]\\s*\}',
+ r'\{"memory"\s*:\s*\[.*?\]\s*\}',
- r'\{"facts"\\s*:\\s*\[.*?\]\\s*\}',
+ r'\{"facts"\s*:\s*\[.*?\]\s*\}',
```

tests/integration/mobile_client_tests.robot (1)
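The raw-string distinction flagged in the regex comment above is easy to verify; a small sketch using the two pattern variants from that diff against a made-up test string:

```python
import re

# In a raw string, "\\s" is a literal backslash followed by "s",
# while r"\s" is the whitespace character class.
text = '{"memory" : ["a", "b"] }'

broken = re.search(r'\{"memory"\\s*:\\s*\[.*?\]\\s*\}', text)
fixed = re.search(r'\{"memory"\s*:\s*\[.*?\]\s*\}', text)

print(broken)  # None - the broken pattern demands literal backslashes in the input
print(fixed.group())  # matches the whole JSON-like object
```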
1-51: Add consistent "todo" tags and consider excluding from CI.

This placeholder test suite will cause CI failures since all tests explicitly fail. Consider:

- Adding the "todo" tag to all tests (currently only the first test has it)
- Excluding the suite from CI until implementation begins

Apply this diff to add consistent "todo" tags:

```diff
 Login to server
     [Documentation]    Test logging in to the server from mobile client
+    [Tags]    debug    todo
     Log    Logging in to server    INFO
     Fail    Test not written yet - placeholder test

 Scan bluetooth devices
     [Documentation]    Scan for available bluetooth devices
+    [Tags]    debug    todo
     Log    Scanning bluetooth devices    INFO
     Fail    Test not written yet - placeholder test

 Filter devices by omi
     [Documentation]    Filter scanned devices by omi
+    [Tags]    debug    todo
     Log    Filtering devices by omi    INFO
     Fail    Test not written yet - placeholder test

 Connect to bluetooth device
     [Documentation]    Connect to a bluetooth device
+    [Tags]    debug    todo
     Log    Connecting to bluetooth device    INFO
     Fail    Test not written yet - placeholder test

 Get device codec
     [Documentation]    Get the codec information from the device
+    [Tags]    debug    todo
     Log    Getting device codec    INFO
     Fail    Test not written yet - placeholder test

 Get device battery level
     [Documentation]    Get the battery level from the device
+    [Tags]    debug    todo
     Log    Getting device battery level    INFO
     Fail    Test not written yet - placeholder test

 Start audio stream
     [Documentation]    Start streaming audio from the device
+    [Tags]    debug    todo
     Log    Starting audio stream    INFO
     Fail    Test not written yet - placeholder test
```

Then update your Robot test runner to exclude "todo" tests:

```shell
robot --exclude todo tests/
```

tests/integration/conversation_queue.robot (1)
100-132: Use evaluated variables when checking job status

Expressions like `Should Be True    '${job_details}[status]' in [...]` and `'${status}' in [...]` treat the left side as a literal string, so the checks always fail once the earlier syntax issues are fixed. Switch to the `$variable` syntax so Robot evaluates the actual status values. (robotframework.org)

```diff
-    Should Be True    '${job_details}[status]' in ['completed', 'finished']    Job status: ${job_details}[status], expected completed or finished
+    Should Be True    $job_details['status'] in ['completed', 'finished']    Job status: ${job_details}[status], expected completed or finished
 ...
-    Should Be True    '${status}' in ['completed', 'finished', 'failed']    Job status: ${status}
+    Should Be True    $status in ['completed', 'finished', 'failed']    Job status: ${status}
```

Makefile (2)
8-11: Fix env export filters: `\s` isn't valid in grep here; comments/blank lines aren't excluded.

On many systems `grep` (basic regex) treats `\s` as literal characters, so comment and blank lines slip through and `export` may try to export `#...` as a variable name. Use a single robust `sed` to extract only valid identifiers.

Apply this diff in both blocks:

```diff
-export $(shell sed 's/=.*//' .env | grep -v '^\s*$$' | grep -v '^\s*\#')
+# Export only VAR names that look like shell identifiers (skip blanks/comments)
+export $(shell sed -n 's/^[[:space:]]*\([A-Za-z_][A-Za-z0-9_]*\)[[:space:]]*=.*/\1/p' .env)
```

```diff
-export $(shell sed 's/=.*//' config.env | grep -v '^\s*$$' | grep -v '^\s*\#')
+# Export only valid identifiers from config.env
+export $(shell sed -n 's/^[[:space:]]*\([A-Za-z_][A-Za-z0-9_]*\)[[:space:]]*=.*/\1/p' config.env)
```

Also applies to: 14-17
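The proposed `sed` expression can be sanity-checked against a scratch file; the file path and contents below are made up for the demo:

```shell
# Hypothetical env file exercising the edge cases: comment, blank line,
# leading whitespace, spaces around '=', and an invalid identifier.
cat > /tmp/demo.env <<'EOF'
# a comment
FOO=1

  BAR = 2
3BAD=nope
EOF

# Prints only names matching [A-Za-z_][A-Za-z0-9_]* followed by '='.
sed -n 's/^[[:space:]]*\([A-Za-z_][A-Za-z0-9_]*\)[[:space:]]*=.*/\1/p' /tmp/demo.env
# FOO
# BAR
```

Note that `3BAD` is skipped because the anchored group requires the first character to be a letter or underscore, which is exactly what `export` needs.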
159-159: Use POSIX `.` instead of Bash-only `source`.

`make` runs `/bin/sh` by default; `source` may fail. Use the POSIX dot:

```diff
-	@set -a; source skaffold.env; set +a; skaffold run --profile=infrastructure --default-repo=$(CONTAINER_REGISTRY)
+	@set -a; . skaffold.env; set +a; skaffold run --profile=infrastructure --default-repo=$(CONTAINER_REGISTRY)
```

```diff
-	@set -a; source skaffold.env; set +a; skaffold run --profile=advanced-backend --default-repo=$(CONTAINER_REGISTRY)
+	@set -a; . skaffold.env; set +a; skaffold run --profile=advanced-backend --default-repo=$(CONTAINER_REGISTRY)
```

(and the same change in deploy-apps.)

Also applies to: 242-242, 255-255
backends/advanced/src/advanced_omi_backend/transcription.py (1)
886-904: Undefined variable `conversation` (use `conversation_model`).

This crashes the queueing path and breaks memory processing. Replace all `conversation[...]` uses with `conversation_model` attributes.

Based on learnings

```diff
-        logger.info(
-            f"💭 Queuing memory processing for conversation {conversation_id} (audio: {conversation['audio_uuid']})"
-        )
+        logger.info(
+            f"💭 Queuing memory processing for conversation {conversation_id} (audio: {conversation_model.audio_uuid})"
+        )
 @@
-        enqueue_memory_processing(
+        enqueue_memory_processing(
             client_id=self._client_id,
-            user_id=conversation["user_id"],
+            user_id=conversation_model.user_id,
             user_email=audio_session["user_email"],
             conversation_id=conversation_id
         )
```
115-131: Fix broken queue stats after RQ migration

`ProcessorManager` no longer exposes `memory_queue`, `cropping_queue`, or `processing_tasks`, so this call now raises an `AttributeError` every time `/system/processor-status` hits it. We need to pull queue sizes from the new RQ queues instead of the removed in-process queues, and count tracked jobs via `processing_jobs`.

```diff
 @@
-from advanced_omi_backend.processors import get_processor_manager
+from advanced_omi_backend.processors import get_processor_manager
+from advanced_omi_backend.rq_queue import DEFAULT_QUEUE, MEMORY_QUEUE, get_queue
 @@
-    status = {
+    memory_queue = get_queue(MEMORY_QUEUE)
+    default_queue = get_queue(DEFAULT_QUEUE)
+
+    status = {
         "queues": {
             "audio_queue": processor_manager.audio_queue.qsize(),
             "transcription_queue": processor_manager.transcription_queue.qsize(),
-            "memory_queue": processor_manager.memory_queue.qsize(),
-            "cropping_queue": processor_manager.cropping_queue.qsize(),
+            "memory_queue": len(memory_queue.jobs),
+            "cropping_queue": sum(
+                1
+                for job in default_queue.jobs
+                if getattr(job, "id", "") and job.id.startswith("cropping_")
+            ),
         },
         "processors": {
             "audio_processor": "running",
             "transcription_processor": "running",
             "memory_processor": "running",
             "cropping_processor": "running",
         },
         "active_clients": len(processor_manager.active_file_sinks),
         "active_audio_uuids": len(processor_manager.active_audio_uuids),
-        "processing_tasks": len(processor_manager.processing_tasks),
+        "processing_tasks": len(processor_manager.processing_jobs),
         "timestamp": int(time.time()),
     }
```
🧹 Nitpick comments (34)
backends/advanced/webui/src/pages/System.tsx (1)
279-284: Consider extending the service status interface for type safety.

The conditional rendering of Redis worker stats uses `(status as any)` to access properties (`worker_count`, `active_workers`, `idle_workers`) that aren't defined in the `HealthData` interface (lines 7-14). While the runtime check `!== undefined` provides safety, TypeScript loses compile-time guarantees.

Consider extending the interface to include these optional properties:

```diff
 interface HealthData {
   status: 'healthy' | 'partial' | 'unhealthy'
   services: Record<string, {
     healthy: boolean
     message?: string
+    status?: string
+    provider?: string
+    worker_count?: number
+    active_workers?: number
+    idle_workers?: number
   }>
   timestamp?: string
 }
```

This would enable autocomplete, prevent property-name typos, and provide better IDE support without changing runtime behavior.
backends/advanced/src/advanced_omi_backend/transcription_providers.py (1)
833-856: Unreachable code at lines 855-856.

The `else` clause at lines 855-856 is unreachable because:

- Lines 806-832 handle all non-`None` `provider_name` cases (returning a provider or raising an exception).
- Lines 835-854 handle the `provider_name is None` case and always return a value (either a provider instance or `None` at line 854).

Apply this diff to remove the unreachable code:

```diff
         logger.warning("No transcription provider configured (DEEPGRAM_API_KEY or PARAKEET_ASR_URL required)")
         return None
-    else:
-        return None
```

tests/resources/timing_keywords.robot (2)
20-39: Add error handling for missing start time.

Line 28 will raise an error if `Start Timer` was never called for the operation. Consider adding a default value and validation to provide a clearer error message.

Apply this diff:

```diff
-    ${start_time}=    Get From Dictionary    ${TIMING_DATA}    ${start_key}
+    ${start_time}=    Get From Dictionary    ${TIMING_DATA}    ${start_key}    default=None
+    IF    ${start_time} == None
+        Fail    Timer '${operation_name}' was never started
+    END
```

41-47: Add error handling for missing duration.

Line 46 will raise an error if the timer was never stopped. Consider adding a default value and validation.

Apply this diff:

```diff
-    ${duration}=    Get From Dictionary    ${TIMING_DATA}    ${duration_key}
+    ${duration}=    Get From Dictionary    ${TIMING_DATA}    ${duration_key}    default=None
+    IF    ${duration} == None
+        Fail    Timer '${operation_name}' was never stopped or doesn't exist
+    END
```

requirements.txt (1)
1-5: Recommend pinning dependency versions for reproducibility.

The Robot Framework and python-dotenv dependencies are unpinned, which can lead to non-reproducible builds and unexpected test failures when upstream packages release breaking changes.

For example, python-dotenv introduced breaking changes in v15+ around comment parsing and multiline handling (Based on learnings). Similarly, Robot Framework libraries may introduce incompatible changes.

Consider pinning versions to ensure consistent test behavior:

```diff
-robotframework
-robotframework-tidy
-robotframework-requests
-robotframework-browser
-python-dotenv
+robotframework==7.0
+robotframework-tidy==4.0.0
+robotframework-requests==0.9.6
+robotframework-browser==18.0.0
+python-dotenv==1.0.0
```

Alternatively, generate a lock file using `pip freeze > requirements-lock.txt` after installing to capture exact versions.

tests/browser/browser_auth.robot (1)
23-23: Consider making timeout configurable.

The hard-coded 10-second timeout might be insufficient in slow CI environments or with network latency. Consider extracting timeouts to variables or suite-level configuration for better flexibility.

Example refactor in the Variables section:

```robotframework
*** Variables ***
${LOGIN_TIMEOUT}      10s
${DEFAULT_TIMEOUT}    10s

*** Test Cases ***
Test Browser Can Access Login Page
    ...
    Wait For Elements State    id=email    visible    timeout=${LOGIN_TIMEOUT}
    ...
    Wait For Elements State    text=Friend-Lite Dashboard    visible    timeout=${DEFAULT_TIMEOUT}
```

backends/advanced/Dockerfile (1)
42-47: start.sh verified with basic error handling; consider adding `set -u` and `set -o pipefail`

- File `backends/advanced/start.sh` exists and includes `set -e`.
- The script correctly orchestrates both RQ workers and the FastAPI backend.
- For more robust failure detection, add `set -u` (treat unset variables as errors) and `set -o pipefail` (catch pipeline failures).

backends/advanced/src/advanced_omi_backend/models/job.py (1)
11-23: Update or remove the stale timeout docstring in the `JobPriority` enum

Timeout values are defined per-job in `rq_queue.py` via multiple `timeout_mapping` dicts (e.g., transcription uses 600/480/300/180 s, audio uses 120/90/60/30 s, cropping uses 300/240/180/120 s, etc.), but the enum's docstring only lists the transcription defaults (10 min, 8 min, 5 min, 3 min). This duplication is prone to drift; extract the mappings to a single source of truth or sync/delete the enum docstring.

backends/advanced/src/advanced_omi_backend/controllers/user_controller.py (2)
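The single-source-of-truth suggestion from the `job.py` comment above could look like a shared table that both enqueue code and documentation read from. The structure and names below are assumptions for illustration, not the project's actual `rq_queue.py`; only the numeric values come from the comment:

```python
from enum import Enum

class JobPriority(str, Enum):
    URGENT = "urgent"
    HIGH = "high"
    NORMAL = "normal"
    LOW = "low"

# One table, in seconds, keyed by (job type, priority); any docstring or
# enqueue call can reference this instead of repeating the numbers.
JOB_TIMEOUTS = {
    "transcription": {JobPriority.URGENT: 600, JobPriority.HIGH: 480,
                      JobPriority.NORMAL: 300, JobPriority.LOW: 180},
    "audio":         {JobPriority.URGENT: 120, JobPriority.HIGH: 90,
                      JobPriority.NORMAL: 60,  JobPriority.LOW: 30},
    "cropping":      {JobPriority.URGENT: 300, JobPriority.HIGH: 240,
                      JobPriority.NORMAL: 180, JobPriority.LOW: 120},
}

def timeout_for(job_type: str, priority: JobPriority) -> int:
    """Look up the job timeout from the shared table."""
    return JOB_TIMEOUTS[job_type][priority]

print(timeout_for("transcription", JobPriority.NORMAL))  # 300
```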
83-83: Drop print; use logger.debug.

Replace the debug print with structured logging:

```diff
-    print("DEBUG: New update_user function is being called!")
+    logger.debug("update_user called")
```

95-101: Prefer a manager-based existence check for consistency.

Use the user manager to fetch by id (and centralize constraints), instead of mixing raw `users_col` with the manager.

Example:

```python
existing_user = await user_manager.get(object_id)
if not existing_user:
    return JSONResponse(status_code=404, content={"message": f"User {user_id} not found"})
```

Also ensure `UserUpdate` cannot carry privileged fields (e.g., `is_superuser`) through this path. Please confirm the schema.

tests/all_api_tests.robot (1)
45-53: Ensure ${API_URL} is defined or guard it.

Without `Variables    tests/test_env.py` or a CLI `-V` option, `${API_URL}` will be undefined. Add a Variables import (above) or fall back to the environment:

```diff
 Master Suite Setup
     [Documentation]    Setup for the entire test suite
     Log    Starting Friend-Lite API Test Suite
-    Log    Testing against: ${API_URL}
+    ${api_url}=    Set Variable If    '${API_URL}'!='${API_URL}'    ${API_URL}    %{API_URL}
+    Log    Testing against: ${api_url}
```

Confirm your CI invokes Robot with `-V tests/test_env.py` or sets the `API_URL` env var.

tests/endpoints/auth_tests.robot (1)
90-90: Use Robot Framework's native type checking instead of Python's isinstance.

The `isinstance()` call may not evaluate correctly in Robot Framework's expression context. Consider using Robot Framework's built-in keywords for type validation.

Apply this diff to use Robot Framework's native approach:

```diff
-    Should Be True    isinstance($response.json(), list)
+    ${users}=    Set Variable    ${response.json()}
+    # Verify it's a list by checking length exists
+    ${length}=    Get Length    ${users}
+    Should Be True    ${length} >= 0
```

Or simply verify the list by iterating (which implicitly validates it's iterable):

```diff
-    Should Be True    isinstance($response.json(), list)
+    # Implicit list validation - Get Length will fail if not a list
+    ${users}=    Set Variable    ${response.json()}
+    ${length}=    Get Length    ${users}
```

backends/advanced/start.sh (3)
10-20: Improve shutdown reliability by killing tracked PIDs explicitly.

The `pkill -TERM -P $$` approach may not reliably kill background jobs in all scenarios. Since you're already capturing `WORKER_PID` and `BACKEND_PID`, use them explicitly in the shutdown handler.

Apply this diff to improve shutdown handling:

```diff
 shutdown() {
     echo "🛑 Shutting down services..."
-    pkill -TERM -P $$
+    kill -TERM $WORKER_PID $BACKEND_PID 2>/dev/null || true
     wait
     echo "✅ All services stopped"
     exit 0
 }
```
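The PID-tracked shutdown pattern suggested above can be exercised as a standalone sketch; this is a demo, not the project's `start.sh`, and it calls `shutdown` directly so it terminates on its own:

```shell
# Two placeholder background "services" standing in for the worker and backend.
sleep 30 & WORKER_PID=$!
sleep 30 & BACKEND_PID=$!

shutdown() {
    # Kill only the processes we started, then reap them.
    kill -TERM "$WORKER_PID" "$BACKEND_PID" 2>/dev/null || true
    wait
    echo "all services stopped"
}
trap shutdown TERM INT

shutdown   # invoked directly here so the sketch exits instead of sleeping 30s
```

Killing the tracked PIDs is more precise than `pkill -P $$`, which depends on the parent/child relationship still holding at shutdown time.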
22-28: Consider health checks instead of an arbitrary sleep for worker readiness.

The `sleep 2` assumes workers will be ready in 2 seconds, but there's no verification. Consider adding a Redis connection check or RQ worker ping to confirm readiness.

Example health-check approach:

```shell
# Start RQ workers in the background
echo "🔧 Starting RQ workers..."
uv run --extra deepgram rq worker transcription memory default --url "${REDIS_URL:-redis://localhost:6379/0}" &
WORKER_PID=$!

# Wait for Redis connection instead of an arbitrary sleep
echo "⏳ Waiting for workers to be ready..."
for i in {1..30}; do
    if redis-cli -u "${REDIS_URL:-redis://localhost:6379/0}" ping &>/dev/null; then
        echo "✅ Redis connection confirmed"
        break
    fi
    sleep 1
done
```
30-44: Consider log separation for easier debugging.

Both the RQ workers and the FastAPI backend write to the same stdout/stderr, which can make debugging difficult. Consider redirecting their output to separate files or using a log prefix.

Optional: redirect logs to separate files:

```diff
 # Start RQ workers in the background
 echo "🔧 Starting RQ workers..."
-uv run --extra deepgram rq worker transcription memory default --url "${REDIS_URL:-redis://localhost:6379/0}" &
+uv run --extra deepgram rq worker transcription memory default --url "${REDIS_URL:-redis://localhost:6379/0}" > /var/log/rq-worker.log 2>&1 &
 WORKER_PID=$!

 # Start the main FastAPI application
 echo "🌐 Starting FastAPI backend..."
-uv run --extra deepgram python3 src/advanced_omi_backend/main.py &
+uv run --extra deepgram python3 src/advanced_omi_backend/main.py > /var/log/backend.log 2>&1 &
 BACKEND_PID=$!
```

backends/advanced/start-k8s.sh (1)
80-89: Process exit handling may leave orphaned processes.

The script uses `wait -n` to wait for any process to exit, then kills both worker and backend PIDs. However, if one process has already exited, the `kill` command at line 85 may fail silently (with `2>/dev/null`), and the script won't distinguish between a clean shutdown and an unexpected crash.

Improve exit handling to report which process exited:

```diff
 # Wait for any process to exit
-wait -n
+wait -n -p EXITED_PID

 # If we get here, one process has exited - kill the others
-echo "⚠️ One service exited, stopping all services..."
+if [ "$EXITED_PID" = "$WORKER_PID" ]; then
+    echo "⚠️ RQ workers exited unexpectedly, stopping backend..."
+elif [ "$EXITED_PID" = "$BACKEND_PID" ]; then
+    echo "⚠️ FastAPI backend exited unexpectedly, stopping workers..."
+fi
 kill $WORKER_PID $BACKEND_PID 2>/dev/null || true
 wait
```

tests/endpoints/chat_tests.robot (1)
49-81: Improve session structure validation in the loop.

The test at lines 68-77 loops through sessions and validates structure inside the loop, but the validation comment at line 69 is misplaced outside the loop body. The dictionary validation at lines 70-73 should logically be inside the IF block or dedented to clarify scope.

Consider restructuring for clarity:

```diff
     FOR    ${session}    IN    @{sessions}
-    # Verify chat session structure
-    Dictionary Should Contain Key    ${session}    session_id
-    Dictionary Should Contain Key    ${session}    title
-    Dictionary Should Contain Key    ${session}    created_at
-    Dictionary Should Contain Key    ${session}    updated_at
+        # Verify chat session structure
+        Dictionary Should Contain Key    ${session}    session_id
+        Dictionary Should Contain Key    ${session}    title
+        Dictionary Should Contain Key    ${session}    created_at
+        Dictionary Should Contain Key    ${session}    updated_at
         IF    '${session}[session_id]' == '${test_session}[session_id]'
             ${found}=    Set Variable    ${True}
         END
     END
```

tests/endpoints/health_tests.robot (2)
42-48: Guard logs to avoid KeyErrors when optional services are absent

Direct indexing will fail if a service is not configured. Wrap the logs with IF checks:

```diff
-    Log To Console    Mongodb: ${services}[mongodb][status]
-    Log To Console    AudioAI: ${services}[audioai][status]
-    Log To Console    Memory Service: ${services}[memory_service][status]
-    Log To Console    Speech to Text: ${services}[speech_to_text][status]
-    Log To Console    Speaker recognition: ${services}[speaker_recognition][status]
+    IF    'mongodb' in ${services}
+        Log To Console    Mongodb: ${services}[mongodb][status]
+    END
+    IF    'audioai' in ${services}
+        Log To Console    AudioAI: ${services}[audioai][status]
+    END
+    IF    'memory_service' in ${services}
+        Log To Console    Memory Service: ${services}[memory_service][status]
+    END
+    IF    'speech_to_text' in ${services}
+        Log To Console    Speech to Text: ${services}[speech_to_text][status]
+    END
+    IF    'speaker_recognition' in ${services}
+        Log To Console    Speaker recognition: ${services}[speaker_recognition][status]
+    END
```
120-128: Nit: unused session

`Get Anonymous Session    anon_session` is unused in this test. Consider removing it to reduce noise.

tests/endpoints/rq_queue_tests.robot (3)
135-137: Strengthen persistence assertions (current checks are tautologies)

`>= 0` can never fail and doesn't prove persistence. Compare against the pre-restart count:

```diff
-    Should Be True    ${jobs_count_after} >= 0
+    Should Be True    ${jobs_count_after} >= ${jobs_count_before}
```

If jobs may process during restart, wrap with a brief retry:

```robotframework
Wait Until Keyword Succeeds    20s    2s    Validate Jobs Persist    ${jobs_count_before}
```

Add a helper:

```robotframework
*** Keywords ***
Validate Jobs Persist
    [Arguments]    ${min_count}
    ${after}=    Check Queue Jobs
    ${count}=    Get Length    ${after}[jobs]
    Should Be True    ${count} >= ${min_count}
```

Also applies to: 191-194
15-18: Avoid double initialization: `Test Setup` repeats `Suite Setup`

Running the same setup both per-suite and per-test adds time and flakiness. Keep it in only one place:

```diff
-Suite Setup       Suite Setup
-Suite Teardown    Delete All Sessions
-Test Setup        Suite Setup
+Suite Setup       Suite Setup
+Suite Teardown    Delete All Sessions
```
26-33: `FLUSHALL` wipes the entire Redis instance

Safe for an isolated test Redis, but risky if shared. Prefer a dedicated DB index or key-prefix deletion.
tests/endpoints/memory_tests.robot (2)
184-186: Endpoint path likely incorrect; align with the admin path used elsewhere

Elsewhere you use `/api/admin/memories`. This test hits `/api/memories/admin` and expects 403, which may return 404/405 instead.

```diff
-    ${response}=    GET On Session    user_session    /api/memories/admin    expected_status=403
+    ${response}=    GET On Session    user_session    /api/admin/memories    expected_status=403
```
20-25: Consider using a non-admin user for the "user memories" test

Using `admin_session` here doesn't validate user scoping. Add a variant with a regular user to ensure only that user's memories are returned.

backends/advanced/webui/src/components/ConversationVersionDropdown.tsx (2)
114-118: Optional: show accurate labels before history loads

Use `versionInfo.active_*_version` when `versionHistory` is null so labels reflect the real active versions immediately.
124-145: Accessibility: add ARIA and keyboard support

Add `aria-haspopup="listbox"` and `aria-expanded`, and allow keyboard navigation for options. This improves usability without changing behavior.

Also applies to: 186-207
tests/endpoints/system_admin_tests.robot (1)
206-207: Keyword signature inconsistency (Verify)

`Delete Test User` is called with a `session` arg here, but without one in other suites. Ensure the resource keyword supports both forms or standardize call sites.

tests/test_env.py (1)
7-8: Force test env vars to override host values.

Without `override=True`, any pre-set `BACKEND_URL`, `ADMIN_EMAIL`, etc. on the runner will bleed into tests, potentially pointing automation at real services. Make the load deterministic by overriding host values:

```diff
-load_dotenv(test_env_path)
+load_dotenv(test_env_path, override=True)
```

Makefile (2)
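The override semantics behind the `load_dotenv` comment above can be illustrated with a small stand-in loader; this mimics the behavior for the demo and is not the python-dotenv implementation:

```python
import os

def load_env(pairs: dict, override: bool = False) -> None:
    """Stand-in for dotenv loading: setdefault keeps host values, override replaces them."""
    for key, value in pairs.items():
        if override:
            os.environ[key] = value            # test values win
        else:
            os.environ.setdefault(key, value)  # host values win

os.environ["BACKEND_URL"] = "http://prod.example"   # pre-set on the runner
load_env({"BACKEND_URL": "http://localhost:8000"})
print(os.environ["BACKEND_URL"])  # http://prod.example - the host value leaked

load_env({"BACKEND_URL": "http://localhost:8000"}, override=True)
print(os.environ["BACKEND_URL"])  # http://localhost:8000
```

This is exactly the leak the review flags: without overriding, whatever is already exported on the CI runner silently wins over the test fixture values.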
22-22: Add conventional phony aliases 'all' and 'test'.

This satisfies static analysis and matches expectations (`make test` runs tests):

```diff
-.PHONY: help menu setup-k8s setup-infrastructure setup-rbac setup-storage-pvc config config-docker config-k8s config-all clean deploy deploy-docker deploy-k8s deploy-k8s-full deploy-infrastructure deploy-apps check-infrastructure check-apps build-backend up-backend down-backend k8s-status k8s-cleanup k8s-purge audio-manage test-robot test-robot-integration test-robot-unit test-robot-endpoints test-robot-specific test-robot-clean
+.PHONY: all test help menu setup-k8s setup-infrastructure setup-rbac setup-storage-pvc config config-docker config-k8s config-all clean deploy deploy-docker deploy-k8s deploy-k8s-full deploy-infrastructure deploy-apps check-infrastructure check-apps build-backend up-backend down-backend k8s-status k8s-cleanup k8s-purge audio-manage test-robot test-robot-integration test-robot-unit test-robot-endpoints test-robot-specific test-robot-clean
+
+all: menu
+test: test-robot
```
188-203: Make config-k8s resilient when the cluster is absent.

You already swallow apply errors; also consider guarding the `kubectl get ... | sed | kubectl apply` copy steps when the source resources don't exist, to avoid noisy failures in local dev.

tests/resources/memory_keywords.robot (1)
31-38: Consider URL-encoding and types for query params.

Ensure `score_threshold` is passed as a number and properly encoded by RequestsLibrary; the current dict is fine, but if thresholds can be `None`, add conditional inclusion like in Get User Memories.

backends/advanced/src/advanced_omi_backend/transcription.py (3)
467-479: Absolute timestamps should anchor to session start, not time.time().

`absolute_timestamp = time.time() + start` introduces drift; use the audio session/conversation `session_start` (ms → seconds).

```diff
-            "absolute_timestamp": time.time() + segment.get("start", 0.0),
+            # Anchor to session_start (UTC) if available; fall back to now
+            "absolute_timestamp": (
+                (conversation_model.session_start.timestamp() if 'conversation_model' in locals() and conversation_model and conversation_model.session_start else time.time())
+                + float(segment.get("start", 0.0))
+            ),
```

If you prefer not to touch it here, compute the anchor once above, after fetching `conversation_model`.
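The drift issue can be seen by comparing the two anchoring strategies directly; variable names here are illustrative:

```python
import time
from datetime import datetime, timezone

# Anchor segment times to the session start rather than "now", so replays
# and reprocessing produce stable absolute timestamps.
session_start = datetime(2025, 9, 24, 19, 0, tzinfo=timezone.utc)
segment = {"start": 12.5}

drifting = time.time() + segment["start"]              # depends on when the worker runs
stable = session_start.timestamp() + segment["start"]  # deterministic

print(datetime.fromtimestamp(stable, tz=timezone.utc).isoformat())
# 2025-09-24T19:00:12.500000+00:00
```

Anchoring to `time.time()` means re-running transcription on the same audio yields different absolute timestamps every time; anchoring to the session start does not.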
437-441: Narrow the overly broad except Exception, or at least log with context and re-raise selectively.

Broad catches hide programmer errors and complicate retries/alerts.

- Prefer specific exceptions (I/O, network, Beanie, JSON).
- Where you must catch `Exception`, include `exc_info=True` and consider re-raising or returning error states for upstream handling.

Also applies to: 570-577, 969-971
657-688: Session timestamp conversion: good, but guard zero/None.

`datetime.fromtimestamp(raw/1000)` will be the epoch when the value is missing; add a conditional to avoid misleading `1970-01-01Z` session starts:

```diff
-        "session_start": datetime.fromtimestamp(audio_session.get("timestamp", 0) / 1000, tz=UTC),
+        "session_start": (
+            datetime.fromtimestamp(audio_session["timestamp"] / 1000, tz=UTC)
+            if audio_session.get("timestamp") else datetime.now(UTC)
+        ),
```

Also applies to: 691-699
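The zero/None guard from the comment above is worth factoring into a helper; the millisecond field semantics and the helper name are assumptions for illustration:

```python
from datetime import datetime, timezone
from typing import Optional

def session_start_from_ms(ms: Optional[int]) -> datetime:
    """Convert a millisecond timestamp to UTC, avoiding a misleading 1970 epoch
    when the field is missing or zero."""
    if ms:
        return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)
    return datetime.now(timezone.utc)

# A missing or zero timestamp falls back to "now" instead of 1970-01-01Z.
assert session_start_from_ms(0).year >= 2020
# 1_700_000_000_000 ms is a real 2023 instant.
assert session_start_from_ms(1_700_000_000_000).year == 2023
```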
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
extras/speaker-recognition/uv.lockis excluded by!**/*.lock
📒 Files selected for processing (80)
- .env.template (1 hunks)
- Makefile (4 hunks)
- backends/advanced/Dockerfile (1 hunks)
- backends/advanced/Dockerfile.k8s (1 hunks)
- backends/advanced/docker-compose-test.yml (3 hunks)
- backends/advanced/src/advanced_omi_backend/config.py (3 hunks)
- backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (12 hunks)
- backends/advanced/src/advanced_omi_backend/controllers/system_controller.py (2 hunks)
- backends/advanced/src/advanced_omi_backend/controllers/system_controller.py.backup (1 hunks)
- backends/advanced/src/advanced_omi_backend/controllers/user_controller.py (3 hunks)
- backends/advanced/src/advanced_omi_backend/conversation_manager.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/database.py (3 hunks)
- backends/advanced/src/advanced_omi_backend/main.py (5 hunks)
- backends/advanced/src/advanced_omi_backend/memory/prompts.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/memory/utils.py (2 hunks)
- backends/advanced/src/advanced_omi_backend/models/__init__.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/models/audio_session.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/models/conversation.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/models/job.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/models/user.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/processors.py (12 hunks)
- backends/advanced/src/advanced_omi_backend/processors.py.backup (1 hunks)
- backends/advanced/src/advanced_omi_backend/routers/api_router.py (2 hunks)
- backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/routers/modules/user_routes.py (2 hunks)
- backends/advanced/src/advanced_omi_backend/rq_queue.py (1 hunks)
- backends/advanced/src/advanced_omi_backend/speaker_recognition_client.py (2 hunks)
- backends/advanced/src/advanced_omi_backend/transcription.py (10 hunks)
- backends/advanced/src/advanced_omi_backend/transcription_providers.py (3 hunks)
- backends/advanced/src/advanced_omi_backend/users.py (1 hunks)
- backends/advanced/start-k8s.sh (1 hunks)
- backends/advanced/start.sh (1 hunks)
- backends/advanced/tests/test_conversation_models.py (1 hunks)
- backends/advanced/webui/Dockerfile (1 hunks)
- backends/advanced/webui/src/App.tsx (2 hunks)
- backends/advanced/webui/src/components/ConversationVersionDropdown.tsx (1 hunks)
- backends/advanced/webui/src/components/ConversationVersionHeader.tsx (1 hunks)
- backends/advanced/webui/src/components/layout/Layout.tsx (2 hunks)
- backends/advanced/webui/src/pages/Conversations.tsx (10 hunks)
- backends/advanced/webui/src/pages/Queue.tsx (1 hunks)
- backends/advanced/webui/src/pages/System.tsx (2 hunks)
- backends/advanced/webui/src/services/api.ts (2 hunks)
- backends/charts/advanced-backend/values.yaml (1 hunks)
- extras/speaker-recognition/docker-compose-test.yml (1 hunks)
- extras/speaker-recognition/pyproject.toml (1 hunks)
- requirements.txt (1 hunks)
- skaffold.yaml (2 hunks)
- tests/README.md (1 hunks)
- tests/TESTING_GUIDELINES.md (1 hunks)
- tests/all_api_tests.robot (1 hunks)
- tests/browser/browser_auth.robot (1 hunks)
- tests/endpoints/*client_queue_tests.robot (1 hunks)
- tests/endpoints/auth_tests.robot (1 hunks)
- tests/endpoints/chat_tests.robot (1 hunks)
- tests/endpoints/conversation_tests.robot (1 hunks)
- tests/endpoints/health_tests.robot (1 hunks)
- tests/endpoints/memory_tests.robot (1 hunks)
- tests/endpoints/rq_queue_tests.robot (1 hunks)
- tests/endpoints/system_admin_tests.robot (1 hunks)
- tests/integration/conversation_queue.robot (1 hunks)
- tests/integration/integration_test.robot (1 hunks)
- tests/integration/mobile_client_tests.robot (1 hunks)
- tests/resources/audio_keywords.robot (1 hunks)
- tests/resources/chat_keywords.robot (1 hunks)
- tests/resources/conversation_keywords.robot (1 hunks)
- tests/resources/integration_keywords.robot (1 hunks)
- tests/resources/memory_keywords.robot (1 hunks)
- tests/resources/queue_keywords.robot (1 hunks)
- tests/resources/session_resources.robot (1 hunks)
- tests/resources/setup_resources.robot (1 hunks)
- tests/resources/timing_keywords.robot (1 hunks)
- tests/resources/transcript_verification.robot (1 hunks)
- tests/resources/user_resources.robot (1 hunks)
- tests/test_data.py (1 hunks)
- tests/test_env.py (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-18T01:42:08.562Z
Learnt from: CR
PR: AnkushMalaker/friend-lite#0
File: CLAUDE.md:0-0
Timestamp: 2025-09-18T01:42:08.562Z
Learning: Applies to backends/advanced/src/**/*.py : Use conversation_id (not audio_uuid) for all reprocessing operations and version management
Applied to files:
- backends/advanced/src/advanced_omi_backend/conversation_manager.py
- backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
- backends/advanced/src/advanced_omi_backend/models/conversation.py
- backends/advanced/src/advanced_omi_backend/transcription.py
🪛 checkmake (0.2.2)
Makefile
[warning] 22-22: Missing required phony target "all"
(minphony)
[warning] 22-22: Missing required phony target "test"
(minphony)
🪛 dotenv-linter (3.3.0)
.env.template
[warning] 108-108: [UnorderedKey] The MONGODB_K8S_URI key should go before the MONGODB_URI key
(UnorderedKey)
🪛 markdownlint-cli2 (0.18.1)
tests/TESTING_GUIDELINES.md
13-13: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
20-20: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
27-27: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
33-33: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
41-41: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
48-48: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
54-54: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
55-55: Spaces inside emphasis markers
(MD037, no-space-in-emphasis)
55-55: Spaces inside emphasis markers
(MD037, no-space-in-emphasis)
🪛 Ruff (0.13.1)
backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py
18-18: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
19-19: Do not perform function call File in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
backends/advanced/src/advanced_omi_backend/speaker_recognition_client.py
60-60: f-string without any placeholders
Remove extraneous f prefix
(F541)
152-152: Do not catch blind exception: Exception
(BLE001)
153-153: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
155-155: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
backends/advanced/src/advanced_omi_backend/routers/modules/user_routes.py
33-33: Unused function argument: current_user
(ARG001)
33-33: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
backends/advanced/src/advanced_omi_backend/users.py
19-28: __all__ is not sorted
Apply an isort-style sorting to __all__
(RUF022)
backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py
36-36: Unused function argument: auto_generate_client
(ARG001)
82-82: Do not catch blind exception: Exception
(BLE001)
86-86: Use explicit conversion flag
Replace with conversion flag
(RUF010)
156-156: Do not catch blind exception: Exception
(BLE001)
157-157: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
176-176: Do not catch blind exception: Exception
(BLE001)
177-177: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
179-179: Use explicit conversion flag
Replace with conversion flag
(RUF010)
backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py
46-46: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
backends/advanced/tests/test_conversation_models.py
133-133: Local variable version2 is assigned to but never used
Remove assignment to unused variable version2
(F841)
backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py
25-25: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
43-43: Consider moving this statement to an else block
(TRY300)
45-45: Do not catch blind exception: Exception
(BLE001)
46-46: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
53-53: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
63-63: Abstract raise to an inner function
(TRY301)
92-92: Do not catch blind exception: Exception
(BLE001)
93-93: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
94-94: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
99-99: Unused function argument: current_user
(ARG001)
99-99: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
104-104: Consider moving this statement to an else block
(TRY300)
106-106: Do not catch blind exception: Exception
(BLE001)
107-107: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
116-116: Consider moving this statement to an else block
(TRY300)
118-118: Do not catch blind exception: Exception
(BLE001)
119-119: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
122-122: Use explicit conversion flag
Replace with conversion flag
(RUF010)
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
143-143: Do not catch blind exception: Exception
(BLE001)
144-144: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
200-200: Consider moving this statement to an else block
(TRY300)
203-203: Redundant exception object included in logging.exception call
(TRY401)
677-677: Do not catch blind exception: Exception
(BLE001)
678-678: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
684-684: Unused function argument: audio_uuid
(ARG001)
687-687: Unused function argument: user_id
(ARG001)
710-710: Abstract raise to an inner function
(TRY301)
718-718: Abstract raise to an inner function
(TRY301)
718-718: Avoid specifying long messages outside the exception class
(TRY003)
800-800: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
818-818: Local variable audio_uuid is assigned to but never used
Remove assignment to unused variable audio_uuid
(F841)
backends/advanced/src/advanced_omi_backend/models/conversation.py
271-276: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
backends/advanced/src/advanced_omi_backend/models/user.py
95-95: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
108-108: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
backends/advanced/src/advanced_omi_backend/database.py
521-521: Redefinition of unused update_transcription_status from line 387
(F811)
522-522: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
522-522: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
backends/advanced/src/advanced_omi_backend/models/audio_session.py
56-61: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
backends/advanced/src/advanced_omi_backend/transcription.py
437-437: Do not catch blind exception: Exception
(BLE001)
899-899: Undefined name conversation
(F821)
tests/test_env.py
32-32: Possible hardcoded password assigned to: "TEST_USER_PASSWORD"
(S105)
backends/advanced/src/advanced_omi_backend/rq_queue.py
82-82: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
181-181: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
234-234: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
287-287: Avoid specifying long messages outside the exception class
(TRY003)
292-292: Avoid specifying long messages outside the exception class
(TRY003)
301-301: Avoid specifying long messages outside the exception class
(TRY003)
413-413: Avoid specifying long messages outside the exception class
(TRY003)
423-423: Avoid specifying long messages outside the exception class
(TRY003)
439-439: Local variable words is assigned to but never used
Remove assignment to unused variable words
(F841)
468-468: Avoid specifying long messages outside the exception class
(TRY003)
650-650: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
655-655: Unused function argument: client_id
(ARG001)
656-656: Unused function argument: user_id
(ARG001)
711-711: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1008-1008: Do not catch blind exception: Exception
(BLE001)
1009-1009: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1014-1014: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
1083-1083: Do not catch blind exception: Exception
(BLE001)
1104-1104: Do not catch blind exception: Exception
(BLE001)
1105-1105: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1135-1135: Do not catch blind exception: Exception
(BLE001)
1136-1136: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1142-1142: Use explicit conversion flag
Replace with conversion flag
(RUF010)
backends/advanced/src/advanced_omi_backend/main.py
56-56: Redefinition of unused init_task_manager from line 52
Remove definition: init_task_manager
(F811)
336-336: Do not catch blind exception: Exception
(BLE001)
337-337: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1075-1075: Do not catch blind exception: Exception
(BLE001)
1077-1077: Use explicit conversion flag
Replace with conversion flag
(RUF010)
backends/advanced/src/advanced_omi_backend/processors.py
144-144: Do not catch blind exception: Exception
(BLE001)
145-145: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
366-366: Unused method argument: metadata
(ARG002)
420-420: Do not catch blind exception: Exception
(BLE001)
425-425: Use explicit conversion flag
Replace with conversion flag
(RUF010)
🔇 Additional comments (46)
backends/advanced/webui/src/pages/System.tsx (1)
161-162: LGTM! Redis service display mapping added. The addition of the Redis service with the label 'REDIS & RQ WORKERS' clearly communicates both the Redis service and its associated RQ workers. This aligns well with the PR's move to RQ-based background processing.
tests/resources/timing_keywords.robot (7)
1-7: LGTM! Settings and variables properly configured. The library imports and global timing storage are appropriate for performance measurement utilities.

10-18: LGTM! Timer start logic is correct. The keyword properly captures epoch time, stores it with a namespaced key, and returns the start time.

49-69: LGTM! Timing summary logic is well-structured. The keyword correctly filters duration entries, accumulates totals, and provides formatted output for easy analysis.

71-75: LGTM! Reset logic is correct. The keyword properly clears the global timing storage.

77-85: LGTM! Convenient wrapper for timing operations. The keyword elegantly wraps timer logic around any keyword execution and returns both the result and duration.
116-155: Verify the sorting logic works correctly. Line 140 uses a complex lambda expression within `Evaluate` that mixes Robot Framework variables with Python syntax. The expression `sorted($operations, key=lambda x: $operation_times[x], reverse=True)` may not work as expected because the lambda is trying to access a Robot Framework dictionary from within the Python evaluation context.

Test this keyword with multiple operations to ensure the sorting works correctly. If it fails, consider this alternative approach:

    # Convert to Python dict and sort in pure Python
    ${sorted_ops}=    Evaluate    sorted(list(${operations}), key=lambda x: ${operation_times}[x], reverse=True)

Or use a more explicit approach:

    # Create list of tuples (operation, duration), sort, extract operations
    ${op_dur_pairs}=    Create List
    FOR    ${op}    IN    @{operations}
        ${dur}=    Get From Dictionary    ${operation_times}    ${op}
        ${pair}=    Create List    ${op}    ${dur}
        Append To List    ${op_dur_pairs}    ${pair}
    END
    ${sorted_pairs}=    Evaluate    sorted($op_dur_pairs, key=lambda x: x[1], reverse=True)
    ${sorted_ops}=    Evaluate    [x[0] for x in $sorted_pairs]
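The underlying sort this keyword is after is plain Python: order operation names by their recorded duration, descending. A minimal sketch of that logic outside Robot Framework (the operation names and timings here are made up for illustration):

```python
# Hypothetical timing data, as the keyword would have accumulated it
operation_times = {"upload": 2.4, "transcribe": 9.1, "summarize": 4.7}

# Iterating a dict yields its keys; dict.get supplies each operation's duration
# as the sort key, and reverse=True puts the slowest operation first.
sorted_ops = sorted(operation_times, key=operation_times.get, reverse=True)
print(sorted_ops)  # → ['transcribe', 'summarize', 'upload']
```

Inside `Evaluate`, the `$variable` form hands the Python object itself to the expression, which is why the `$operation_times[x]` lookup is the part worth verifying.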
157-190: LGTM! Benchmark logic is comprehensive. The keyword properly collects multiple measurements, computes meaningful statistics (average, min, max), and returns structured results for analysis.
skaffold.yaml (1)
152-155: Keep this URI aligned with the actual Service name. Once you restore the default service name (see prior comment), this value will work as-is. If you really need to rename the Service, update this host accordingly everywhere it's used.
backends/advanced/src/advanced_omi_backend/memory/prompts.py (1)
232-232: LGTM! Formatting fix improves readability. The removal of unnecessary backslash escapes corrects the prompt template formatting.
backends/advanced/docker-compose-test.yml (3)
13-13: LGTM! Source mount accelerates test iteration. Mounting the source directory allows live code updates without rebuilding the test container, improving the test development workflow.

42-42: LGTM! CORS configuration supports cross-origin test scenarios. The CORS_ORIGINS environment variable properly lists the test frontend and backend origins, enabling cross-origin requests during integration testing.

53-53: LGTM! Extended start period accommodates initialization. Increasing the healthcheck start period to 30s provides sufficient time for the test backend to initialize dependencies (MongoDB, Redis, Qdrant, RQ) before health checks begin, reducing false failures.
extras/speaker-recognition/docker-compose-test.yml (2)
22-23: LGTM! Environment file loading standardizes test configuration. Loading the .env file enables centralized configuration management for the speaker recognition test service, consistent with the PR's test infrastructure improvements.

31-31: LGTM! CORS configuration aligns with test environment. The CORS_ORIGINS setting matches the backend test configuration, ensuring the speaker recognition service properly handles cross-origin requests during integration testing.
.env.template (1)
108-108: LGTM! Database name aligns with project naming. The MongoDB Kubernetes URI now correctly references the `friend-lite` database, consistent with the project's infrastructure namespace and application naming conventions.

backends/advanced/src/advanced_omi_backend/models/__init__.py (1)

1-10: LGTM! Clear documentation for the models package. The docstring and import guidance comments provide helpful context for developers working with the models package. The structural stub approach is appropriate for a package initializer.
backends/charts/advanced-backend/values.yaml (1)
62-62: LGTM! Improved configuration documentation. The comment clearly documents that CORS_ORIGINS is managed by the ConfigMap (generated from the Makefile) and should not be overridden in the Helm values. This reduces configuration drift and centralizes CORS management.
backends/advanced/src/advanced_omi_backend/processors.py.backup (1)
633-740: Avoid double `task_done()` when consuming the sentinel. When the shutdown sentinel (`None`) is dequeued, the branch calls `self.audio_queue.task_done()` and then hits the `finally` block that calls `task_done()` again. Python's `Queue` raises `ValueError: task_done() called too many times`, so the processor crashes during shutdown. Let the `finally` handle the bookkeeping.

 if item is None:
     # Shutdown signal
     audio_logger.info("🛑 Audio processor received shutdown signal")
-    self.audio_queue.task_done()
     break

Likely an incorrect or invalid review comment.
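The `ValueError` behavior this comment relies on is easy to reproduce with the stdlib queue: every `task_done()` call must be balanced by a prior `get()`, and an extra call raises immediately.

```python
from queue import Queue

q = Queue()
q.put(None)        # shutdown sentinel
item = q.get()
q.task_done()      # balances the single get()

try:
    q.task_done()  # second call for the same item, as in the double-call path
except ValueError as e:
    print(e)       # → task_done() called too many times
```

This is why the fix moves all `task_done()` bookkeeping into one place (the `finally` block) instead of calling it on both the sentinel branch and the cleanup path.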
backends/advanced/webui/src/App.tsx (1)
72-76: Queue route registered correctly. Import and protected route wiring match the existing pages, so the new queue UI should mount with the same guardrails.
backends/advanced/src/advanced_omi_backend/routers/api_router.py (1)
12-38: Router wiring stays consistent. Including the new audio and queue routers here keeps all API modules exposed under `/api` without disrupting existing ordering.

backends/advanced/webui/Dockerfile (1)

20-22: LGTM! Build-time backend URL configuration added correctly. The addition of `VITE_BACKEND_URL` as a build argument and environment variable follows the same pattern as `VITE_ALLOWED_HOSTS` and aligns with Vite's convention for build-time environment variables.

backends/advanced/webui/src/services/api.ts (1)

6-6: LGTM! Stricter backend URL validation. The additional check for an empty string prevents the function from using an explicitly set but empty `VITE_BACKEND_URL`, ensuring proper fallback to auto-detection logic.

backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py (1)

10-33: LGTM! New routers properly exported. The addition of `audio_router` and `queue_router` follows the existing pattern for module exports. The docstring is updated to document the new functional areas, and the `__all__` list maintains alphabetical ordering.

backends/advanced/webui/src/components/layout/Layout.tsx (2)

2-2: LGTM! Layers icon imported for queue navigation. The import of the `Layers` icon from lucide-react is appropriate for representing queue/job management in the UI.

19-19: LGTM! Queue management navigation added for admins. The new Queue Management navigation item is correctly placed within the `isAdmin` conditional, ensuring only administrators can access queue management features. The use of the `Layers` icon is semantically appropriate for queue visualization.
39-45: Approve: start-k8s.sh verified to exist and handle signals/failures gracefully.

backends/advanced/src/advanced_omi_backend/memory/utils.py (1)
110-119: Ignore the suggested escaping change for `array_pattern`. The existing f-string correctly escapes `\s` and `\[` (e.g. `f'"{key}"\\s*:\\s*(\\[.*?\\])'` yields the intended regex) and successfully extracts the arrays without modification.

Likely an incorrect or invalid review comment.
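The escaping claim checks out when run directly; a quick sketch with a made-up key and payload (only the f-string pattern itself comes from the reviewed code):

```python
import re

key = "entities"  # hypothetical key name, for illustration only
# Same escaping as the reviewed f-string: \\s in the source becomes \s in the
# pattern, and \\[ / \\] become the literal bracket escapes \[ and \].
array_pattern = f'"{key}"\\s*:\\s*(\\[.*?\\])'

text = '{"entities": [1, 2, 3], "other": []}'
match = re.search(array_pattern, text)
print(match.group(1))  # → [1, 2, 3]
```

The non-greedy `.*?` stops at the first closing bracket, which is what makes the capture pick up just the array for the named key.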
backends/advanced/src/advanced_omi_backend/routers/modules/user_routes.py (1)
13-13: LGTM! Correct use of UserUpdate for update operations. The change from `UserCreate` to `UserUpdate` correctly distinguishes between creation and update schemas, following REST API best practices.

Note: The static analysis warnings about unused `current_user` and `Depends` in defaults are false positives for FastAPI's dependency injection pattern: `current_user` enforces authentication, and `Depends` must be in the signature for FastAPI to process it.
tests/endpoints/auth_tests.robot (1)
1-147: Excellent test coverage and structure. The test suite provides comprehensive coverage of authentication and user management flows with proper separation of positive and negative test cases, good use of tags for organization, and appropriate cleanup of test data.
backends/advanced/src/advanced_omi_backend/conversation_manager.py (1)
72-96: Approve enqueue_final_transcription usage – the signature in rq_queue.py (conversation_id, audio_uuid, audio_path, user_id, client_id, user_email, priority) matches the call site; runtime imports and conditional enqueue logic are correct.

tests/test_data.py (2)
7-18: LGTM! Sample conversation data is well-structured with appropriate fields for testing conversation retrieval and display.

20-29: LGTM! Sample memory data provides reasonable test cases with importance scores.
backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py (2)
10-10: LGTM! Import reduction reflects the removal of audio processing endpoints from this module, correctly retaining only `APIRouter` and `Depends`.

14-14: User import paths are valid. The `User`, `UserCreate`, and `UserUpdate` classes are defined in `models/user.py`, and the legacy `users.py` re-exports them for backward compatibility. No changes required.

backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py (1)
44-49: LGTM! The rename from `get_conversation` to `get_conversation_detail` improves clarity, and the controller method change aligns with the refactored conversation controller that now uses Beanie-backed models.

tests/endpoints/chat_tests.robot (6)
1-12: LGTM! Test suite setup is well-structured with appropriate libraries and resources imported. The suite-level setup and teardown ensure consistent test context.

15-31: LGTM! Basic chat session creation test correctly verifies response structure and includes cleanup. The test validates all essential fields of a chat session.

199-210: LGTM! Unauthorized access test correctly verifies that authentication is required for chat endpoints, testing both session listing and statistics endpoints.

212-234: LGTM! Non-existent session test comprehensively covers all CRUD operations with appropriate 404 status code expectations.

236-254: LGTM! Invalid data test covers important validation scenarios (title length, empty title) with proper 422 status code expectations and cleanup.

256-283: LGTM! User isolation test is comprehensive, verifying that users cannot access other users' sessions and that session lists are properly scoped. This is critical for security.
tests/endpoints/health_tests.robot (1)
49-49: Fix Python-eval: remove quotes and use correct variable syntax. The current expression treats the left-hand side as a literal string, so it never matches. Use the dict value directly.
- Should Be True    '${health}[status]' in ['healthy', 'degraded', 'critical']
+ Should Be True    ${health}[status] in ['healthy', 'degraded', 'critical']

Likely an incorrect or invalid review comment.
backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py (1)
16-35: Endpoint wiring looks solid. Auth gating, parameter declarations, and delegation to the controller are all consistent with the new RQ-based flow. Nicely done.
tests/endpoints/*client_queue_tests.robot (1)
31-114: Good coverage on queue endpoints. The pagination, stats, and health checks here map neatly onto the new queue APIs and should keep regressions in check.
backends/advanced/src/advanced_omi_backend/transcription.py (1)
503-517: Beanie versioning path looks good. Creating the initial transcript version and activating it via the model, then updating title/summary/speaker_names, aligns with the new Beanie model flow.
Based on learnings
Also applies to: 522-528
if sample_rate != 16000:
    audio_logger.warning(
        f"File {file.filename} has sample rate {sample_rate}Hz, expected 16kHz."
    )
    raise JSONResponse(status_code=400, content={"error": f"File {file.filename} has sample rate {sample_rate}Hz, expected 16kHz. I'll implement this at some point sorry"})
Don't raise JSONResponse objects
This path now raises `JSONResponse`, which will crash with `TypeError: exceptions must derive from BaseException`, converting the intended 400 into a 500 and leaving the upload flow in a bad state. Please raise an `HTTPException` (or return the response) so the client gets the correct error and cleanup still runs.
-from fastapi import BackgroundTasks, File, Query, UploadFile
+from fastapi import BackgroundTasks, File, Query, UploadFile, HTTPException
@@
- raise JSONResponse(status_code=400, content={"error": f"File {file.filename} has sample rate {sample_rate}Hz, expected 16kHz. I'll implement this at some point sorry"})
+ raise HTTPException(
+ status_code=400,
+ detail=f"File {file.filename} has sample rate {sample_rate}Hz, expected 16kHz. I'll implement this at some point sorry",
+ )

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
In backends/advanced/src/advanced_omi_backend/controllers/system_controller.py.backup:

from fastapi import BackgroundTasks, File, Query, UploadFile, HTTPException

if sample_rate != 16000:
    audio_logger.warning(
        f"File {file.filename} has sample rate {sample_rate}Hz, expected 16kHz."
    )
    raise HTTPException(
        status_code=400,
        detail=f"File {file.filename} has sample rate {sample_rate}Hz, expected 16kHz. I'll implement this at some point sorry",
    )
🤖 Prompt for AI Agents
In
backends/advanced/src/advanced_omi_backend/controllers/system_controller.py.backup
around lines 239-244, the code currently raises a JSONResponse (which is not an
Exception) and will produce a TypeError and a 500; change this to raise an
HTTPException with status_code=400 and the same error message (or alternatively
return the JSONResponse directly) so the client receives a 400 and cleanup still
runs; also ensure HTTPException is imported (e.g., from fastapi import
HTTPException) and keep the existing audio_logger.warning call.
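The `TypeError` described above holds for any object that is not a `BaseException`: Python refuses to raise it at all. A minimal stand-in demonstrates this without needing FastAPI installed (the class here is a hypothetical mock, not the real `JSONResponse`):

```python
class FakeJSONResponse:
    """Stand-in for a response object that is not an exception subclass."""
    def __init__(self, status_code, content):
        self.status_code = status_code
        self.content = content

try:
    # Raising a non-exception instance fails before any response is sent
    raise FakeJSONResponse(status_code=400, content={"error": "bad sample rate"})
except TypeError as e:
    print(e)  # → exceptions must derive from BaseException
```

Because the `TypeError` escapes the handler, the framework's generic 500 path takes over, which is exactly how the intended 400 turns into a 500.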
async def get_diarization_settings():
    """Get current diarization settings."""
    try:
        # Reload from file to get latest settings
        settings = load_diarization_settings_from_file()
        return {
            "settings": settings,
            "status": "success"
        }
    except Exception as e:
        logger.error(f"Error getting diarization settings: {e}")
        return JSONResponse(
            status_code=500, content={"error": f"Failed to get settings: {str(e)}"}
        )
Second get_diarization_settings overrides the first
The zero-arg definition here replaces the earlier get_diarization_settings(user, files, device_name, auto_generate_client) entirely. Any caller that still passes those arguments will now get TypeError: get_diarization_settings() takes 0 positional arguments but 4 were given, breaking the upload pipeline. Give these helpers distinct names (or consolidate them) so both behaviors remain callable.
-async def get_diarization_settings():
+async def get_diarization_settings_config():

and update the call sites that expect the read-only helper to use the new name.
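The override behavior described above is ordinary Python module semantics: a later `def` with the same name silently rebinds it, so the earlier signature becomes unreachable. A small sketch (names shortened and return values invented for illustration):

```python
def get_settings(user, files):
    """Original multi-arg helper."""
    return ("upload", user, files)

def get_settings():
    """Later zero-arg definition: silently replaces the one above."""
    return "read-only settings"

print(get_settings())  # → read-only settings

try:
    get_settings("alice", ["a.wav"])  # old call site, now broken
except TypeError as e:
    print(e)  # → get_settings() takes 0 positional arguments but 2 were given
```

No warning or error is emitted at definition time, which is why the collision only surfaces when a caller using the old signature runs.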
📝 Committable suggestion
async def get_diarization_settings_config():
    """Get current diarization settings."""
    try:
        # Reload from file to get latest settings
        settings = load_diarization_settings_from_file()
        return {
            "settings": settings,
            "status": "success"
        }
    except Exception as e:
        logger.error(f"Error getting diarization settings: {e}")
        return JSONResponse(
            status_code=500, content={"error": f"Failed to get settings: {str(e)}"}
        )
🤖 Prompt for AI Agents
In
backends/advanced/src/advanced_omi_backend/controllers/system_controller.py.backup
around lines 763 to 777, a zero-argument async def get_diarization_settings()
was added and overrides an earlier function signature
get_diarization_settings(user, files, device_name, auto_generate_client),
causing TypeError in callers that pass arguments; rename the new read-only
helper to a distinct name (e.g., get_diarization_settings_readonly or
load_diarization_settings) or merge logic into the original function, and then
update every call site that expects the read-only behavior to use the new name
so the original multi-arg upload pipeline function remains callable.
from advanced_omi_backend.task_manager import init_task_manager, get_task_manager
...
from advanced_omi_backend.rq_queue import redis_conn
...
from advanced_omi_backend.task_manager import init_task_manager
from advanced_omi_backend.transcript_coordinator import get_transcript_coordinator
Remove the duplicate init_task_manager import.
init_task_manager is imported twice (Lines 52 and 56), which Ruff flags with F811. Drop the second import to satisfy lint.
🧰 Tools
🪛 Ruff (0.13.1)
56-56: Redefinition of unused init_task_manager from line 52
Remove definition: init_task_manager
(F811)
🤖 Prompt for AI Agents
backends/advanced/src/advanced_omi_backend/main.py around lines 52 to 57: there
is a duplicate import of init_task_manager (imported on line 52 and again on
line 56) causing a lint F811; remove the second import statement (the redundant
"from advanced_omi_backend.task_manager import init_task_manager") so the module
imports are unique and, if needed, re-run the linter to confirm the error is
resolved.
try:
    from advanced_omi_backend.rq_queue import redis_conn
    from rq import Worker

    # Test Redis connection
    await asyncio.wait_for(asyncio.to_thread(redis_conn.ping), timeout=5.0)

    # Count active workers
    workers = Worker.all(connection=redis_conn)
    worker_count = len(workers)
    active_workers = len([w for w in workers if w.state == 'busy'])
    idle_workers = worker_count - active_workers

    health_status["services"]["redis"] = {
        "status": "✅ Connected",
        "healthy": True,
        "critical": True,
        "worker_count": worker_count,
        "active_workers": active_workers,
        "idle_workers": idle_workers
    }
except asyncio.TimeoutError:
Avoid blocking the event loop when enumerating RQ workers.
Worker.all() and each worker.state call hit Redis synchronously; running them inline inside an async endpoint will stall the loop under slow Redis. Push the whole tally into a thread:
- # Count active workers
- workers = Worker.all(connection=redis_conn)
- worker_count = len(workers)
- active_workers = len([w for w in workers if w.state == 'busy'])
- idle_workers = worker_count - active_workers
+ def summarize_workers():
+ workers = Worker.all(connection=redis_conn)
+ worker_count = len(workers)
+ active_workers = sum(1 for w in workers if w.state == "busy")
+ return worker_count, active_workers
+
+ worker_count, active_workers = await asyncio.wait_for(
+ asyncio.to_thread(summarize_workers),
+ timeout=5.0,
+ )
+ idle_workers = worker_count - active_workersThis keeps the health check responsive even if Redis is slow.
📝 Committable suggestion
```python
try:
    from advanced_omi_backend.rq_queue import redis_conn
    from rq import Worker

    # Test Redis connection
    await asyncio.wait_for(asyncio.to_thread(redis_conn.ping), timeout=5.0)

    # Count active workers without blocking the event loop
    def summarize_workers():
        workers = Worker.all(connection=redis_conn)
        worker_count = len(workers)
        active_workers = sum(1 for w in workers if w.state == "busy")
        return worker_count, active_workers

    worker_count, active_workers = await asyncio.wait_for(
        asyncio.to_thread(summarize_workers),
        timeout=5.0,
    )
    idle_workers = worker_count - active_workers

    health_status["services"]["redis"] = {
        "status": "✅ Connected",
        "healthy": True,
        "critical": True,
        "worker_count": worker_count,
        "active_workers": active_workers,
        "idle_workers": idle_workers
    }
except asyncio.TimeoutError:
```
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/main.py around lines 1045 to 1066,
the code calls Worker.all() and inspects worker.state synchronously which can
block the event loop; move the entire Redis worker enumeration and state-tally
into a thread by wrapping the logic that calls Worker.all() and counts busy/idle
workers in asyncio.to_thread (or loop.run_in_executor), return
worker_count/active_workers/idle_workers from that threaded function, await it
in the async handler, and update health_status with the returned values while
keeping the Redis ping check and exception handling intact so the endpoint
remains non-blocking even when Redis is slow.
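As a side note, the non-blocking pattern is easy to demonstrate in isolation. A minimal standalone sketch (independent of this codebase; `summarize_workers` here is a stand-in for the synchronous `Worker.all()` and per-worker state reads):

```python
import asyncio
import time

def summarize_workers():
    # Stand-in for the synchronous Redis calls (Worker.all() + per-worker state reads).
    time.sleep(0.05)  # simulate network latency to Redis
    states = ["busy", "idle", "busy"]
    worker_count = len(states)
    active_workers = sum(1 for s in states if s == "busy")
    return worker_count, active_workers

async def health_check():
    # The blocking tally runs in a worker thread, so the event loop stays responsive.
    worker_count, active_workers = await asyncio.wait_for(
        asyncio.to_thread(summarize_workers), timeout=5.0
    )
    return {
        "worker_count": worker_count,
        "active_workers": active_workers,
        "idle_workers": worker_count - active_workers,
    }

result = asyncio.run(health_check())
print(result)
```

While `summarize_workers` sleeps, the loop is free to serve other requests; only the `await` in `health_check` waits on the thread's result.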
```python
item = await asyncio.wait_for(self.transcription_queue.get(), timeout=30.0)

if item is None:  # Shutdown signal
    self.transcription_queue.task_done()
    break

try:
    # Get or create transcription manager for client
    if item.client_id not in self.transcription_managers:
        # Import here to avoid circular imports
        audio_logger.info(
            f"🔌 Creating new transcription manager for client {item.client_id}"
        )
        manager = TranscriptionManager(
            chunk_repo=self.repository, processor_manager=self
        )
        try:
            await manager.connect(item.client_id)
            self.transcription_managers[item.client_id] = manager
            audio_logger.info(
                f"✅ Successfully created transcription manager for {item.client_id}"
            )
        except Exception as e:
            audio_logger.error(
                f"❌ Failed to create transcription manager for {item.client_id}: {e}"
            )
            # Mark transcription as failed when manager creation fails
            self.track_processing_stage(
                item.client_id, "transcription", "failed", {"error": str(e)}
            )
            self.transcription_queue.task_done()
            continue
    else:
        audio_logger.debug(
            f"♻️ Reusing existing transcription manager for client {item.client_id}"
        )

    manager = self.transcription_managers[item.client_id]

    # Process transcription chunk
    audio_logger.debug(
        f"🎵 Processing transcribe_chunk for client {item.client_id}, audio_uuid: {item.audio_uuid}"
    )

    try:
        # Add timeout for transcription processing (5 minutes)
        async with asyncio.timeout(300):  # 5 minute timeout
            await manager.transcribe_chunk(
                item.audio_uuid, item.audio_chunk, item.client_id
            )
        audio_logger.debug(
            f"✅ Completed transcribe_chunk for client {item.client_id}"
        )
    except asyncio.TimeoutError:
        audio_logger.error(
            f"❌ Transcription timeout for client {item.client_id} after 5 minutes"
        )
        # Mark transcription as failed on timeout
        self.track_processing_stage(
            item.client_id,
            "transcription",
            "failed",
            {"error": "Transcription timeout (5 minutes)"},
        )
    except Exception as e:
        audio_logger.error(
            f"❌ Error in transcribe_chunk for client {item.client_id}: {e}",
            exc_info=True,
        )
        # Mark transcription as failed when chunk processing fails
        self.track_processing_stage(
            item.client_id, "transcription", "failed", {"error": str(e)}
        )

    # Track transcription as started using direct state tracking - ONLY ONCE per audio session
    # Check if we haven't already marked this transcription as started for this audio UUID
    current_transcription_status = self.processing_state.get(
        item.client_id, {}
    ).get("transcription", {})
    current_audio_uuid = current_transcription_status.get("metadata", {}).get(
        "audio_uuid"
    )

    # Only mark as started if this is a new audio UUID or no transcription status exists
    if current_audio_uuid != item.audio_uuid:
        audio_logger.info(
            f"🎯 Starting transcription tracking for new audio UUID: {item.audio_uuid}"
        )
        self.track_processing_stage(
            item.client_id,
            "transcription",
            "started",
            {"audio_uuid": item.audio_uuid, "chunk_processing": True},
        )
    else:
        audio_logger.debug(
            f"⏩ Skipping transcription status update - already tracking audio UUID: {item.audio_uuid}"
        )

except Exception as e:
    audio_logger.error(
        f"Error processing transcription for client {item.client_id}: {e}",
        exc_info=True,
    )
finally:
    self.transcription_queue.task_done()
```
Fix transcription queue shutdown bookkeeping

The transcription loop has the same double-`task_done()` pattern on its sentinel: the branch calls `task_done()` and the `finally` block calls it again, so on shutdown the second call raises `ValueError` and leaves the task in an error state. Remove the extra call inside the branch and let the `finally` block perform the single `task_done()`:

```diff
             if item is None:  # Shutdown signal
-                self.transcription_queue.task_done()
                 break
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
item = await asyncio.wait_for(self.transcription_queue.get(), timeout=30.0)

if item is None:  # Shutdown signal
    break

try:
    # Get or create transcription manager for client
    if item.client_id not in self.transcription_managers:
        # Import here to avoid circular imports
        audio_logger.info(
            f"🔌 Creating new transcription manager for client {item.client_id}"
        )
        manager = TranscriptionManager(
            chunk_repo=self.repository, processor_manager=self
        )
        try:
            await manager.connect(item.client_id)
            self.transcription_managers[item.client_id] = manager
            audio_logger.info(
                f"✅ Successfully created transcription manager for {item.client_id}"
            )
        except Exception as e:
            audio_logger.error(
                f"❌ Failed to create transcription manager for {item.client_id}: {e}"
            )
            # Mark transcription as failed when manager creation fails
            self.track_processing_stage(
                item.client_id, "transcription", "failed", {"error": str(e)}
            )
            self.transcription_queue.task_done()
            continue
    else:
        audio_logger.debug(
            f"♻️ Reusing existing transcription manager for client {item.client_id}"
        )

    manager = self.transcription_managers[item.client_id]

    # Process transcription chunk
    audio_logger.debug(
        f"🎵 Processing transcribe_chunk for client {item.client_id}, audio_uuid: {item.audio_uuid}"
    )

    try:
        # Add timeout for transcription processing (5 minutes)
        async with asyncio.timeout(300):  # 5 minute timeout
            await manager.transcribe_chunk(
                item.audio_uuid, item.audio_chunk, item.client_id
            )
        audio_logger.debug(
            f"✅ Completed transcribe_chunk for client {item.client_id}"
        )
    except asyncio.TimeoutError:
        audio_logger.error(
            f"❌ Transcription timeout for client {item.client_id} after 5 minutes"
        )
        # Mark transcription as failed on timeout
        self.track_processing_stage(
            item.client_id,
            "transcription",
            "failed",
            {"error": "Transcription timeout (5 minutes)"},
        )
    except Exception as e:
        audio_logger.error(
            f"❌ Error in transcribe_chunk for client {item.client_id}: {e}",
            exc_info=True,
        )
        # Mark transcription as failed when chunk processing fails
        self.track_processing_stage(
            item.client_id, "transcription", "failed", {"error": str(e)}
        )

    # Track transcription as started using direct state tracking - ONLY ONCE per audio session
    # Check if we haven't already marked this transcription as started for this audio UUID
    current_transcription_status = self.processing_state.get(
        item.client_id, {}
    ).get("transcription", {})
    current_audio_uuid = current_transcription_status.get("metadata", {}).get(
        "audio_uuid"
    )

    # Only mark as started if this is a new audio UUID or no transcription status exists
    if current_audio_uuid != item.audio_uuid:
        audio_logger.info(
            f"🎯 Starting transcription tracking for new audio UUID: {item.audio_uuid}"
        )
        self.track_processing_stage(
            item.client_id,
            "transcription",
            "started",
            {"audio_uuid": item.audio_uuid, "chunk_processing": True},
        )
    else:
        audio_logger.debug(
            f"⏩ Skipping transcription status update - already tracking audio UUID: {item.audio_uuid}"
        )

except Exception as e:
    audio_logger.error(
        f"Error processing transcription for client {item.client_id}: {e}",
        exc_info=True,
    )
finally:
    self.transcription_queue.task_done()
```
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/processors.py.backup around lines
765 to 872, the sentinel shutdown branch calls
self.transcription_queue.task_done() and then breaks, but the surrounding
finally block also calls task_done(), causing a double task_done() and a
ValueError on shutdown; remove the task_done() call inside the "if item is None:
# Shutdown signal" branch (leave the break so the finally block performs the
single task_done()), then run the shutdown path to verify no ValueError occurs.
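The failure mode is easy to reproduce outside the codebase. A minimal standalone repro (not from this PR) of an unbalanced `task_done()` on a shutdown sentinel:

```python
import asyncio

async def drain_with_double_task_done():
    q = asyncio.Queue()
    await q.put(None)  # shutdown sentinel
    item = await q.get()
    try:
        if item is None:
            q.task_done()  # first call balances the single get()
            return
    finally:
        q.task_done()  # second call: more task_done() than items -> ValueError

try:
    asyncio.run(drain_with_double_task_done())
    raised = False
except ValueError:
    raised = True
print(raised)
```

One `task_done()` per retrieved item is the invariant; the `finally` block alone satisfies it, which is why the sentinel branch's extra call must go.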
```robotframework
Create Session    temp_user_session    ${API_URL}    verify=True
&{headers}=    Create Dictionary    Authorization=Bearer ${token}
${response}=    GET On Session    temp_user_session    /users/me    headers=${headers}    expected_status=200
${user}=    Set Variable    ${response.json()}
Delete All Sessions
RETURN    ${user}
```
Keep other sessions intact in token helper

`Get Admin User Details With Token` ends with `Delete All Sessions`, which wipes every RequestsLibrary session, including the admin/api sessions created earlier in the suite. Any tests running after this keyword lose their authentication context. Delete only the temporary session you just created:

```diff
 ${response}=    GET On Session    temp_user_session    /users/me    headers=${headers}    expected_status=200
 ${user}=    Set Variable    ${response.json()}
-Delete All Sessions
+Delete Session    temp_user_session
 RETURN    ${user}
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```robotframework
Create Session    temp_user_session    ${API_URL}    verify=True
&{headers}=    Create Dictionary    Authorization=Bearer ${token}
${response}=    GET On Session    temp_user_session    /users/me    headers=${headers}    expected_status=200
${user}=    Set Variable    ${response.json()}
Delete Session    temp_user_session
RETURN    ${user}
```
🤖 Prompt for AI Agents
In tests/resources/user_resources.robot around lines 89 to 94, the keyword ends
with Delete All Sessions which clears every RequestsLibrary session and breaks
later tests; replace that call with a single-session deletion such as calling
Delete Session for the temporary session name (temp_user_session) so only the
temporary session is removed while leaving admin/api sessions intact.
```robotframework
Create Session    temp    ${API_URL}    verify=True
&{auth_data}=    Create Dictionary    username=${email}    password=${password}
&{headers}=    Create Dictionary    Content-Type=application/x-www-form-urlencoded

${response}=    POST On Session    temp    /auth/jwt/login    data=${auth_data}    expected_status=any
Delete All Sessions
RETURN    ${response}
```
Do not drop existing sessions during negative login test

The invalid-login helper also calls `Delete All Sessions`, clearing every active alias and breaking subsequent steps that depend on admin or API sessions. Only tear down the temporary session created for this attempt:

```diff
 ${response}=    POST On Session    temp    /auth/jwt/login    data=${auth_data}    expected_status=any
-Delete All Sessions
+Delete Session    temp
 RETURN    ${response}
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```robotframework
Create Session    temp    ${API_URL}    verify=True
&{auth_data}=    Create Dictionary    username=${email}    password=${password}
&{headers}=    Create Dictionary    Content-Type=application/x-www-form-urlencoded
${response}=    POST On Session    temp    /auth/jwt/login    data=${auth_data}    expected_status=any
Delete Session    temp
RETURN    ${response}
```
🤖 Prompt for AI Agents
In tests/resources/user_resources.robot around lines 129 to 135, the
negative-login helper currently calls "Delete All Sessions", which clears every
active alias and breaks later tests; change the teardown to remove only the
temporary session created for the test by replacing the global deletion with a
targeted call such as "Delete Session temp" (i.e., remove or replace "Delete
All Sessions" with a single-session delete for the alias created at the top of
this snippet).
```robotframework
TRY
    # Try to find and delete the user
    ${users}=    List All Users
    FOR    ${user}    IN    @{users}
        IF    "${user}[email]" == "${user_email}"
            Delete Test User    ${user}[user_id]
            Log    Deleted test user: ${user_email}    INFO
            RETURN
```
Pass an admin session when deleting users

Inside `Cleanup Test User`, the call `Delete Test User    ${user}[user_id]` omits the session argument, so Robot raises "expected 2 arguments, got 1" and the cleanup falls into the `EXCEPT` block every time. Fetch the admin session once and pass it to the delete helper:

```diff
 TRY
     # Try to find and delete the user
     ${users}=    List All Users
+    ${admin_session}=    Get Admin Session
     FOR    ${user}    IN    @{users}
         IF    "${user}[email]" == "${user_email}"
-            Delete Test User    ${user}[user_id]
+            Delete Test User    ${admin_session}    ${user}[user_id]
             Log    Deleted test user: ${user_email}    INFO
             RETURN
         END
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```robotframework
TRY
    # Try to find and delete the user
    ${users}=    List All Users
    ${admin_session}=    Get Admin Session
    FOR    ${user}    IN    @{users}
        IF    "${user}[email]" == "${user_email}"
            Delete Test User    ${admin_session}    ${user}[user_id]
            Log    Deleted test user: ${user_email}    INFO
            RETURN
```
🤖 Prompt for AI Agents
In tests/resources/user_resources.robot around lines 141 to 148, the call Delete
Test User ${user}[user_id] is missing the required admin session argument
causing a “expected 2 arguments, got 1” error; modify the TRY block to obtain
the admin session once (e.g., ${admin_session}= Get Admin Session or the
appropriate keyword) before the FOR loop and pass that session into the delete
call (Delete Test User ${user}[user_id] ${admin_session}) so the helper
receives both arguments.
tests/test_data.py
Outdated
```python
TEST_AUDIO_FILE = "tests/test_data/DIY_Experts_Glass_Blowing_16khz_mono_4min.wav"
TEST_DEVICE_NAME = "Robot-test-device"

# Expected content for transcript quality verification
EXPECTED_TRANSCRIPT = "glass blowing"
```
🧩 Analysis chain

Verify that the test audio file exists at the specified path.

The hardcoded path `tests/test_data/DIY_Experts_Glass_Blowing_16khz_mono_4min.wav` must exist for tests to succeed. Ensure this file is committed to the repository or generated during test setup.

Run this script to verify the file exists:

🏁 Script executed:

```shell
#!/bin/bash
# Check if test audio file exists
if [ -f "tests/test_data/DIY_Experts_Glass_Blowing_16khz_mono_4min.wav" ]; then
  echo "✅ Test audio file exists"
  ls -lh "tests/test_data/DIY_Experts_Glass_Blowing_16khz_mono_4min.wav"
else
  echo "❌ Test audio file not found at tests/test_data/DIY_Experts_Glass_Blowing_16khz_mono_4min.wav"
  exit 1
fi
```

Length of output: 92
Include or generate the missing test audio file

The file `tests/test_data/DIY_Experts_Glass_Blowing_16khz_mono_4min.wav` is not present, so tests will fail. Commit this asset to `tests/test_data/`, or update the test setup to generate or reference an existing audio fixture.
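If committing the asset is undesirable, the setup could synthesize a fixture instead. A minimal stdlib sketch (hypothetical filename; it writes a sine tone, not real speech, so it would not satisfy the transcript-content checks):

```python
import math
import struct
import wave

def write_tone_fixture(path: str, seconds: int = 1, rate: int = 16000) -> str:
    """Write a 16 kHz, mono, 16-bit PCM sine tone to use as a WAV fixture."""
    n = rate * seconds
    frames = b"".join(
        struct.pack("<h", int(10000 * math.sin(2 * math.pi * 440 * i / rate)))
        for i in range(n)
    )
    with wave.open(path, "wb") as w:
        w.setnchannels(1)   # mono
        w.setsampwidth(2)   # 16-bit samples
        w.setframerate(rate)
        w.writeframes(frames)
    return path

path = write_tone_fixture("tone_16khz_mono.wav")
with wave.open(path, "rb") as w:
    print(w.getnchannels(), w.getsampwidth(), w.getframerate())
```

A generated tone is enough to exercise upload validation and the processing pipeline's plumbing, though transcript-quality assertions still need real recorded speech.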
```markdown
### Test Files
- Use descriptive names that indicate the testing scope
- Example: `full_pipeline_test.robot`, `user_management_test.robot`
- Use `_test.robot` suffix for test files
```
Align the file-naming rule with the actual suite names.

The guidelines here say to use the `_test.robot` suffix, but every suite listed elsewhere in this PR, and in `tests/README.md`, uses `_tests.robot`. That contradiction will confuse contributors. Please settle on one convention and update both the guidance and the existing references to match.
🤖 Prompt for AI Agents
In tests/TESTING_GUIDELINES.md around lines 123 to 127, the file-naming guidance
currently requires a `_test.robot` suffix but other docs and suites use
`_tests.robot`; update this file to use the `_tests.robot` convention to match
the rest of the repository, and also search/update any remaining references in
tests/README.md and suite filenames to ensure consistency (or if you prefer
`_test.robot`, change all mentions and filenames across the repo to that suffix)
— pick one convention, update the wording and examples in TESTING_GUIDELINES.md
accordingly, and then rename or update references to test files so all docs and
file names use the chosen suffix.
```python
if channels == 2:
    if sample_width == 2:
        audio_array = np.frombuffer(audio_data, dtype=np.int16)
    else:
        audio_array = np.frombuffer(audio_data, dtype=np.int32)

    # Reshape to separate channels and average
    audio_array = audio_array.reshape(-1, 2)
    audio_data = np.mean(audio_array, axis=1).astype(audio_array.dtype).tobytes()
    channels = 1

# Check sample rate
if sample_rate != 16000:
    processed_files.append({
        "filename": file.filename,
        "status": "error",
        "error": f"Sample rate must be 16kHz, got {sample_rate}Hz",
    })
    continue

# Save audio file to disk
audio_uuid = str(uuid.uuid4())
audio_filename = f"{audio_uuid}.wav"
audio_path = CHUNK_DIR / audio_filename

# Write processed audio to file
with wave.open(str(audio_path), "wb") as wav_out:
    wav_out.setnchannels(channels)
    wav_out.setsampwidth(sample_width)
    wav_out.setframerate(sample_rate)
    wav_out.writeframes(audio_data)

audio_logger.info(
```
Fix stereo down-mix for non-16-bit uploads.

When a stereo WAV isn't 16-bit PCM, the code coerces the buffer to `int32` but still writes it back with the original (e.g., 1- or 3-byte) sample width. The result is a header/data mismatch that corrupts any 8-bit or 24-bit stereo upload. Please fail fast (or convert) unless `sample_width == 2`, and update the down-mix to emit 16-bit mono bytes so the persisted file matches its metadata:

```diff
-if channels == 2:
-    if sample_width == 2:
-        audio_array = np.frombuffer(audio_data, dtype=np.int16)
-    else:
-        audio_array = np.frombuffer(audio_data, dtype=np.int32)
-
-    # Reshape to separate channels and average
-    audio_array = audio_array.reshape(-1, 2)
-    audio_data = np.mean(audio_array, axis=1).astype(audio_array.dtype).tobytes()
-    channels = 1
+if channels == 2:
+    if sample_width != 2:
+        processed_files.append({
+            "filename": file.filename,
+            "status": "error",
+            "error": "Stereo files must use 16-bit PCM (2-byte sample width)",
+        })
+        continue
+
+    stereo = np.frombuffer(audio_data, dtype=np.int16).reshape(-1, 2)
+    mono = np.mean(stereo, axis=1).astype(np.int16)
+    audio_data = mono.tobytes()
+    sample_width = 2
+    channels = 1
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
if channels == 2:
    if sample_width != 2:
        processed_files.append({
            "filename": file.filename,
            "status": "error",
            "error": "Stereo files must use 16-bit PCM (2-byte sample width)",
        })
        continue

    stereo = np.frombuffer(audio_data, dtype=np.int16).reshape(-1, 2)
    mono = np.mean(stereo, axis=1).astype(np.int16)
    audio_data = mono.tobytes()
    sample_width = 2
    channels = 1

# Check sample rate
if sample_rate != 16000:
    processed_files.append({
        "filename": file.filename,
        "status": "error",
        "error": f"Sample rate must be 16kHz, got {sample_rate}Hz",
    })
    continue

# Save audio file to disk
audio_uuid = str(uuid.uuid4())
audio_filename = f"{audio_uuid}.wav"
audio_path = CHUNK_DIR / audio_filename

# Write processed audio to file
with wave.open(str(audio_path), "wb") as wav_out:
    wav_out.setnchannels(channels)
    wav_out.setsampwidth(sample_width)
    wav_out.setframerate(sample_rate)
    wav_out.writeframes(audio_data)

audio_logger.info(
```
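For completeness, the averaging step itself can be sanity-checked without NumPy. A stdlib-only sketch of 16-bit stereo-to-mono down-mixing (note: Python's floor division rounds negative averages down, whereas `np.mean(...).astype(np.int16)` truncates toward zero, at most a one-LSB difference):

```python
import struct

def downmix_stereo16_to_mono(audio_data: bytes) -> bytes:
    """Average interleaved 16-bit little-endian stereo samples into mono."""
    n = len(audio_data) // 2  # total number of 16-bit samples
    samples = struct.unpack("<%dh" % n, audio_data)
    # Pairs are interleaved (L, R, L, R, ...); average each frame's two channels.
    mono = [(samples[i] + samples[i + 1]) // 2 for i in range(0, n, 2)]
    return struct.pack("<%dh" % len(mono), *mono)

# Two stereo frames: (L=100, R=300) and (L=-200, R=0)
stereo = struct.pack("<4h", 100, 300, -200, 0)
mono = downmix_stereo16_to_mono(stereo)
print(struct.unpack("<2h", mono))  # -> (200, -100)
```

The key point the review makes survives here too: the output buffer is always 2-byte samples, so the WAV header's `sampwidth` and the frame data stay in agreement.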
| async def update_transcription_status( | ||
| self, audio_uuid: str, status: str, error_message: str = None, provider: str = None | ||
| ): | ||
| """Update transcript processing status for conversation.""" | ||
| update_doc = { | ||
| "transcript_processing_status": status, | ||
| "transcript_processing_updated_at": datetime.now(UTC), | ||
| "updated_at": datetime.now(UTC) | ||
| } | ||
| if provider: | ||
| update_doc["transcript_provider"] = provider | ||
| if error_message: | ||
| update_doc["transcript_processing_error"] = error_message | ||
|
|
||
| result = await self.col.update_one( | ||
| {"conversation_id": conversation_id}, | ||
| {"$set": update_doc} | ||
| ) | ||
| return result.modified_count > 0 | ||
|
|
||
|
|
||
| class ProcessingRunsRepository: | ||
| """Repository for processing run tracking (updated for conversation_id).""" | ||
|
|
||
| def __init__(self, collection): | ||
| self.col = collection | ||
|
|
||
| async def create_run( | ||
| self, | ||
| *, | ||
| conversation_id: str, | ||
| audio_uuid: str, # Keep for audio file access | ||
| run_type: str, # 'transcript' or 'memory' | ||
| user_id: str, | ||
| trigger: str, # 'manual_reprocess', 'initial_processing', etc. | ||
| config_hash: str = None | ||
| ) -> str: | ||
| """Create a new processing run for conversation.""" | ||
| run_id = str(uuid.uuid4()) | ||
| doc = { | ||
| "run_id": run_id, | ||
| "conversation_id": conversation_id, | ||
| "audio_uuid": audio_uuid, # Keep for file access | ||
| "run_type": run_type, | ||
| "user_id": user_id, | ||
| "trigger": trigger, | ||
| "config_hash": config_hash, | ||
| "status": "PENDING", | ||
| "started_at": datetime.now(UTC), | ||
| "completed_at": None, | ||
| "error_message": None, | ||
| "result_version_id": None | ||
| } | ||
| await self.col.insert_one(doc) | ||
| logger.info(f"Created processing run {run_id} for conversation {conversation_id}") | ||
| return run_id | ||
|
|
||
| async def update_run_status( | ||
| self, | ||
| run_id: str, | ||
| status: str, | ||
| error_message: str = None, | ||
| result_version_id: str = None | ||
| ) -> bool: | ||
| """Update processing run status.""" | ||
| """Update transcription status and completion timestamp. | ||
|
|
||
| Args: | ||
| audio_uuid: UUID of the audio chunk | ||
| status: New status ('PENDING', 'PROCESSING', 'COMPLETED', 'FAILED', 'EMPTY') | ||
| error_message: Optional error message if status is 'FAILED' | ||
| provider: Optional provider name for successful transcriptions | ||
| """ | ||
| update_doc = { | ||
| "status": status, | ||
| "updated_at": datetime.now(UTC) | ||
| "transcription_status": status, | ||
| "updated_at": datetime.now(UTC).isoformat() | ||
| } | ||
| if status in ["COMPLETED", "FAILED"]: | ||
| update_doc["completed_at"] = datetime.now(UTC) | ||
| if error_message: | ||
| update_doc["error_message"] = error_message | ||
| if result_version_id: | ||
| update_doc["result_version_id"] = result_version_id | ||
|
|
||
|
|
||
| if status == "COMPLETED": | ||
| update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat() | ||
| if provider: | ||
| update_doc["transcription_provider"] = provider | ||
| elif status == "FAILED" and error_message: | ||
| update_doc["transcription_error"] = error_message | ||
| elif status == "EMPTY": | ||
| update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat() | ||
| if provider: | ||
| update_doc["transcription_provider"] = provider | ||
| result = await self.col.update_one( | ||
| {"run_id": run_id}, | ||
| {"$set": update_doc} | ||
| {"audio_uuid": audio_uuid}, {"$set": update_doc} | ||
| ) | ||
| if result.modified_count > 0: | ||
| logger.info(f"Updated processing run {run_id} status to {status}") | ||
| return result.modified_count > 0 |
Remove the duplicate update_transcription_status.
This second definition overrides the richer version earlier in the class, so callers lose transcript-version updates and we silently drop provider / error metadata. It also triggers Ruff F811. Please delete this duplicate (or merge its logic into the versioned implementation) so the intended behavior survives.
- async def update_transcription_status(
- self, audio_uuid: str, status: str, error_message: str = None, provider: str = None
- ):
- """Update transcription status and completion timestamp.
-
- Args:
- audio_uuid: UUID of the audio chunk
- status: New status ('PENDING', 'PROCESSING', 'COMPLETED', 'FAILED', 'EMPTY')
- error_message: Optional error message if status is 'FAILED'
- provider: Optional provider name for successful transcriptions
- """
- update_doc = {
- "transcription_status": status,
- "updated_at": datetime.now(UTC).isoformat()
- }
-
- if status == "COMPLETED":
- update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat()
- if provider:
- update_doc["transcription_provider"] = provider
- elif status == "FAILED" and error_message:
- update_doc["transcription_error"] = error_message
- elif status == "EMPTY":
- update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat()
- if provider:
- update_doc["transcription_provider"] = provider
-
- result = await self.col.update_one(
- {"audio_uuid": audio_uuid}, {"$set": update_doc}
- )
- return result.modified_count > 0
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async def update_transcription_status( | |
| self, audio_uuid: str, status: str, error_message: str = None, provider: str = None | |
| ): | |
| """Update transcript processing status for conversation.""" | |
| update_doc = { | |
| "transcript_processing_status": status, | |
| "transcript_processing_updated_at": datetime.now(UTC), | |
| "updated_at": datetime.now(UTC) | |
| } | |
| if provider: | |
| update_doc["transcript_provider"] = provider | |
| if error_message: | |
| update_doc["transcript_processing_error"] = error_message | |
| result = await self.col.update_one( | |
| {"conversation_id": conversation_id}, | |
| {"$set": update_doc} | |
| ) | |
| return result.modified_count > 0 | |
| class ProcessingRunsRepository: | |
| """Repository for processing run tracking (updated for conversation_id).""" | |
| def __init__(self, collection): | |
| self.col = collection | |
| async def create_run( | |
| self, | |
| *, | |
| conversation_id: str, | |
| audio_uuid: str, # Keep for audio file access | |
| run_type: str, # 'transcript' or 'memory' | |
| user_id: str, | |
| trigger: str, # 'manual_reprocess', 'initial_processing', etc. | |
| config_hash: str = None | |
| ) -> str: | |
| """Create a new processing run for conversation.""" | |
| run_id = str(uuid.uuid4()) | |
| doc = { | |
| "run_id": run_id, | |
| "conversation_id": conversation_id, | |
| "audio_uuid": audio_uuid, # Keep for file access | |
| "run_type": run_type, | |
| "user_id": user_id, | |
| "trigger": trigger, | |
| "config_hash": config_hash, | |
| "status": "PENDING", | |
| "started_at": datetime.now(UTC), | |
| "completed_at": None, | |
| "error_message": None, | |
| "result_version_id": None | |
| } | |
| await self.col.insert_one(doc) | |
| logger.info(f"Created processing run {run_id} for conversation {conversation_id}") | |
| return run_id | |
| async def update_run_status( | |
| self, | |
| run_id: str, | |
| status: str, | |
| error_message: str = None, | |
| result_version_id: str = None | |
| ) -> bool: | |
| """Update processing run status.""" | |
| """Update transcription status and completion timestamp. | |
| Args: | |
| audio_uuid: UUID of the audio chunk | |
| status: New status ('PENDING', 'PROCESSING', 'COMPLETED', 'FAILED', 'EMPTY') | |
| error_message: Optional error message if status is 'FAILED' | |
| provider: Optional provider name for successful transcriptions | |
| """ | |
| update_doc = { | |
| "status": status, | |
| "updated_at": datetime.now(UTC) | |
| "transcription_status": status, | |
| "updated_at": datetime.now(UTC).isoformat() | |
| } | |
| if status in ["COMPLETED", "FAILED"]: | |
| update_doc["completed_at"] = datetime.now(UTC) | |
| if error_message: | |
| update_doc["error_message"] = error_message | |
| if result_version_id: | |
| update_doc["result_version_id"] = result_version_id | |
| if status == "COMPLETED": | |
| update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat() | |
| if provider: | |
| update_doc["transcription_provider"] = provider | |
| elif status == "FAILED" and error_message: | |
| update_doc["transcription_error"] = error_message | |
| elif status == "EMPTY": | |
| update_doc["transcription_completed_at"] = datetime.now(UTC).isoformat() | |
| if provider: | |
| update_doc["transcription_provider"] = provider | |
| result = await self.col.update_one( | |
| {"run_id": run_id}, | |
| {"$set": update_doc} | |
| {"audio_uuid": audio_uuid}, {"$set": update_doc} | |
| ) | |
| if result.modified_count > 0: | |
| logger.info(f"Updated processing run {run_id} status to {status}") | |
| return result.modified_count > 0 |
🧰 Tools
🪛 Ruff (0.13.1)
521-521: Redefinition of unused update_transcription_status from line 387
(F811)
522-522: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
522-522: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/database.py around lines 521 to
551 there is a duplicate definition of update_transcription_status that
overrides the richer earlier implementation and causes loss of provider/error
metadata and a Ruff F811; remove this duplicate method (or merge its logic into
the earlier, versioned implementation) so only the intended, full-featured
update_transcription_status remains, ensuring the final method sets
transcription_status, updated_at, transcription_completed_at when appropriate,
and preserves transcription_provider and transcription_error fields, then run
tests/lint to confirm the duplicate is gone and F811 is resolved.
| async def get_user_by_client_id(client_id: str) -> Optional[User]: | ||
| """Find the user that owns a specific client_id.""" | ||
| return await User.find_one({"registered_clients.client_id": client_id}) | ||
| async def register_client_to_user( | ||
| user: User, client_id: str, device_name: Optional[str] = None | ||
| ) -> None: | ||
| """Register a client to a user and save to database.""" |
Fix client lookup query
registered_clients is stored as a dict keyed by client_id, so the path registered_clients.client_id never exists. As written, this query will always return None, breaking every caller that tries to resolve a user by client ID—most notably the audio/session flows that rely on this lookup. Please use the dynamic key when querying Mongo so the document can actually be found.
- return await User.find_one({"registered_clients.client_id": client_id})
+ return await User.find_one(
+ {f"registered_clients.{client_id}.client_id": client_id}
+ )
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/models/user.py around lines 113 to
121, the query User.find_one({"registered_clients.client_id": client_id}) is
incorrect because registered_clients is a dict keyed by client_id; change the
query to use the dynamic key path for Mongo (e.g. build the field name as
"registered_clients.{client_id}" and query for its existence or value) so the
lookup actually matches documents; ensure you construct the query string with
the client_id inserted (and handle any necessary escaping) and use
{"registered_clients.<client_id>": {"$exists": True}} or equivalent to return
the owning user.
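Since `registered_clients` is keyed by client ID, the Mongo filter has to interpolate the ID into the field path. A sketch of the query construction (the helper name is illustrative):

```python
def client_lookup_filter(client_id: str) -> dict:
    """Return a Mongo filter matching a user document whose
    registered_clients dict has an entry under this client_id key."""
    return {f"registered_clients.{client_id}": {"$exists": True}}
```

`await User.find_one(client_lookup_filter(client_id))` would then actually match the document. Note that client IDs containing dots would need escaping, since Mongo treats `.` as a path separator.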
| # Filter jobs by user if not admin | ||
| if not current_user.is_superuser: | ||
| # Filter based on user_id in job kwargs (where RQ stores job parameters) | ||
| user_jobs = [] | ||
| for job in result["jobs"]: | ||
| job_kwargs = job.get("kwargs", {}) | ||
| if job_kwargs.get("user_id") == str(current_user.user_id): | ||
| user_jobs.append(job) | ||
| result["jobs"] = user_jobs | ||
| result["pagination"]["total"] = len(user_jobs) | ||
Filtered pagination hides the pager for regular users
Shrinking result["jobs"] after pagination is fine, but overwriting result["pagination"]["total"] with len(user_jobs) means the UI now sees total <= limit and removes the pager. Non-admins can no longer advance to later pages even if they have more jobs queued. Please filter by user before paginating (e.g., add a user_id filter inside get_jobs) or otherwise return the correct total/has_more values for that user so the client can continue paging.
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py
around lines 31 to 42, the code filters result["jobs"] after pagination and then
sets result["pagination"]["total"] = len(user_jobs), which hides the pager for
regular users; fix by applying the user filter before calling the
pagination/get_jobs routine (e.g., pass user_id into get_jobs or query to only
return jobs for that user), or if that is not possible compute the correct total
and has_more for the filtered set (calculate total matching jobs across all
pages or set pagination["has_more"] appropriately) so the returned pagination
reflects the true count/availability for the current user rather than the
post-filtered slice.
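If pushing the filter into `get_jobs` isn't practical, the route can at least compute pagination from the filtered set, so `total` and `has_more` describe what the user can actually page through. A sketch, assuming the job-dict shape shown above:

```python
def paginate_user_jobs(all_jobs: list[dict], user_id: str,
                       page: int, limit: int) -> dict:
    """Filter jobs to one user first, then paginate, so pagination
    metadata reflects the filtered set rather than the raw queue."""
    mine = [j for j in all_jobs
            if j.get("kwargs", {}).get("user_id") == user_id]
    start = (page - 1) * limit
    return {
        "jobs": mine[start:start + limit],
        "pagination": {
            "page": page,
            "limit": limit,
            "total": len(mine),
            "has_more": start + limit < len(mine),
        },
    }
```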
| # Check if audio file actually exists | ||
| import os | ||
| file_exists = os.path.exists(full_audio_path) | ||
| logger.info(f"🎤 DEBUG: Audio path from DB: {audio_path}") | ||
| logger.info(f"🎤 DEBUG: Full audio path: {full_audio_path}") | ||
| logger.info(f"🎤 DEBUG: Audio file exists: {file_exists}") | ||
| logger.info( | ||
| f"🎤 Getting speaker diarization with word matching for: {full_audio_path}" | ||
| ) |
Skip speaker call when file missing.
You log file_exists but still call the service. Bail out early to avoid pointless network calls and confusing errors.
- file_exists = os.path.exists(full_audio_path)
+ file_exists = os.path.exists(full_audio_path)
logger.info(f"🎤 DEBUG: Audio path from DB: {audio_path}")
logger.info(f"🎤 DEBUG: Full audio path: {full_audio_path}")
logger.info(f"🎤 DEBUG: Audio file exists: {file_exists}")
+ if not file_exists:
+ logger.warning(f"🎤 Skipping speaker recognition; file not found: {full_audio_path}")
+ speaker_result = None
+ # fall through to single-segment fallback
+ pass
+ else:
+ logger.info(
+ f"🎤 Getting speaker diarization with word matching for: {full_audio_path}"
+ )
📝 Committable suggestion
| # Check if audio file actually exists | |
| import os | |
| file_exists = os.path.exists(full_audio_path) | |
| logger.info(f"🎤 DEBUG: Audio path from DB: {audio_path}") | |
| logger.info(f"🎤 DEBUG: Full audio path: {full_audio_path}") | |
| logger.info(f"🎤 DEBUG: Audio file exists: {file_exists}") | |
| logger.info( | |
| f"🎤 Getting speaker diarization with word matching for: {full_audio_path}" | |
| ) | |
| # Check if audio file actually exists | |
| import os | |
| file_exists = os.path.exists(full_audio_path) | |
| logger.info(f"🎤 DEBUG: Audio path from DB: {audio_path}") | |
| logger.info(f"🎤 DEBUG: Full audio path: {full_audio_path}") | |
| logger.info(f"🎤 DEBUG: Audio file exists: {file_exists}") | |
| if not file_exists: | |
| logger.warning( | |
| f"🎤 Skipping speaker recognition; file not found: {full_audio_path}" | |
| ) | |
| speaker_result = None | |
| # fall through to single-segment fallback | |
| pass | |
| else: | |
| logger.info( | |
| f"🎤 Getting speaker diarization with word matching for: {full_audio_path}" | |
| ) |
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/transcription.py around lines 400
to 409, you log whether the audio file exists but still proceed to call the
speaker-diarization service; change this to bail out early when the file is
missing: after computing file_exists, if not file_exists log a warning/error
including the full_audio_path and return a sensible default (e.g., None or an
empty diarization result consistent with the function's contract) so you avoid
the external network call and confusing downstream errors.
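The early-bail pattern the prompt describes is just a guard clause in front of the service call. A minimal sketch, where `diarize` stands in for the real speaker-service call:

```python
import os


def diarize_if_present(full_audio_path: str, diarize):
    """Skip the speaker service entirely when the audio file is missing,
    returning None so the caller can use its single-segment fallback."""
    if not os.path.exists(full_audio_path):
        # No file on disk: avoid a pointless network call.
        return None
    return diarize(full_audio_path)
```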
| IF '${user_id}' != '${None}' | ||
| Set To Dictionary ${params} user_id=${user_id} | ||
| END |
Simplify the None check; the current string comparison is brittle.
Use truthiness or an explicit None comparison; don't quote variables in the IF.
- IF '${user_id}' != '${None}'
- Set To Dictionary ${params} user_id=${user_id}
- END
+ IF $user_id
+ Set To Dictionary ${params} user_id=${user_id}
+ END
Alternatively: IF $user_id is not None.
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In tests/resources/memory_keywords.robot around lines 15 to 17, the IF currently
compares the user_id to the string '${None}', which is brittle; change the
condition to use truthiness or an explicit None comparison and do not quote the
variable — for example use "IF ${user_id}" to run the block only when user_id
is truthy, or "IF ${user_id} is not None" for an explicit None check, leaving
the Set To Dictionary and END lines unchanged.
| ${response}= Get User Memories ${token} 1000 | ||
| Should Be Equal As Integers ${response.status_code} 200 | ||
| ${memories}= Set Variable ${response.json()} | ||
| ${count}= Get Length ${memories} | ||
| RETURN ${count} No newline at end of file |
Parse JSON via Evaluate/RequestsLibrary, not ${response.json()} literal.
${response.json()} won’t call the method in RF. Use Evaluate or To JSON.
- ${memories}= Set Variable ${response.json()}
+ ${memories}= Evaluate $response.json()
If avoiding Evaluate:
${memories}= To JSON ${response.content}🤖 Prompt for AI Agents
In tests/resources/memory_keywords.robot around lines 70 to 74, the test calls
${response.json()} which is a Python method and won't be executed by Robot
Framework; replace that usage by parsing the response body with Robot keywords
(either use Evaluate to call response.json() e.g. Evaluate response.json()
modules=requests or use To JSON ${response.content}) and then assign the
parsed JSON to ${memories} so Get Length operates on the actual list/dict.
| &{request_data}= Create Dictionary | ||
| ... model=${OPENAI_MODEL} | ||
| ... messages=${[{"role": "user", "content": "${prompt}"}]} | ||
| ... max_tokens=10 | ||
| ... temperature=0.1 | ||
Fix OpenAI payload serialization.
The messages field is being passed as a plain string, so the chat-completions call never receives a real list of role/content objects. The API will reject this payload at runtime. Please build the messages array as a proper list of dictionaries before adding it to request_data.
- &{request_data}= Create Dictionary
- ... model=${OPENAI_MODEL}
- ... messages=${[{"role": "user", "content": "${prompt}"}]}
- ... max_tokens=10
- ... temperature=0.1
+ &{user_message}= Create Dictionary role=user content=${prompt}
+ ${messages}= Create List ${user_message}
+ &{request_data}= Create Dictionary
+ ... model=${OPENAI_MODEL}
+ ... messages=${messages}
+ ... max_tokens=10
+ ... temperature=0.1
📝 Committable suggestion
| &{request_data}= Create Dictionary | |
| ... model=${OPENAI_MODEL} | |
| ... messages=${[{"role": "user", "content": "${prompt}"}]} | |
| ... max_tokens=10 | |
| ... temperature=0.1 | |
| &{user_message}= Create Dictionary role=user content=${prompt} | |
| ${messages}= Create List ${user_message} | |
| &{request_data}= Create Dictionary | |
| ... model=${OPENAI_MODEL} | |
| ... messages=${messages} | |
| ... max_tokens=10 | |
| ... temperature=0.1 |
🤖 Prompt for AI Agents
In tests/resources/transcript_verification.robot around lines 70–75, the
messages field is being added to request_data as a plain string so the OpenAI
chat-completions call never receives a real list of role/content objects;
replace that by constructing messages as an actual Robot Framework list of
dictionaries (use Create Dictionary for each message with keys role and content,
then Create List to aggregate them) and assign that list into request_data (or
use Set To Dictionary to set the messages key) so the payload contains a real
array of {role, content} objects rather than a serialized string.
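In Python terms, the payload the suggestion builds looks like this — a sketch of the expected chat-completions body (the helper name is illustrative):

```python
def build_chat_payload(model: str, prompt: str) -> dict:
    """messages must be a real list of role/content dicts, not a
    serialized string, or the API rejects the request."""
    return {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 10,
        "temperature": 0.1,
    }
```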
| Delete All Sessions openai | ||
| RETURN ${similarity_score} | ||
| EXCEPT | ||
| Delete All Sessions openai | ||
| Fail Invalid similarity score from OpenAI: ${score_text} |
Use the correct RequestsLibrary keyword for cleanup.
Delete All Sessions takes no arguments; passing openai will raise a keyword argument error and abort the test. Switch to Delete Session openai (or call Delete All Sessions without arguments) so the session teardown executes reliably.
- Delete All Sessions openai
+ Delete Session openai
@@
- Delete All Sessions openai
+ Delete Session    openai
📝 Committable suggestion
| Delete All Sessions openai | |
| RETURN ${similarity_score} | |
| EXCEPT | |
| Delete All Sessions openai | |
| Fail Invalid similarity score from OpenAI: ${score_text} | |
| Delete Session openai | |
| RETURN ${similarity_score} | |
| EXCEPT | |
| Delete Session openai | |
| Fail Invalid similarity score from OpenAI: ${score_text} |
🤖 Prompt for AI Agents
In tests/resources/transcript_verification.robot around lines 87 to 91, the
teardown uses the RequestsLibrary keyword incorrectly by calling "Delete All
Sessions openai" (which accepts no arguments) causing a keyword argument
error; change that call to "Delete Session openai" (or alternatively call
"Delete All Sessions" with no arguments) in both the normal and EXCEPT branches
so the session cleanup runs without raising an argument error.
| ${error_patterns}= Create List [inaudible] [unclear] *** ERROR FAILED | ||
| FOR ${pattern} IN @{error_patterns} | ||
| Should Not Contain ${transcript_lower} ${pattern} Transcript contains error pattern: ${pattern} | ||
| END | ||
Normalize error patterns before comparison.
You lowercase the transcript but never lowercase the patterns, so values like ERROR/FAILED slip through undetected. Lowercase the pattern inside the loop before performing Should Not Contain to keep the heuristic working.
${error_patterns}= Create List [inaudible] [unclear] *** ERROR FAILED
FOR ${pattern} IN @{error_patterns}
- Should Not Contain ${transcript_lower} ${pattern} Transcript contains error pattern: ${pattern}
+ ${pattern_lower}= Convert To Lower Case ${pattern}
+ Should Not Contain ${transcript_lower} ${pattern_lower} Transcript contains error pattern: ${pattern}
END
📝 Committable suggestion
| ${error_patterns}= Create List [inaudible] [unclear] *** ERROR FAILED | |
| FOR ${pattern} IN @{error_patterns} | |
| Should Not Contain ${transcript_lower} ${pattern} Transcript contains error pattern: ${pattern} | |
| END | |
| ${error_patterns}= Create List [inaudible] [unclear] *** ERROR FAILED | |
| FOR ${pattern} IN @{error_patterns} | |
| ${pattern_lower}= Convert To Lower Case ${pattern} | |
| Should Not Contain ${transcript_lower} ${pattern_lower} Transcript contains error pattern: ${pattern} | |
| END |
🤖 Prompt for AI Agents
In tests/resources/transcript_verification.robot around lines 132 to 136, the
transcript is lowercased but the error patterns are not, allowing matches like
"ERROR"/"FAILED" to bypass the check; fix by normalizing each pattern to
lowercase inside the FOR loop (e.g., compute a lowercase version of ${pattern}
and use that in the Should Not Contain assertion) so comparisons are
case-insensitive.
refactored to use a service model for transcription, and got it working with RQ
Decluttered main, moved all processors to RQ. Streaming and batch mode enabled on audio recording
streaming segments should now be able to identify the speaker. Removed a bunch of dead code
Tidied up dead code, added a timeline to the job page so we can see time taken, got speaker recognition working on conversation trigger
created models for
Made them use the Beanie pattern to remove complex raw database saving in database.py.
Moved to using RQ for queued jobs, in order to get decent restart protection if the server dies, and to reduce the complexity of the code.
Did not move the audio chunking part or speech detection.
This is not a final PR; I need to check over the code and ensure we have tests and they work. There's still a bunch of managers and providers that I think are confusing, but I feel it's less complex for a lot of it, although the whole audio streaming / audio cropping / transcription / creation of conversation still feels complex.
Summary by CodeRabbit
New Features
Improvements
DevOps
Tests
Bug Fixes