Conversation
Test framework streams faster than realtime, so we need this to wait

- Runs Robot Framework test suite on all branch pushes
- Triggers on changes to test files and backend source code
- Publishes test results and HTML reports as artifacts
- Includes test summary in workflow output

- Create fresh .env file instead of appending to template
- Eliminates duplicate key issues with ADMIN_EMAIL, OPENAI_API_KEY, etc.
- Add TRANSCRIPTION_PROVIDER configuration
- Improve readability with comments

- Create .env.test in tests/setup/ instead of tests/
- Matches what test_env.py expects (Path(__file__).parent / '.env.test')
- This was causing test setup failures in CI

- Use docker-compose-test.yml explicitly in all commands
- Add better health check loops with progress indicators
- Show container status and logs on failures
- Increase timeout attempts with clearer error messages
- Fix cleanup to use correct compose file
- Remove push trigger (PR-only workflow)

- Fix TEST_AUDIO_FILE path: remove duplicate 'tests/' prefix
- Add clear instructions for downloading HTML reports from artifacts
- Improve summary output formatting

- Update .gitignore to allow tests/test_assets/*.wav files
- Add DIY_Experts_Glass_Blowing test audio files (1min and 4min)
- Install websockets package for audio_stream_library.py
- Add OPENAI_API_KEY, OPENAI_BASE_URL, OPENAI_MODEL to test step
- Required for backend imports when loading test libraries

# Conflicts:
#	.github/workflows/robot-tests.yml

- env: must come BEFORE run: in GitHub Actions YAML
- Previous commit had them in wrong order causing secrets to be empty
- Removed unnecessary verification and export steps
@coderabbitai review

✅ Actions performed

Review triggered.
Walkthrough

This PR adds WebSocket audio streaming client and test helpers, reorganizes and expands Robot Framework test infrastructure and CI workflows (including secret checks), introduces self-healing worker orchestration, changes conversation/audio models and job metadata propagation (client_id/end_reason), adds request logging middleware, and updates many backend routes, utilities, and front-end consumers to the new shapes.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Client (Robot/WebSocket)
    participant API as Backend API
    participant Redis as Redis
    participant RQ as RQ Workers
    participant DB as MongoDB
    participant Storage as File storage
    Client->>API: WS connect (token, device_name)
    API->>API: validate token (header or query), create stream
    API-->>Client: ready
    Client->>API: send_audio_start
    loop streaming
        Client->>API: send_audio_chunk (wyoming/raw)
        API->>Storage: write chunk (relative path)
        API->>Redis: push stream events
    end
    Client->>API: send_audio_stop
    API->>Redis: mark stream END
    Redis->>RQ: audio events consumed -> detect speech
    RQ->>API: enqueue open_conversation job (meta includes client_id)
    RQ->>DB: create Conversation (audio_path=relative, end_reason=unknown)
    RQ->>RQ: enqueue post-conversation jobs (transcribe, speaker, memory) with meta
    par post-processing
        RQ->>DB: transcribe_full_audio_job -> update transcript_versions
        RQ->>DB: speaker_recognition_job -> update segments
        RQ->>DB: extract_memories_job -> add memory_version(s)
    end
    RQ->>DB: handle_end_of_conversation(end_reason)
    DB-->>RQ: persist end_reason, completed_at
```
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
Pre-merge checks and finishing touches

❌ Failed checks (1 inconclusive) | ✅ Passed checks (2 passed)
Actionable comments posted: 14
Note
Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (8)
backends/advanced/src/advanced_omi_backend/models/conversation.py (1)
350-358: Factory function contains fields that don't exist in the model or conflict with computed fields.

The `conv_data` dict includes:

- `transcript` and `segments` (lines 350-351) - these are now computed fields, not stored fields
- `memories` (line 356) - not defined in the `Conversation` model
- `memory_count` (line 357) - conflicts with the computed field at line 207

These will either be silently ignored by Pydantic or cause validation errors.

Consider updating the factory function to remove obsolete fields:

```diff
 conv_data = {
     "audio_uuid": audio_uuid,
     "user_id": user_id,
     "client_id": client_id,
     "created_at": datetime.now(),
     "title": title,
     "summary": summary,
-    "transcript": transcript or "",
-    "segments": segments or [],
     "transcript_versions": [],
     "active_transcript_version": None,
     "memory_versions": [],
     "active_memory_version": None,
-    "memories": [],
-    "memory_count": 0
 }
```

tests/endpoints/rq_queue_tests.robot (1)
89-92: Same weak assertion pattern.

`Should Be True    ${jobs_count_after} >= 0` always passes. This doesn't validate that the queue survived the restart with any jobs.

backends/advanced/src/advanced_omi_backend/services/audio_stream/consumer.py (1)
323-340: Releasing the stream lock on END without removing it from `active_streams` breaks exclusivity

After an `END` chunk, `process_message` calls `await self.release_stream(stream_name)`, which drops the lock and removes the entry from `stream_locks`, but `stream_name` remains in `self.active_streams`. This has two consequences:

- Another worker can now acquire `consumer:lock:{stream_name}` while this consumer continues to call `xreadgroup` on the same stream, so you can end up with multiple consumers on a stream despite the "exclusive processing" contract.
- Since there's no corresponding `stream_locks` entry, `renew_stream_locks` no longer covers that stream, so the lock concept is effectively disabled for it.

If the intent is to fully release the stream to other workers after END, it would be safer to also remove it from `active_streams` so this consumer stops reading from it:

```diff
 # Clean up session buffer
 del self.session_buffers[session_id]

-# Release the consumer lock for this stream
-await self.release_stream(stream_name)
+# Release the consumer lock and stop consuming this stream
+await self.release_stream(stream_name)
+if stream_name in self.active_streams:
+    del self.active_streams[stream_name]
+    logger.info(
+        f"➡️ [{self.consumer_name}] Stopped consuming {stream_name} after END; "
+        f"{len(self.active_streams)} streams remaining"
+    )
```

Alternatively, if you want the same consumer to keep exclusive ownership across sessions on that stream, you probably don't want to release the lock on END at all.
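A minimal sketch of the invariant this comment argues for - dropping the lock and the active-stream entry together. The class below is a simplified stand-in for the real consumer, not its actual code:

```python
class Consumer:
    """Toy model of a stream consumer: owning a stream means holding BOTH
    an entry in active_streams (what we read) and in stream_locks (what
    other workers check). Releasing only one of them breaks exclusivity."""

    def __init__(self, name):
        self.name = name
        self.active_streams = {}   # stream_name -> read position
        self.stream_locks = {}     # stream_name -> lock token

    def acquire(self, stream):
        self.active_streams[stream] = 0
        self.stream_locks[stream] = f"consumer:lock:{stream}"

    def release(self, stream):
        # Drop both together: keeping active_streams while the lock is
        # gone would let another worker "own" a stream we still read from.
        self.stream_locks.pop(stream, None)
        self.active_streams.pop(stream, None)

c = Consumer("w1")
c.acquire("audio:123")
c.release("audio:123")
assert "audio:123" not in c.active_streams
assert "audio:123" not in c.stream_locks
```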
Also applies to: 422-424
extras/speaker-recognition/run-test.sh (1)
61-99: Harden CI branch against unset `DEEPGRAM_API_KEY` under `set -u`

The `${HF_TOKEN:-}` guard on the `.env`-loading branch is good, but with `set -euo pipefail` there are a couple of edge cases where an unset `DEEPGRAM_API_KEY` will cause an unhelpful "unbound variable" failure instead of your friendly error messages:

- In the CI-style branch (`elif [ -n "${HF_TOKEN:-}" ]; then`), the here-doc writes `DEEPGRAM_API_KEY=$DEEPGRAM_API_KEY`, which expands an unset variable under `set -u`.
- The later check `if [ -z "$DEEPGRAM_API_KEY" ]; then` also expands an unset variable.

You can make this more robust while preserving behavior by guarding on both vars and using `${VAR:-}` where unset is possible, e.g.:

```diff
-if [ -f ".env" ] && [ -z "${HF_TOKEN:-}" ]; then
+if [ -f ".env" ] && [ -z "${HF_TOKEN:-}" ]; then
     print_info "Loading environment variables from .env..."
     set -a
     source .env
     set +a
-elif [ -n "${HF_TOKEN:-}" ]; then
+elif [ -n "${HF_TOKEN:-}" ] && [ -n "${DEEPGRAM_API_KEY:-}" ]; then
     print_info "Using environment variables from CI..."
@@
-    cat > .env << EOF
-HF_TOKEN=$HF_TOKEN
-DEEPGRAM_API_KEY=$DEEPGRAM_API_KEY
+    cat > .env << EOF
+HF_TOKEN=${HF_TOKEN:-}
+DEEPGRAM_API_KEY=${DEEPGRAM_API_KEY:-}
@@
-# Verify required environment variables
-if [ -z "$HF_TOKEN" ]; then
+# Verify required environment variables
+if [ -z "${HF_TOKEN:-}" ]; then
@@
-if [ -z "$DEEPGRAM_API_KEY" ]; then
+if [ -z "${DEEPGRAM_API_KEY:-}" ]; then
```

This way, runs where one of the keys is missing will reliably reach your explicit error paths instead of dying early due to `set -u`. The early `export` block you added is fine as-is since it doesn't expand the variables.

Also applies to: 101-120
tests/resources/chat_keywords.robot (1)
38-47: Documentation inconsistency: still mentions "admin session".

Line 39 documentation says "uses admin session" but the keyword actually uses the `api` session (via `Create Chat Session` at line 44, which uses `api` at line 20). Update the documentation to match the implementation.

```diff
 Create Test Chat Session
-    [Documentation]    Create a test chat session with random title (uses admin session)
+    [Documentation]    Create a test chat session with random title (uses api session)
     [Arguments]    ${title_prefix}=Test Session
```

tests/resources/conversation_keywords.robot (1)
77-89: `${headers}` variable is undefined in Delete and other keywords.

Multiple keywords (lines 81, 88, 95, 102, 110, 128) reference `${headers}` but this variable is never defined in the file or passed as an argument. This will cause runtime failures.

Either remove `headers=${headers}` if not needed (since the session already has auth), or define the variable:

```diff
 Delete Conversation
     [Documentation]    Delete a conversation
     [Arguments]    ${audio_uuid}
-    ${response}=    DELETE On Session    api    /api/conversations/${audio_uuid}    headers=${headers}
+    ${response}=    DELETE On Session    api    /api/conversations/${audio_uuid}
     RETURN    ${response.json()}
```

Apply the same fix to lines 88, 95, 102, 110, and 128.
tests/endpoints/chat_tests.robot (1)
60-69: Fix indentation inside FOR loop.

The `Dictionary Should Contain Key` assertions (lines 62-65) appear to be incorrectly indented. They should be at the same level as the `IF` statement inside the loop.

```diff
 FOR    ${session}    IN    @{sessions}
     # Verify chat session structure
-        Dictionary Should Contain Key    ${session}    session_id
-        Dictionary Should Contain Key    ${session}    title
-        Dictionary Should Contain Key    ${session}    created_at
-        Dictionary Should Contain Key    ${session}    updated_at
+    Dictionary Should Contain Key    ${session}    session_id
+    Dictionary Should Contain Key    ${session}    title
+    Dictionary Should Contain Key    ${session}    created_at
+    Dictionary Should Contain Key    ${session}    updated_at
     IF    '${session}[session_id]' == '${test_session}[session_id]'
         ${found}=    Set Variable    ${True}
     END
 END
```

tests/endpoints/memory_tests.robot (1)
99-113: Potential inconsistency in response parsing for pagination test.

At lines 101-102 and 109-110, the pagination test parses `${response.json()}` directly and gets its length. However, the `Get User Memories Test` (lines 26-27) extracts memories from `${response.json()}[memories]`.

If the API returns a consistent response structure with memories nested under a `memories` key, this pagination test will fail or return incorrect counts. Please verify the expected response format and update accordingly:

```diff
-    ${memories1}=    Set Variable    ${response1.json()}
+    ${memories1}=    Set Variable    ${response1.json()}[memories]
     ${count1}=    Get Length    ${memories1}
```
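The defensive version of that parsing can be sketched in plain Python (a hypothetical helper for illustration, assuming the API may return either a bare list or an object wrapping the list under a `memories` key):

```python
import json

def extract_memories(payload):
    """Normalize a memories response: accept either a bare JSON list or an
    object with a 'memories' key, so pagination counts and single-page
    counts measure the same thing. (Hypothetical helper, not project code.)"""
    data = json.loads(payload) if isinstance(payload, str) else payload
    if isinstance(data, dict):
        return data.get("memories", [])
    return data

# Both response shapes yield the same list to count
wrapped = '{"memories": [{"id": 1}, {"id": 2}]}'
bare = '[{"id": 1}, {"id": 2}]'
assert len(extract_memories(wrapped)) == 2
assert len(extract_memories(bare)) == 2
```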
🟡 Minor comments (18)
tests/README.md-5-61 (1)
5-61: Tighten README around env file path and Makefile usage

The new Quick Start section is very helpful, but a couple of small doc gaps are worth fixing:

- You mention the script will create `tests/setup/.env.test`, while the committed env file lives at `tests/.env.test` and uses a slightly different set of keys. Please verify that the documented path and variables (`API_URL` + API keys) match what `run-robot-tests.sh`/`test_env.py` actually expect so users don't configure the wrong file.
- Under "## Running Tests via Makefile" you introduce the section with "you can use the Makefile directly" but don't show example commands (e.g., `make endpoints`, `make robot-tests`). Adding 1-2 concrete examples there would complete the flow.

These are minor doc fixes but will make onboarding a lot smoother.
.pre-commit-config.yaml-2-22 (1)
2-22: Confirm pre-commit target paths and consider gating heavy Robot tests

- The local `robot-framework-tests` hook on `stages: [push]` will run `make endpoints` every push. That's great for catching regressions but potentially heavy (Docker, external services, etc.). You might want to:
  - Gate it behind an env flag (e.g., `SKIP_ROBOT_ENDPOINTS=1`), or
  - Keep it as an optional hook that devs opt into, rather than always-on for all pushes.
- The `files` patterns for `black`, `isort`, and the hygiene hooks are `^backends/advanced-backend/src/.*`, while the backend's pyproject lives under `backends/advanced/…`. If the directory really is `backends/advanced` in this repo, these hooks won't match any files. Worth double-checking the path and updating the regex so formatting and hygiene checks actually run on the advanced backend code.

Also applies to: 35-42
tests/setup/setup_keywords.robot-88-88 (1)
88-88: Same `shell=True` issue with `rm` command.

The `rm -rf` command with multiple path arguments and `shell=True` may not work as expected.

```diff
-    Run Process    rm    -rf    data/test_mongo_data    data/test_qdrant_data    data/test_audio_chunks    cwd=backends/advanced    shell=True
+    Run Process    rm    -rf    data/test_mongo_data    data/test_qdrant_data    data/test_audio_chunks    cwd=backends/advanced
```

backends/advanced/start-workers.sh-147-152 (1)
147-152:wait -nwill also trigger when the monitor process exits.The
147-152: `wait -n` will also trigger when the monitor process exits.

The `wait -n` command waits for any child process to exit. If `monitor_worker_health` exits (e.g., due to an error in the health check Python script), the script will proceed to kill all workers and exit with code 1, even though workers might be healthy.

Consider checking which process exited before triggering full shutdown:

```diff
 # Wait for any worker process to exit
-wait -n
+wait -n $RQ_WORKER_1_PID $RQ_WORKER_2_PID $RQ_WORKER_3_PID $RQ_WORKER_4_PID $RQ_WORKER_5_PID $RQ_WORKER_6_PID $AUDIO_PERSISTENCE_WORKER_PID $AUDIO_STREAM_WORKER_PID
```

This limits `wait -n` to only the worker PIDs, not the monitor.

tests/setup/setup_keywords.robot-108-116 (1)
108-116: Inconsistent `shell=True` usage with separate arguments.

When using `shell=True`, the command should be a single string. Passing separate arguments with `shell=True` can lead to unexpected behavior where only the first argument is treated as the command. This pattern appears multiple times in this file.

Either remove `shell=True` (recommended for separate args):

```diff
-    Run Process    docker    compose    -f    ${compose_file}    down    -v    cwd=${working_dir}    shell=True
+    Run Process    docker    compose    -f    ${compose_file}    down    -v    cwd=${working_dir}
```

Or use a single command string:

```diff
-    Run Process    docker    compose    -f    ${compose_file}    down    -v    cwd=${working_dir}    shell=True
+    Run Process    docker compose -f ${compose_file} down -v    cwd=${working_dir}    shell=True
```

tests/resources/queue_keywords.robot-314-320 (1)
314-320: Missing null check before accessing job status.

Same issue as `Check job status`: `Get Job status` can return `${None}`, but line 319 accesses `${job}[status]` unconditionally. Add a null check first.

tests/endpoints/rq_queue_tests.robot-58-66 (1)
58-66: Inconsistent handling of Reprocess Transcript return value.

Line 59 assigns the result directly to `${job_id}`, but line 111 accesses `${reprocess_response}[job_id]` from the same keyword. This suggests the keyword returns a dict, not a string. Update for consistency:

```diff
-    ${job_id}=    Reprocess Transcript    ${conversation_id}
+    ${reprocess_response}=    Reprocess Transcript    ${conversation_id}
+    ${job_id}=    Set Variable    ${reprocess_response}[job_id]
```

tests/resources/queue_keywords.robot-60-74 (1)
60-74: Null check on wrong level causes runtime error.

`Get Job Status` can return `${None}`, but line 67 accesses `${job}[job_id]` before checking if `${job}` is None. This will cause a runtime exception instead of a clear assertion failure.

Fix the null check:

```diff
     ${job}=    Get Job status    ${job_id}
-    # If job is None (not found), fail explicitly
-    Should Not Be Equal    ${job}[job_id]    ${None}    Job with ID ${job_id} not found in queue
+    # If job is None (not found), fail explicitly
+    Should Not Be Equal    ${job}    ${None}    Job with ID ${job_id} not found in queue
     ${actual_status}=    Set Variable    ${job}[status]
```

tests/endpoints/rq_queue_tests.robot-137-139 (1)
137-139: Weak assertion does not validate persistence.

The assertion `Should Be True    ${persisted_count} >= 0` always passes since the count is initialized at 0. For a meaningful persistence test, require at least some jobs to persist:

```diff
-    Should Be True    ${persisted_count} >= 0
+    Should Be True    ${persisted_count} > 0    msg=Expected at least one job to persist through restart
```

If jobs completing during restart is expected, consider documenting this explicitly or adjusting the test approach.
tests/integration/websocket_streaming_tests.robot-22-54 (1)
22-54: Missing teardown for first test case.

The first test case "Streaming jobs created on stream start" opens an audio stream but does not close it. Other test cases include `[Teardown]    Close Audio Stream    ${stream_id}`. Add a teardown to ensure proper resource cleanup.

```diff
     ${speech_after}=    Find Job For Client    ${speech_jobs_after}    ${device_name}
     Should Not Be Equal    ${speech_after}    ${None}    Speech detection job disappeared during streaming
+
+    [Teardown]    Close Audio Stream    ${stream_id}
```

tests/run-robot-tests.sh-246-260 (1)
246-260: Hardcoded path in Python script doesn't use OUTPUTDIR variable.

The embedded Python script uses hardcoded `'results/output.xml'` instead of the `$OUTPUTDIR` variable, which could cause the summary to fail if a custom output directory is used.

```diff
 if [ -f "$OUTPUTDIR/output.xml" ]; then
     print_info "Test Results Summary:"
-    python3 << 'PYTHON_SCRIPT'
+    python3 << PYTHON_SCRIPT
 import xml.etree.ElementTree as ET
-tree = ET.parse('results/output.xml')
+tree = ET.parse('$OUTPUTDIR/output.xml')
 root = tree.getroot()
```

Note: Remove the single quotes around `PYTHON_SCRIPT` to enable variable substitution.

backends/advanced/start-k8s.sh-146-164 (1)
146-164: `check_worker_health` may count workers from other pods.

The function counts all workers in Redis via `Worker.all()`, but unlike the cleanup logic (lines 68-75) which filters by hostname, this counts workers across all pods. This could mask local worker failures if other pods have healthy workers.

Consider filtering by hostname for consistency:

```diff
 check_worker_health() {
     WORKER_COUNT=$(python3 -c "
 from rq import Worker
 from redis import Redis
 import os
 import sys
+import socket

 try:
     redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
     r = Redis.from_url(redis_url)
     workers = Worker.all(connection=r)
-    print(len(workers))
+    hostname = socket.gethostname()
+    local_workers = [w for w in workers if hostname in w.name]
+    print(len(local_workers))
 except Exception as e:
```

tests/endpoints/audio_upload_tests.robot-48-51 (1)
48-51: Audio path assertion logic may be incorrect.

Line 50 asserts that `audio_path` should not contain `/`, but line 77 expects paths starting with `fixtures/` which does contain `/`. These assertions are inconsistent for the "no folder" vs "with folder" cases.

The assertion on line 50 should be more specific:

```diff
-    Should Not Contain    ${conversation}[audio_path]    /    msg=audio_path should be relative, not absolute
+    # For uploads without folder, audio_path should be just filename (no directory separators)
+    ${path_parts}=    Split String    ${conversation}[audio_path]    /
+    ${part_count}=    Get Length    ${path_parts}
+    Should Be Equal As Integers    ${part_count}    1    msg=audio_path without folder should not contain /
```

backends/advanced/start-k8s.sh-178-193 (1)
178-193: Numeric comparison may fail if `WORKER_COUNT` is empty or non-numeric.

If `check_worker_health` returns an empty string or error text, the comparison `[ "$WORKER_COUNT" -lt "$MIN_WORKERS" ]` will fail with a syntax error.

Add validation for the worker count:

```diff
 WORKER_COUNT=$(check_worker_health)
+
+# Validate WORKER_COUNT is numeric
+if ! [[ "$WORKER_COUNT" =~ ^[0-9]+$ ]]; then
+    echo "⚠️ Self-healing: Invalid worker count '$WORKER_COUNT', assuming 0"
+    WORKER_COUNT=0
+fi

 if [ "$WORKER_COUNT" -lt "$MIN_WORKERS" ]; then
```

Committable suggestion skipped: line range outside the PR's diff.
tests/endpoints/system_admin_tests.robot-119-126 (1)
119-126: Inconsistent response format handling for `/api/admin/memory/config/raw` endpoint.

At lines 111-113 (`Get Memory Config Raw Test`), the response is treated as text: `${config}=    Set Variable    ${response.text}`. However, here at lines 122-123, the same endpoint's response is treated as JSON with a `config_yaml` field.

This inconsistency will cause one of these tests to fail. Please verify the actual response format of the endpoint and update both tests accordingly.
tests/resources/session_keywords.robot-154-157 (1)
154-157: Missing `String` library import for `Generate Random String` keyword.

The `Get Random ID` keyword uses `Generate Random String`, which requires the `String` library, but it's not imported in the Settings section.

```diff
 Library    RequestsLibrary
 Library    Collections
+Library    String

 Variables    ../setup/test_env.py
```

Committable suggestion skipped: line range outside the PR's diff.
tests/setup/test_manager_keywords.robot-58-59 (1)
58-59: Incorrect `Run Process` usage for curl commands.

When using positional arguments with `Run Process`, the `shell=True` argument is incorrect and may cause unexpected behavior. The command is passed as separate arguments, not as a shell string.

Apply this diff to fix the curl invocations:

```diff
-    Run Process    curl    -s    -X    DELETE    http://localhost:6337/collections/memories    shell=True
-    Run Process    curl    -s    -X    DELETE    http://localhost:6337/collections/conversations    shell=True
+    Run Process    curl    -s    -X    DELETE    http://localhost:6337/collections/memories
+    Run Process    curl    -s    -X    DELETE    http://localhost:6337/collections/conversations
```

Alternatively, if shell execution is intended:

```diff
-    Run Process    curl    -s    -X    DELETE    http://localhost:6337/collections/memories    shell=True
-    Run Process    curl    -s    -X    DELETE    http://localhost:6337/collections/conversations    shell=True
+    Run Process    curl -s -X DELETE http://localhost:6337/collections/memories    shell=True
+    Run Process    curl -s -X DELETE http://localhost:6337/collections/conversations    shell=True
```

backends/advanced/webui/src/pages/Conversations.tsx-333-344 (1)
333-344: Token in URL query string may be logged.

The authentication token is included in the audio URL query string (line 334) and the error handler logs the full `audio.src` (line 343), which includes the token. Tokens in URLs can appear in browser history, server access logs, and proxy logs.

Consider:

- Using a short-lived token for audio requests specifically
- Redacting the token in error logs: `audio.src.split('token=')[0]`
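A slightly more careful redaction than `split('token=')[0]` keeps the rest of the query string intact. This is a stdlib-only sketch of the idea (the function name and the `token` parameter name mirror the comment; the frontend would do the equivalent in TypeScript):

```python
from urllib.parse import urlsplit, urlunsplit, parse_qsl, urlencode

def redact_token(url: str) -> str:
    """Replace the value of the 'token' query parameter before the URL
    reaches any log, preserving all other query parameters."""
    parts = urlsplit(url)
    query = [(k, "REDACTED" if k == "token" else v)
             for k, v in parse_qsl(parts.query)]
    return urlunsplit(parts._replace(query=urlencode(query)))

src = "https://api.example.com/audio/abc.wav?token=secret123&t=5"
print(redact_token(src))
# -> https://api.example.com/audio/abc.wav?token=REDACTED&t=5
```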
```python
    # Calculate chunk size
    bytes_per_sample = sample_width * channels
    samples_per_chunk = int(sample_rate * chunk_duration_ms / 1000)
```

🛠️ Refactor suggestion | 🟠 Major

Remove unused variable `bytes_per_sample`. The variable is calculated but never used, per the static analysis hint (F841).

```diff
 # Calculate chunk size
-bytes_per_sample = sample_width * channels
 samples_per_chunk = int(sample_rate * chunk_duration_ms / 1000)
```

🧰 Tools

🪛 Ruff (0.14.7)

260-260: Local variable `bytes_per_sample` is assigned to but never used. Remove assignment to unused variable `bytes_per_sample` (F841)

🤖 Prompt for AI Agents

In backends/advanced/src/advanced_omi_backend/clients/audio_stream_client.py around lines 259 to 261, the variable bytes_per_sample is computed but never used; remove the bytes_per_sample assignment (and any related dead code) and, if needed for future calculations, replace uses with the correct expression (sample_width * channels) inline where actually required so the unused-variable warning is eliminated.
```python
# For non-streaming responses, try to extract and log JSON body
if should_log_body and response.status_code != 204:  # No content
    try:
        # Read response body
        response_body = b""
        async for chunk in response.body_iterator:
            response_body += chunk

        # Try to parse as JSON for pretty printing
        try:
            json_body = json.loads(response_body)
            formatted_json = json.dumps(json_body, indent=2)
            request_logger.info(
                f"← {request.method} {path} - {response.status_code} - {duration_ms:.2f}ms\n"
                f"Response body:\n{formatted_json}"
            )
        except (json.JSONDecodeError, UnicodeDecodeError):
            # Not JSON or not UTF-8, just log the status
            request_logger.info(
                f"← {request.method} {path} - {response.status_code} - {duration_ms:.2f}ms "
                f"(non-JSON response)"
            )

        # Recreate response with the body we consumed
        from starlette.responses import Response
        return Response(
            content=response_body,
            status_code=response.status_code,
            headers=dict(response.headers),
            media_type=response.media_type,
        )
    except Exception as e:
```
🧩 Analysis chain
🏁 Script executed:
find . -type f -name "app_middleware.py" | head -20Repository: chronicler-ai/chronicle
Length of output: 190
🏁 Script executed:
# Get the repository structure
git ls-files | grep -E "(middleware|app_middleware)" | head -20Repository: chronicler-ai/chronicle
Length of output: 190
🏁 Script executed:
# Search for the file
fd "app_middleware.py" -type fRepository: chronicler-ai/chronicle
Length of output: 190
🌐 Web query:
Starlette Response.body property set assign
💡 Result:
Yes — Response.body is just the bytes (or memoryview) used as the response payload and you can set it either by passing content when creating the Response or by assigning to response.body directly. Example:
- create with content:
  from starlette.responses import Response
  resp = Response(b'hello', media_type='text/plain')
- assign afterward:
  resp.body = b'new body'

If you change body after the Response was created, update the Content-Length header to match (e.g. `resp.headers['content-length'] = str(len(resp.body))`) so the sent headers stay correct. [1][2]
Sources: Starlette Responses docs (Response.render / body) and middleware note about updating Content-Length. [1] [2]
🌐 Web query:
Starlette middleware read response body restore
💡 Result:
Short answer + example.
You must consume the response's body_iterator into bytes/chunks, then replace response.body_iterator with a new iterator (e.g. via starlette.concurrency.iterate_in_threadpool) or return a new Response built from the captured bytes. Also remember to preserve/update headers (Content-Length) and status. This follows the Starlette middleware pattern of wrapping send / reading http.response.body events. [1]
Minimal example (http middleware style):
from starlette.concurrency import iterate_in_threadpool
from starlette.requests import Request
from starlette.responses import Response
@app.middleware("http")
async def capture_response_middleware(request: Request, call_next):
response = await call_next(request)
# read all chunks (consumes the original iterator)
body_chunks = [chunk async for chunk in response.body_iterator]
body = b"".join(body_chunks)
# inspect / log / modify body
print(body.decode(errors="ignore"))
# restore response so downstream/ASGI can still send it
response.body_iterator = iterate_in_threadpool(iter(body_chunks))
# update Content-Length if present
if "content-length" in response.headers:
response.headers["content-length"] = str(len(body))
return response
Notes:
- Streaming responses and very large bodies: buffering reads whole body into memory — avoid for huge streams. [1][2]
- Alternative: build and return a new `Response(content=body, status_code=..., headers=...)`. [2][3]
Sources: Starlette middleware docs; examples showing reading + restoring body with iterate_in_threadpool and rebuilding Response. [1][2][3]
References
[1] Starlette middleware docs — inspecting/modifying response.
[2] Example approach using iterate_in_threadpool to reset body_iterator.
[3] StackOverflow: capturing & restoring response body in FastAPI/Starlette middleware.
🌐 Web query:
Starlette Response background tasks preservation
💡 Result:
Short answer: attach background work to the Response (response.background) using starlette.background.BackgroundTask or BackgroundTasks — and if you need to preserve existing tasks, detect/merge them rather than overwrite. Note: background tasks run after the response is sent, can be cancelled on client disconnect or in some serverless environments, and are not a reliable job queue for long-running work. [1][2][3]
Minimal example (merge existing task(s)):
- If no existing background:
response.background = BackgroundTask(func, *args) - If existing is a single BackgroundTask -> convert to BackgroundTasks and add both:
from starlette.background import BackgroundTask, BackgroundTasks
existing = response.background
tasks = BackgroundTasks()
if isinstance(existing, BackgroundTasks):
tasks = existing
elif existing is not None:
# existing is BackgroundTask
tasks.add_task(existing.func, *getattr(existing, "args", ()), **getattr(existing, "kwargs", {}))
tasks.add_task(func, *args, **kwargs)
response.background = tasks
References:
- Starlette Background Tasks docs. [1]
- Examples / advice for adding tasks in middleware / exception handlers (StackOverflow). [2]
- Known caveat: tasks may be cancelled if client disconnects / certain server setups (GitHub issue). [3]
Avoid rebuilding the response; restore the consumed body correctly
In `dispatch`, after reading `response.body_iterator`, the code creates a brand-new Response:

```python
from starlette.responses import Response
return Response(
    content=response_body,
    status_code=response.status_code,
    headers=dict(response.headers),
    media_type=response.media_type,
)
```

This drops critical metadata:
- Any attached `background` tasks (e.g., FastAPI `BackgroundTasks`) will never run.
- Any custom response subclass behavior is lost.
- Converting `response.headers` to `dict` may collapse duplicate headers like multiple `Set-Cookie` values.
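The `Set-Cookie` collapse is easy to demonstrate with plain Python (a standalone sketch using a list of tuples, not Starlette's actual header type, which behaves the same way under `dict()`):

```python
# Raw HTTP headers form an ordered multi-map; a dict keeps only the last
# value per key, so duplicate Set-Cookie entries silently disappear.
raw_headers = [
    ("content-type", "application/json"),
    ("set-cookie", "session=abc; Path=/"),
    ("set-cookie", "csrf=xyz; Path=/"),
]
collapsed = dict(raw_headers)

original_cookies = sum(1 for k, _ in raw_headers if k == "set-cookie")
surviving_cookies = sum(1 for k in collapsed if k == "set-cookie")
print(original_cookies, surviving_cookies)
# -> 2 1  (only "csrf=xyz; Path=/" survives the dict conversion)
```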
To preserve the original response while logging, restore the consumed body onto it using Starlette's recommended pattern:
```python
from starlette.concurrency import iterate_in_threadpool

# After logging, restore the body iterator
response.body_iterator = iterate_in_threadpool(iter([response_body]))
# Update Content-Length to match the new body
if "content-length" in response.headers:
    response.headers["content-length"] = str(len(response_body))
return response
```

This keeps the original response object intact, preserving background tasks, headers, and any custom behavior, while ensuring clients receive the correct Content-Length.
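The consume-then-restore idea can be shown with a plain async iterator, stdlib only; in Starlette, `iterate_in_threadpool` plays the role of the replay iterator for its Response objects:

```python
import asyncio

async def consume_and_restore():
    # One-shot body iterator (stand-in for response.body_iterator):
    # once consumed, it cannot be read again.
    async def body_iterator():
        yield b"hello, "
        yield b"world"

    # Consume it fully, e.g. to log the body...
    chunks = [chunk async for chunk in body_iterator()]
    body = b"".join(chunks)

    # ...then install a fresh iterator over the captured chunks so the
    # same bytes can still be sent downstream.
    async def replay():
        for chunk in chunks:
            yield chunk

    resent = b"".join([c async for c in replay()])
    return body, resent

body, resent = asyncio.run(consume_and_restore())
print(body == resent)  # True: the restored iterator replays the same bytes
```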
🧰 Tools
🪛 Ruff (0.14.7)
167-167: Do not catch blind exception: Exception
(BLE001)
backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py
```python
        Tuple of (relative_audio_path, absolute_file_path, duration)
        - relative_audio_path: Path for database storage (e.g., "fixtures/123_abc_uuid.wav" or "123_abc_uuid.wav")
        - absolute_file_path: Full filesystem path for immediate file operations
        - duration: Audio duration in seconds
```
write_audio_file returns a relative path but still stores only the filename in AudioFile.audio_path
The updated write_audio_file computes a relative_audio_path (relative to CHUNK_DIR when possible) and documents the return value as:
Tuple of (relative_audio_path, absolute_file_path, duration)
– relative_audio_path: Path for database storage …
However, the AudioFile document is still created with audio_path=wav_filename, ignoring the new relative path. This creates a mismatch between:
- What the function advertises as the DB storage path, and
- What actually ends up in the DB.
For cases where output_dir is a subdirectory of CHUNK_DIR (e.g., fixtures), downstream code that expects audio_path to include the folder prefix may not be able to resolve the file correctly.
If the intent is to standardize on the relative path in the DB (which matches the docstring and PR summary), you likely want:
- # Create AudioFile database entry using Beanie model
- audio_file = AudioFile(
- audio_uuid=audio_uuid,
- audio_path=wav_filename,
+ # Create AudioFile database entry using Beanie model
+ audio_file = AudioFile(
+ audio_uuid=audio_uuid,
+ audio_path=relative_audio_path,
client_id=client_id,
timestamp=timestamp,
user_id=user_id,
user_email=user_email,
has_speech=False, # Will be updated by transcription
speech_analysis={}
)
@@
- audio_logger.info(
- f"✅ Wrote audio file: {wav_filename} ({len(audio_data)} bytes, {duration:.1f}s)"
- )
+ audio_logger.info(
+ f"✅ Wrote audio file: {relative_audio_path} ({len(audio_data)} bytes, {duration:.1f}s)"
+ )
@@
- return relative_audio_path, str(file_path), duration
+ return relative_audio_path, str(file_path), duration

If instead you intentionally want DB audio_path to remain a bare filename and use the returned relative_audio_path elsewhere, consider updating the docstring to make that explicit so callers don't rely on misleading semantics.
Also applies to: 159-174, 197-213
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/utils/audio_utils.py around lines
129-133 (and also apply same fix at 159-174 and 197-213): the function computes
and documents a relative_audio_path for DB storage but still sets
AudioFile.audio_path to just wav_filename, causing DB/stored-path mismatch;
update the code to set AudioFile.audio_path to the computed relative_audio_path
(not the bare filename) when creating/saving the AudioFile document, and ensure
any callers that expect a bare filename are adjusted or the docstring updated
consistently so the DB stores the relative path as documented.
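The relative-path computation this comment assumes can be sketched in isolation with pathlib (the names CHUNK_DIR / chunk_dir and db_audio_path are illustrative, not taken from the codebase):

```python
from pathlib import Path

def db_audio_path(file_path: Path, chunk_dir: Path) -> str:
    """Return the value to store in the DB: relative to chunk_dir when the
    file lives under it, otherwise just the bare filename."""
    try:
        return str(file_path.relative_to(chunk_dir))
    except ValueError:
        # file_path is outside chunk_dir; fall back to the filename
        return file_path.name

# Illustrative values only
chunk_dir = Path("/data/chunks")
print(db_audio_path(chunk_dir / "fixtures" / "123_abc_uuid.wav", chunk_dir))  # fixtures/123_abc_uuid.wav
print(db_audio_path(Path("/tmp/123_abc_uuid.wav"), chunk_dir))  # 123_abc_uuid.wav
```

Storing the chunk_dir-relative path (rather than the bare filename) is what lets downstream code resolve files placed in subdirectories like fixtures/.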
Get Cropped Audio Info
    [Documentation]    Get cropped audio information for a conversation
    [Arguments]    ${audio_uuid}

    ${response}=    GET On Session    api    /api/conversations/${audio_uuid}/cropped    headers=${headers}
    RETURN    ${response.json()}[cropped_audios]
Undefined ${headers} variable will cause runtime error.
The Get Cropped Audio Info keyword references ${headers} which is not defined in this file or imported from resources. This will fail at runtime.
Get Cropped Audio Info
[Documentation] Get cropped audio information for a conversation
[Arguments] ${audio_uuid}
- ${response}= GET On Session api /api/conversations/${audio_uuid}/cropped headers=${headers}
+ ${response}= GET On Session api /api/conversations/${audio_uuid}/cropped
    RETURN    ${response.json()}[cropped_audios]

If custom headers are needed, either define them in this keyword or import them from a resource file.
🤖 Prompt for AI Agents
In tests/resources/audio_keywords.robot around lines 72 to 77, the keyword Get
Cropped Audio Info uses an undefined ${headers} variable which will raise a
runtime error; fix by either (a) defining ${headers} within this file (e.g., add
a Variables section or set the headers variable at top of the file) or (b)
import the resource file that declares ${headers} and ensure the resource is
listed in Settings, or (c) remove the headers argument from the GET call if no
custom headers are required; update the keyword accordingly so the GET On
Session call receives a valid headers value.
Get Job Status
    [Documentation]    Get just the status of a specific job by ID (lightweight endpoint)
    [Arguments]    ${job_id}

    # Use the lightweight status endpoint - try to get the response
    ${success}=    Run Keyword And Return Status    GET On Session    api    /api/queue/jobs/${job_id}/status    expected_status=200

    IF    not ${success}
        # Job not found
        RETURN    ${None}
    END

    # Now actually get the response
    ${response}=    GET On Session    api    /api/queue/jobs/${job_id}/status
    RETURN    ${response.json()}
🛠️ Refactor suggestion | 🟠 Major
Duplicate API call is inefficient and risks race conditions.
The keyword makes two identical API calls: first via Run Keyword And Return Status to check success, then again on line 56 to get the response. This is inefficient and could return different data if the job status changes between calls.
Refactor to make a single call:
Get Job Status
[Documentation] Get just the status of a specific job by ID (lightweight endpoint)
[Arguments] ${job_id}
- # Use the lightweight status endpoint - try to get the response
- ${success}= Run Keyword And Return Status GET On Session api /api/queue/jobs/${job_id}/status expected_status=200
-
- IF not ${success}
- # Job not found
+ ${response}= GET On Session api /api/queue/jobs/${job_id}/status expected_status=any
+ IF ${response.status_code} != 200
RETURN ${None}
END
-
- # Now actually get the response
- ${response}= GET On Session api /api/queue/jobs/${job_id}/status
    RETURN    ${response.json()}
🤖 Prompt for AI Agents
In tests/resources/queue_keywords.robot around lines 43 to 57, the keyword
currently issues two identical GET calls (one with Run Keyword And Return Status
and then another real GET) which is inefficient and unsafe; replace them with a
single call using "Run Keyword And Ignore Error GET On Session api
/api/queue/jobs/${job_id}/status expected_status=200", capture the two
returned values (status and response_or_error), check if status is "FAIL" and
RETURN ${None} in that case, otherwise parse and RETURN the
response_or_error.json(); this removes the duplicate request and avoids race
conditions.
Verify Transcript Content
    [Documentation]    Verify transcript contains expected content and quality
    [Arguments]    ${conversation}    ${expected_keywords}    ${min_length}=50

    Dictionary Should Contain Key    ${conversation}    transcript
    ${transcript}=    Set Variable    ${conversation}[transcript]
    Should Not Be Empty    ${transcript}

    # Check length
    ${transcript_length}=    Get Length    ${transcript}
    Should Be True    ${transcript_length} >= ${min_length}    Transcript too short: ${transcript_length}

    # Check for expected keywords
    ${transcript_lower}=    Convert To Lower Case    ${transcript}
    FOR    ${keyword}    IN    @{expected_keywords}
        ${keyword_lower}=    Convert To Lower Case    ${keyword}
        Should Contain    ${transcript_lower}    ${keyword_lower}    Missing keyword: ${keyword}
    END

    # Verify segments exist
    Dictionary Should Contain Key    ${conversation}    segments
    ${segments}=    Set Variable    ${conversation}[segments]
    ${segment_count}=    Get Length    ${segments}
    Should Be True    ${segment_count} > 0    No segments found

    Log    Transcript verification passed: ${transcript_length} chars, ${segment_count} segments    INFO

(The same definition appears verbatim two more times in the quoted hunk.)
Duplicate keyword definitions will cause parsing errors.
The keyword Verify Transcript Content is defined three times (lines 257, 284, and 311). Robot Framework does not allow duplicate keyword names in the same file and will fail to parse this.
Additionally, line 257 appears to start a new keyword definition inside Verify Segments Match Expected Timestamps without proper structure separation.
Remove the duplicate definitions and fix the structural issue:
Log All ${actual_count} segments matched expected timestamps within ${tolerance}s tolerance INFO
- Verify Transcript Content
- [Documentation] Verify transcript contains expected content and quality
- [Arguments] ${conversation} ${expected_keywords} ${min_length}=50
- ... (lines 257-282 - REMOVE THIS DUPLICATE)
-
-Verify Transcript Content
- [Documentation] Verify transcript contains expected content and quality
- [Arguments] ${conversation} ${expected_keywords} ${min_length}=50
- ... (lines 284-309 - REMOVE THIS DUPLICATE)
-
- Verify Transcript Content
- [Documentation] Verify transcript contains expected content and quality
- [Arguments] ${conversation} ${expected_keywords} ${min_length}=50
- ... (lines 311-336 - KEEP ONLY ONE DEFINITION)
+
+Verify Transcript Content
+ [Documentation] Verify transcript contains expected content and quality
+ [Arguments] ${conversation} ${expected_keywords} ${min_length}=50
+
+ Dictionary Should Contain Key ${conversation} transcript
+ ${transcript}= Set Variable ${conversation}[transcript]
+ Should Not Be Empty ${transcript}
+
+ # Check length
+ ${transcript_length}= Get Length ${transcript}
+ Should Be True ${transcript_length} >= ${min_length} Transcript too short: ${transcript_length}
+
+ # Check for expected keywords
+ ${transcript_lower}= Convert To Lower Case ${transcript}
+ FOR ${keyword} IN @{expected_keywords}
+ ${keyword_lower}= Convert To Lower Case ${keyword}
+ Should Contain ${transcript_lower} ${keyword_lower} Missing keyword: ${keyword}
+ END
+
+ # Verify segments exist
+ Dictionary Should Contain Key ${conversation} segments
+ ${segments}= Set Variable ${conversation}[segments]
+ ${segment_count}= Get Length ${segments}
+ Should Be True ${segment_count} > 0 No segments found
+
+    Log    Transcript verification passed: ${transcript_length} chars, ${segment_count} segments    INFO

Committable suggestion skipped: line range outside the PR's diff.
Get Admin User Details With Token
    [Documentation]    Get current admin user details using token (legacy compatibility)
    [Arguments]    ${token}

    Create Session    temp_user_session    ${API_URL}    verify=True
    &{headers}=    Create Dictionary    Authorization=Bearer ${token}
    ${response}=    GET On Session    temp_user_session    /users/me    headers=${headers}    expected_status=200
    ${user}=    Set Variable    ${response.json()}
    Delete All Sessions
    RETURN    ${user}
Delete All Sessions will destroy sessions needed by other tests.
This keyword deletes ALL RequestsLibrary sessions, not just temp_user_session. If called during a test suite, it will break subsequent API calls that rely on the api session.
Apply this diff to delete only the temporary session:
${user}= Set Variable ${response.json()}
- Delete All Sessions
+ Delete Session temp_user_session
    RETURN    ${user}

🤖 Prompt for AI Agents
In tests/resources/user_keywords.robot around lines 60 to 69, the keyword
currently calls "Delete All Sessions" which removes every RequestsLibrary
session and breaks other tests; change it to delete only the temporary session
created earlier by replacing "Delete All Sessions" with "Delete Session
temp_user_session" (ensuring the session name matches the Create Session call)
so only the temp_user_session is removed and other shared sessions remain
intact.
cat > setup/.env.test << EOF
# API URLs
API_URL=http://localhost:8001
BACKEND_URL=http://localhost:8001
FRONTEND_URL=http://localhost:3001

# Test Admin Credentials
ADMIN_EMAIL=test-admin@example.com
ADMIN_PASSWORD=test-admin-password-123

# API Keys (from environment)
OPENAI_API_KEY=${OPENAI_API_KEY}
DEEPGRAM_API_KEY=${DEEPGRAM_API_KEY}

# Test Configuration
TEST_TIMEOUT=120
TEST_DEVICE_NAME=robot-test
EOF
print_success "Created setup/.env.test"
Secrets are written to a file that could be committed to version control.
The script writes OPENAI_API_KEY and DEEPGRAM_API_KEY to setup/.env.test. If this file isn't in .gitignore, secrets could be accidentally committed.
Ensure setup/.env.test is in .gitignore:
#!/bin/bash
# Verify .env.test is gitignored
grep -q "\.env\.test" tests/.gitignore 2>/dev/null || grep -q "\.env" tests/.gitignore 2>/dev/null

Also consider warning the user:
print_success "Created setup/.env.test"
+ print_warning "setup/.env.test contains API keys - ensure it's in .gitignore"
 fi

🤖 Prompt for AI Agents
In tests/run-robot-tests.sh around lines 77 to 95, the script writes
OPENAI_API_KEY and DEEPGRAM_API_KEY into setup/.env.test which can be
accidentally committed; update the script to ensure setup/.env.test is listed in
.gitignore (preferably tests/.gitignore or repo root .gitignore) before creating
the file by checking for ".env.test" or ".env" and appending the entry if
missing, and additionally warn the user (print a message and require
confirmation) if either API key is missing or when overwriting an existing
setup/.env.test so secrets are not silently written to a tracked file.
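An idempotent guard for the .gitignore entry could look like this sketch (demonstrated in a throwaway directory; in the real script it would run against tests/.gitignore):

```shell
# Demo in a temp directory: add setup/.env.test to .gitignore only if absent
cd "$(mktemp -d)"
touch .gitignore
grep -qxF 'setup/.env.test' .gitignore || echo 'setup/.env.test' >> .gitignore
# Running the guard a second time is a no-op
grep -qxF 'setup/.env.test' .gitignore || echo 'setup/.env.test' >> .gitignore
echo "entries: $(grep -cxF 'setup/.env.test' .gitignore)"
```

grep's -x matches the whole line and -F disables regex interpretation, so the dotted path is compared literally rather than as a pattern.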
…anagement and add verbose console output for better visibility of test runs.
…tes by re-raising HTTPExceptions unchanged. This ensures proper propagation of specific HTTP errors while maintaining logging for other exceptions.
Actionable comments posted: 3
♻️ Duplicate comments (1)
backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py (1)
86-91: Unify exception-handling pattern and improve logging/traceability across job endpoints

The updates to re-raise HTTPException unchanged in get_job_status, get_job, and cancel_job are a solid fix: authorization errors are no longer hidden behind a 404.

You still have very similar generic "except Exception as e:" blocks that:

- Use logger.error(...) instead of logger.exception(...), so stack traces are lost.
- Raise a fresh HTTPException without "from e", so the original cause is not chained (Ruff's B904/TRY400).

Consider tightening this pattern once and applying it across these endpoints. For example, in get_job_status:

-    except HTTPException:
-        # Re-raise HTTPException unchanged (e.g., 403 Forbidden)
-        raise
-    except Exception as e:
-        logger.error(f"Failed to get job status {job_id}: {e}")
-        raise HTTPException(status_code=404, detail="Job not found")
+    except HTTPException:
+        # Re-raise HTTPException unchanged (e.g., 403 Forbidden)
+        raise
+    except Exception as e:
+        logger.exception("Failed to get job status %s", job_id)
+        raise HTTPException(status_code=404, detail="Job not found") from e

and similarly for get_job and cancel_job (without changing their chosen status codes).

This follows the earlier review guidance and aligns with the Ruff hints.
Also applies to: 137-142, 180-185
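The behavioral difference "from e" makes can be checked in plain Python: chaining records the original error as __cause__, which tracebacks and logging tooling can then surface (load_job and get_job_status here are illustrative stand-ins, not the real endpoint code):

```python
def load_job(job_id: str):
    # Stand-in for the real lookup; always fails in this sketch
    raise KeyError(f"job:{job_id}")

def get_job_status(job_id: str):
    try:
        return load_job(job_id)
    except Exception as e:
        # "from e" preserves the original cause on the new exception
        raise RuntimeError("Job not found") from e

try:
    get_job_status("123")
except RuntimeError as err:
    print(type(err.__cause__).__name__)  # KeyError
```

Without the "from e", __cause__ would be None and the KeyError would only appear as implicit context ("During handling of the above exception...") rather than an explicit cause.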
🧹 Nitpick comments (3)
tests/Makefile (2)
4-4: Consider adding a standard "test" target.

Makefile convention and many CI/tool workflows expect a test target as the standard entry point. Consider adding test as an alias to all for compatibility.

-.PHONY: help all clean
+.PHONY: help all clean test endpoints integration infra

Then add:

test: all

This aligns with common build tool conventions and improves discoverability.

22-24: Document the CREATE_FIXTURE environment variable in help.

The help output doesn't mention the CREATE_FIXTURE environment variable, which is essential for understanding the behavior of the all and integration targets (lines 34, 50). Add it to the "Environment Variables" section.

 @echo "Environment Variables:"
 @echo "  OUTPUTDIR      - Output directory (default: results)"
+@echo "  CREATE_FIXTURE - Persist fixtures across test suites (default: false)"
 @echo ""

backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py (1)

1099-1099: Raising HTTPException from /dashboard errors is appropriate; consider chaining the cause

Switching from returning a plain error dict to raising an HTTPException(status_code=500, ...) when get_dashboard_data fails is an improvement: clients now get a proper 5xx.

To retain the original traceback and satisfy Ruff's B904 recommendation, you can chain the cause:

-    except Exception as e:
-        logger.error(f"Failed to get dashboard data: {e}", exc_info=True)
-        raise HTTPException(status_code=500, detail=f"Failed to get dashboard data: {str(e)}")
+    except Exception as e:
+        logger.error("Failed to get dashboard data: %s", e, exc_info=True)
+        raise HTTPException(status_code=500, detail=f"Failed to get dashboard data: {e}") from e

(Not a blocker, but improves debuggability.)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py (11 hunks)
- tests/Makefile (1 hunks)
- tests/run-robot-tests.sh (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/run-robot-tests.sh
🧰 Additional context used
🧬 Code graph analysis (1)
backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py (3)
backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py (3)
- get_jobs (96-193)
- get_job_stats (62-93)
- get_queue_health (481-531)

backends/advanced/src/advanced_omi_backend/models/user.py (2)
- User (51-98)
- user_id (71-73)

backends/advanced/src/advanced_omi_backend/task_manager.py (1)
- get_task_manager (333-337)
🪛 checkmake (0.2.2)
tests/Makefile
[warning] 10-10: Target body for "help" exceeds allowed length of 5 (18).
(maxbodylength)
[warning] 4-4: Missing required phony target "test"
(minphony)
🪛 Ruff (0.14.7)
backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py
28-28: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
56-56: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
66-66: Abstract raise to an inner function
(TRY301)
81-84: Consider moving this statement to an else block
(TRY300)
89-89: Do not catch blind exception: Exception
(BLE001)
90-90: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
91-91: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
183-183: Do not catch blind exception: Exception
(BLE001)
353-353: Unused function argument: current_user
(ARG001)
353-353: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
377-377: Consider moving this statement to an else block
(TRY300)
379-379: Do not catch blind exception: Exception
(BLE001)
380-380: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
381-381: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
381-381: Use explicit conversion flag
Replace with conversion flag
(RUF010)
649-649: Do not catch blind exception: Exception
(BLE001)
655-655: Do not catch blind exception: Exception
(BLE001)
1099-1099: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
1099-1099: Use explicit conversion flag
Replace with conversion flag
(RUF010)
🔇 Additional comments (4)
backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py (4)
12-12: Centralizing queue/Redis primitives via queue_controller import looks good

Importing get_jobs, get_job_stats, redis_conn, and QUEUE_NAMES from advanced_omi_backend.controllers.queue_controller keeps queue configuration and Redis wiring in one place and avoids duplication in the router. No issues here.
503-506: Flush-all request flags and docstring behavior change are clear and conservative

Extending FlushAllJobsRequest with include_failed and include_completed (defaulting to False), and documenting that behavior in the flush_all_jobs docstring, is a good safety measure: failed/completed jobs are preserved for debugging unless explicitly requested.

The API shape is clear and backward-compatible (existing callers that don't send the new flags will get the safer behavior by default).
Also applies to: 581-585
712-715: Extended flush_all_jobs response shape looks consistent

Adding deleted_keys, preserved, and a human-readable message to the flush_all_jobs response is helpful for operators and UIs:

- deleted_keys tracks how many Redis keys were removed.
- preserved enumerates which job categories were intentionally kept.
- message concisely summarizes the action, including preserved types.

This is a sensible extension and appears internally consistent with the logic above.
615-621: File not found in repository; cannot verify implementation details

The file backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py and related paths referenced in this review comment do not exist in the current repository state. The technical concern about redis.asyncio.from_url() is valid in principle: the function returns a Redis instance directly, not an awaitable, so awaiting it would cause a runtime TypeError. However, without access to the actual code at the specified lines, I cannot confirm whether this issue exists in the codebase, verify the line numbers, or assess the scope of the other related concerns (623-627, 635-639, 641-657, 658-658, 671-715).
    job_type: str = Query(None, description="Filter by job type (matches func_name)"),
    client_id: str = Query(None, description="Filter by client_id in meta"),
Per-user filtering in list_jobs is currently broken for non-admins
The new job_type and client_id filters wired through to get_jobs are good, but the subsequent per-user filtering is incorrect:
- queue_controller.get_jobs(...) returns each job dict with a top-level "user_id" key (see controller implementation), not a "kwargs" payload.
- Line 39 (job_kwargs = job.get("kwargs", {})) will almost always produce {}, so the check on Line 40 never matches and non-superusers see no jobs at all.
This is a correctness bug for non-admin users.
Consider updating the filter to use the user_id field that get_jobs already exposes, e.g.:
- if not current_user.is_superuser:
- # Filter based on user_id in job kwargs (where RQ stores job parameters)
- user_jobs = []
- for job in result["jobs"]:
- job_kwargs = job.get("kwargs", {})
- if job_kwargs.get("user_id") == str(current_user.user_id):
- user_jobs.append(job)
-
- result["jobs"] = user_jobs
- result["pagination"]["total"] = len(user_jobs)
+ if not current_user.is_superuser:
+ # Filter based on user_id already exposed by get_jobs
+ user_jobs = [
+ job for job in result["jobs"]
+ if job.get("user_id") == str(current_user.user_id)
+ ]
+
+ result["jobs"] = user_jobs
+ result["pagination"]["total"] = len(user_jobs)
+ # Optional: recompute has_more for per-user view, or force False
+ result["pagination"]["has_more"] = FalseThis restores correct behavior for regular users and aligns with the controller’s response shape.
Also applies to: 32-44
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py
around lines 26-27 (and apply same fix to lines 32-44), the per-user filtering
uses job.get("kwargs", {}) which is wrong because queue_controller.get_jobs()
returns a top-level "user_id"; change the filter to read job_user_id =
job.get("user_id") and compare that to the current user's id (or identifier used
on the request) instead of inspecting kwargs so non-admin users see their jobs
correctly.
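The corrected filter can be sketched in isolation, assuming get_jobs() returns dicts with a top-level "user_id" key (field and function names here are illustrative):

```python
def filter_jobs_for_user(jobs, user_id, is_superuser):
    """Superusers see all jobs; everyone else sees only their own."""
    if is_superuser:
        return list(jobs)
    return [job for job in jobs if job.get("user_id") == str(user_id)]

jobs = [
    {"id": "j1", "user_id": "42"},
    {"id": "j2", "user_id": "7"},
    {"id": "j3"},  # no user_id recorded; excluded for non-admins
]
print(filter_jobs_for_user(jobs, 42, is_superuser=False))  # [{'id': 'j1', 'user_id': '42'}]
```

Note the str(user_id) coercion: job metadata stores the id as a string, so comparing against a raw ORM id object would silently match nothing, which is exactly the failure mode the review describes for the kwargs-based check.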
| @router.get("/worker-details") | ||
| async def get_queue_worker_details( | ||
| current_user: User = Depends(current_active_user) | ||
| ): | ||
| """Get detailed queue and worker status including task manager health.""" | ||
| try: | ||
| health = get_queue_health() | ||
| return health | ||
| from advanced_omi_backend.controllers.queue_controller import get_queue_health | ||
| from advanced_omi_backend.task_manager import get_task_manager | ||
| import time | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Failed to get queue health: {e}") | ||
| return { | ||
| "status": "unhealthy", | ||
| "message": f"Health check failed: {str(e)}" | ||
| # Get queue health directly | ||
| queue_health = get_queue_health() | ||
|
|
||
| status = { | ||
| "architecture": "rq_workers", | ||
| "timestamp": int(time.time()), | ||
| "workers": { | ||
| "total": queue_health.get("total_workers", 0), | ||
| "active": queue_health.get("active_workers", 0), | ||
| "idle": queue_health.get("idle_workers", 0), | ||
| "details": queue_health.get("workers", []) | ||
| }, | ||
| "queues": queue_health.get("queues", {}), | ||
| "redis_connection": queue_health.get("redis_connection", "unknown") | ||
| } | ||
|
|
||
| return status | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Failed to get queue worker details: {e}") | ||
| raise HTTPException(status_code=500, detail=f"Failed to get worker details: {str(e)}") | ||
|
|
/worker-details endpoint exposes infra details without admin guard
This endpoint returns internal worker and queue topology, including worker names, states, and Redis health. Currently:
- current_user is injected but unused.
- There is no superuser/admin check, so any authenticated user can see full worker details.
Given how other administrative endpoints in this module are guarded (e.g., /flush, /flush-all, /sessions/clear require is_superuser), this is likely an oversight and a potential information-disclosure issue.
Consider:
@router.get("/worker-details")
async def get_queue_worker_details(
current_user: User = Depends(current_active_user)
):
- """Get detailed queue and worker status including task manager health."""
+ """Get detailed queue and worker status including task manager health."""
+ if not current_user.is_superuser:
+ raise HTTPException(status_code=403, detail="Admin access required")
@@
- from advanced_omi_backend.controllers.queue_controller import get_queue_health
- from advanced_omi_backend.task_manager import get_task_manager
+ from advanced_omi_backend.controllers.queue_controller import get_queue_health
+ from advanced_omi_backend.task_manager import get_task_manager  # Use or remove

Also, the docstring mentions "task manager health", but get_task_manager is imported and never used; either incorporate its health into the response or drop the import/docstring reference.
🧰 Tools
🪛 Ruff (0.14.7)
353-353: Unused function argument: current_user
(ARG001)
353-353: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
377-377: Consider moving this statement to an else block
(TRY300)
379-379: Do not catch blind exception: Exception
(BLE001)
380-380: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
381-381: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
381-381: Use explicit conversion flag
Replace with conversion flag
(RUF010)
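A reusable guard for admin-only endpoints could follow the sketch below; AdminRequired stands in for FastAPI's HTTPException(status_code=403), and the User shape is assumed rather than taken from the codebase:

```python
from dataclasses import dataclass

@dataclass
class User:
    user_id: str
    is_superuser: bool = False

class AdminRequired(Exception):
    """Stand-in for HTTPException(status_code=403) in this sketch."""

def require_superuser(user: User) -> User:
    """Raise unless the user is a superuser; return the user for chaining."""
    if not user.is_superuser:
        raise AdminRequired("Admin access required")
    return user

admin = require_superuser(User("u1", is_superuser=True))
print(admin.user_id)  # u1
```

In FastAPI this check would typically live in a Depends()-wrapped dependency so every admin route shares one guard instead of repeating the is_superuser test inline.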
# Friend-Lite Test Makefile
# Shortcuts for running tests

.PHONY: help all clean
🛠️ Refactor suggestion | 🟠 Major
Complete the .PHONY declaration to include all non-file targets.
The .PHONY declaration is incomplete and only includes help, all, and clean. Add endpoints, integration, and infra to prevent Make from treating them as file targets if files with those names are created.
-.PHONY: help all clean
+.PHONY: help all clean endpoints integration infra
🧰 Tools
🪛 checkmake (0.2.2)
[warning] 4-4: Missing required phony target "test"
(minphony)
🤖 Prompt for AI Agents
In tests/Makefile around line 4, the .PHONY declaration currently lists only
help, all, and clean; update it to include all non-file targets by adding
endpoints, integration, and infra so Make won't treat those target names as
files. Modify the .PHONY line to include the additional targets (endpoints,
integration, infra) alongside the existing ones.
Merged with main
Summary by CodeRabbit
New Features
Bug Fixes
Documentation