diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 38f1eccb..ce2677b4 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -27,7 +27,25 @@ jobs: steps: - name: Checkout code uses: actions/checkout@v4 - + + - name: Verify required secrets + env: + DEEPGRAM_API_KEY: ${{ secrets.DEEPGRAM_API_KEY }} + OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} + run: | + echo "Verifying required secrets..." + if [ -z "$DEEPGRAM_API_KEY" ]; then + echo "❌ ERROR: DEEPGRAM_API_KEY secret is not set" + exit 1 + fi + if [ -z "$OPENAI_API_KEY" ]; then + echo "❌ ERROR: OPENAI_API_KEY secret is not set" + exit 1 + fi + echo "✓ DEEPGRAM_API_KEY is set (length: ${#DEEPGRAM_API_KEY})" + echo "✓ OPENAI_API_KEY is set (length: ${#OPENAI_API_KEY})" + echo "✓ All required secrets verified" + - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 diff --git a/.github/workflows/robot-tests.yml b/.github/workflows/robot-tests.yml index 4b12e0ec..a00b7c1c 100644 --- a/.github/workflows/robot-tests.yml +++ b/.github/workflows/robot-tests.yml @@ -24,6 +24,24 @@ jobs: - name: Checkout code uses: actions/checkout@v4 + - name: Verify required secrets + env: + DEEPGRAM_API_KEY: ${{ secrets.DEEPGRAM_API_KEY }} + OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} + run: | + echo "Verifying required secrets..." + if [ -z "$DEEPGRAM_API_KEY" ]; then + echo "❌ ERROR: DEEPGRAM_API_KEY secret is not set" + exit 1 + fi + if [ -z "$OPENAI_API_KEY" ]; then + echo "❌ ERROR: OPENAI_API_KEY secret is not set" + exit 1 + fi + echo "✓ DEEPGRAM_API_KEY is set (length: ${#DEEPGRAM_API_KEY})" + echo "✓ OPENAI_API_KEY is set (length: ${#OPENAI_API_KEY})" + echo "✓ All required secrets verified" + - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 with: @@ -161,6 +179,26 @@ jobs: echo "✓ All services ready!" 
+ - name: Verify checked out code + working-directory: tests + run: | + echo "Current git commit:" + git log -1 --oneline + echo "" + echo "Test files in current checkout:" + find . -name "*.robot" -type f | head -10 + echo "" + echo "Sample of tags in test files:" + grep -h "\[Tags\]" endpoints/*.robot infrastructure/*.robot integration/*.robot 2>/dev/null | head -20 || echo "No tag files found" + + - name: Clean previous test results + working-directory: tests + run: | + echo "Cleaning any previous test results..." + rm -rf results + mkdir -p results + echo "✓ Fresh results directory created" + - name: Run Robot Framework tests working-directory: tests env: @@ -170,8 +208,11 @@ OPENAI_MODEL: gpt-4o-mini DEEPGRAM_API_KEY: ${{ secrets.DEEPGRAM_API_KEY }} run: | - # Run all tests - make all OUTPUTDIR=results || true + # Run all tests; '|| TEST_EXIT_CODE=$?' survives the default bash -e shell so the exit code is recorded and artifacts still upload + TEST_EXIT_CODE=0 + make all OUTPUTDIR=results || TEST_EXIT_CODE=$? + echo "test_exit_code=$TEST_EXIT_CODE" >> $GITHUB_ENV + exit 0 # Don't fail here, we'll fail at the end after uploading artifacts - name: Show service logs if: always() @@ -343,3 +384,13 @@ jobs: working-directory: backends/advanced run: | docker compose -f docker-compose-test.yml down -v + + - name: Fail workflow if tests failed + if: always() + run: | + if [ "${{ env.test_exit_code }}" != "0" ]; then + echo "❌ Tests failed with exit code ${{ env.test_exit_code }}" + exit 1 + else + echo "✅ All tests passed" + fi diff --git a/.github/workflows/speaker-recognition-tests.yml b/.github/workflows/speaker-recognition-tests.yml index f7342848..5768ada7 100644 --- a/.github/workflows/speaker-recognition-tests.yml +++ b/.github/workflows/speaker-recognition-tests.yml @@ -32,7 +32,25 @@ jobs: steps: - name: Checkout code uses: actions/checkout@v4 - + + - name: Verify required secrets + env: + HF_TOKEN: ${{ secrets.HF_TOKEN }} + DEEPGRAM_API_KEY: ${{ secrets.DEEPGRAM_API_KEY }} + run: | + echo "Verifying required secrets..." 
+ if [ -z "$HF_TOKEN" ]; then + echo "❌ ERROR: HF_TOKEN secret is not set" + exit 1 + fi + if [ -z "$DEEPGRAM_API_KEY" ]; then + echo "❌ ERROR: DEEPGRAM_API_KEY secret is not set" + exit 1 + fi + echo "✓ HF_TOKEN is set (length: ${#HF_TOKEN})" + echo "✓ DEEPGRAM_API_KEY is set (length: ${#DEEPGRAM_API_KEY})" + echo "✓ All required secrets verified" + - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 diff --git a/.gitignore b/.gitignore index 62f4cf39..b2b052b3 100644 --- a/.gitignore +++ b/.gitignore @@ -76,3 +76,9 @@ extras/openmemory-mcp/data/* backends/advanced/nginx.conf backends/advanced/Caddyfile +app/ios/Pods +results +log.html +output.xml +report.html +.secrets diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6d699ff5..6ebb6573 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,4 +1,26 @@ repos: + # Local hooks (project-specific checks) + - repo: local + hooks: + # Run Robot Framework endpoint tests before push + - id: robot-framework-tests + name: Robot Framework Tests (Endpoints) + entry: bash -c 'cd tests && make endpoints OUTPUTDIR=.pre-commit-results' + language: system + pass_filenames: false + stages: [push] + verbose: true + + # Clean up test results after hook runs + - id: cleanup-test-results + name: Cleanup Test Results + entry: bash -c 'cd tests && rm -rf .pre-commit-results' + language: system + pass_filenames: false + stages: [push] + always_run: true + + # Code formatting - repo: https://github.com/psf/black rev: 24.4.2 hooks: @@ -9,6 +31,8 @@ repos: hooks: - id: isort files: ^backends/advanced-backend/src/.*\.py$ + + # File hygiene - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.5.0 hooks: diff --git a/CLAUDE.md b/CLAUDE.md index 1efb4a2e..0f579d33 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -38,9 +38,6 @@ uv run pytest tests/test_memory_service.py # Single test file # Environment setup cp .env.template .env # Configure environment variables -# Setup test environment 
(optional, for running integration tests) -uv run --with-requirements setup-requirements.txt python setup_test_env.py # Creates .env.test - # Reset data (development) sudo rm -rf backends/advanced/data/ ``` @@ -71,10 +68,6 @@ cd backends/advanced # Requires .env file with DEEPGRAM_API_KEY and OPENAI_API_KEY cp .env.template .env # Configure API keys -# Optional: Setup test environment with test-specific credentials -# (wizard.py prompts for this, or run manually) -uv run --with-requirements setup-requirements.txt python setup_test_env.py - # Run full integration test suite ./run-test.sh @@ -408,6 +401,31 @@ For detailed technical documentation, see: - **[@docs/speaker-recognition.md](docs/speaker-recognition.md)**: Advanced analysis and live inference features - **[@docs/distributed-deployment.md](docs/distributed-deployment.md)**: Multi-machine deployment with Tailscale +## Robot Framework Testing + +**IMPORTANT: When writing or modifying Robot Framework tests, you MUST follow the testing guidelines.** + +Before writing any Robot Framework test: +1. **Read [@tests/TESTING_GUIDELINES.md](tests/TESTING_GUIDELINES.md)** for comprehensive testing patterns and standards +2. **Check [@tests/tags.md](tests/tags.md)** for approved tags - ONLY 11 tags are permitted +3. **SCAN existing resource files** for keywords - NEVER write code that duplicates existing keywords +4. **Follow the Arrange-Act-Assert pattern** with inline verifications (not abstracted to keywords) + +Key Testing Rules: +- **Check Existing Keywords FIRST**: Before writing ANY test code, scan relevant resource files (`websocket_keywords.robot`, `queue_keywords.robot`, `conversation_keywords.robot`, etc.) 
for existing keywords +- **Tags**: ONLY use the 11 approved tags from tags.md, tab-separated (e.g., `[Tags] infra audio-streaming`) +- **Verifications**: Write assertions directly in tests, not in resource keywords +- **Keywords**: Only create keywords for reusable setup/action operations AFTER confirming no existing keyword exists +- **Resources**: Always check existing resource files before creating new keywords or duplicating logic +- **Naming**: Use descriptive names that explain business purpose, not technical implementation + +**DO NOT:** +- Write inline code without checking if a keyword already exists for that operation +- Create custom tags (use only the 11 approved tags) +- Abstract verifications into keywords (keep them inline in tests) +- Use space-separated tags (must be tab-separated) +- Skip reading the guidelines before writing tests + ## Notes for Claude Check if the src/ is volume mounted. If not, do compose build so that code changes are reflected. Do not simply run `docker compose restart` as it will not rebuild the image. Check backends/advanced/Docs for up to date information on advanced backend. 
diff --git a/Makefile b/Makefile index 4f470f94..1a5a3829 100644 --- a/Makefile +++ b/Makefile @@ -29,6 +29,7 @@ menu: ## Show interactive menu (default) @echo "================================" @echo @echo "📋 Quick Actions:" + @echo " setup-dev 🛠️ Setup development environment (git hooks, pre-commit)" @echo " setup-k8s 🏗️ Complete Kubernetes setup (registry + infrastructure + RBAC)" @echo " config 📝 Generate all configuration files" @echo " deploy 🚀 Deploy using configured mode ($(DEPLOYMENT_MODE))" @@ -114,6 +115,29 @@ help: ## Show detailed help for all targets @echo "🧹 CLEANUP:" @echo " clean Clean up generated configuration files" +# ======================================== +# DEVELOPMENT SETUP +# ======================================== + +setup-dev: ## Setup development environment (git hooks, pre-commit) + @echo "🛠️ Setting up development environment..." + @echo "" + @echo "📦 Installing pre-commit..." + @pip install pre-commit 2>/dev/null || pip3 install pre-commit + @echo "" + @echo "🔧 Installing git hooks..." + @pre-commit install --hook-type pre-push + @pre-commit install --hook-type pre-commit + @echo "" + @echo "✅ Development environment setup complete!" 
+ @echo "" + @echo "💡 Hooks installed:" + @echo " • Robot Framework tests run before push" + @echo " • Black/isort format Python code on commit" + @echo " • Code quality checks on commit" + @echo "" + @echo "⚙️ To skip hooks: git push --no-verify / git commit --no-verify" + # ======================================== # KUBERNETES SETUP # ======================================== diff --git a/backends/advanced/.dockerignore b/backends/advanced/.dockerignore index 38c6284e..2dd9b44f 100644 --- a/backends/advanced/.dockerignore +++ b/backends/advanced/.dockerignore @@ -16,5 +16,6 @@ !nginx.conf !nginx.conf.template !start.sh +!start-k8s.sh !start-workers.sh !Caddyfile \ No newline at end of file diff --git a/backends/advanced/docker-compose-ci.yml b/backends/advanced/docker-compose-ci.yml new file mode 100644 index 00000000..79d7a2b0 --- /dev/null +++ b/backends/advanced/docker-compose-ci.yml @@ -0,0 +1,192 @@ +# docker-compose-ci.yml +# CI/CD environment for GitHub Actions +# Uses built image without source code mounts to ensure memory_config.yaml is included + +services: + friend-backend-test: + build: + context: . 
+ dockerfile: Dockerfile + ports: + - "8001:8000" # Avoid conflict with dev on 8000 + volumes: + # No src mount for CI - use built image with all files included + - ./data/test_audio_chunks:/app/audio_chunks + - ./data/test_debug_dir:/app/debug_dir + - ./data/test_data:/app/data + environment: + # Override with test-specific settings + - MONGODB_URI=mongodb://mongo-test:27017/test_db + - QDRANT_BASE_URL=qdrant-test + - QDRANT_PORT=6333 + - REDIS_URL=redis://redis-test:6379/0 + - DEBUG_DIR=/app/debug_dir + # Import API keys from environment + - DEEPGRAM_API_KEY=${DEEPGRAM_API_KEY} + - OPENAI_API_KEY=${OPENAI_API_KEY} + # LLM provider configuration (required for memory service) + - LLM_PROVIDER=${LLM_PROVIDER:-openai} + - OPENAI_BASE_URL=${OPENAI_BASE_URL:-https://api.openai.com/v1} + - OPENAI_MODEL=${OPENAI_MODEL:-gpt-4o-mini} + # Authentication (test-specific) + - AUTH_SECRET_KEY=test-jwt-signing-key-for-integration-tests + - ADMIN_PASSWORD=test-admin-password-123 + - ADMIN_EMAIL=test-admin@example.com + # Transcription provider configuration + - TRANSCRIPTION_PROVIDER=${TRANSCRIPTION_PROVIDER:-deepgram} + # - PARAKEET_ASR_URL=${PARAKEET_ASR_URL} + # Memory provider configuration + - MEMORY_PROVIDER=${MEMORY_PROVIDER:-friend_lite} + - OPENMEMORY_MCP_URL=${OPENMEMORY_MCP_URL:-http://host.docker.internal:8765} + - OPENMEMORY_USER_ID=${OPENMEMORY_USER_ID:-openmemory} + # Disable speaker recognition in test environment to prevent segment duplication + - DISABLE_SPEAKER_RECOGNITION=false + - SPEAKER_SERVICE_URL=https://localhost:8085 + - CORS_ORIGINS=http://localhost:3001,http://localhost:8001,https://localhost:3001,https://localhost:8001 + # Set low inactivity timeout for tests (2 seconds instead of 60) + - SPEECH_INACTIVITY_THRESHOLD_SECONDS=2 + # Wait for audio queue to drain before timing out (test mode) + - WAIT_FOR_AUDIO_QUEUE_DRAIN=true + depends_on: + qdrant-test: + condition: service_started + mongo-test: + condition: service_healthy + redis-test: + condition: 
service_started + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/readiness"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 30s + restart: unless-stopped + + webui-test: + build: + context: ./webui + dockerfile: Dockerfile + args: + - VITE_BACKEND_URL=http://localhost:8001 + - BACKEND_URL=http://localhost:8001 + volumes: + - ./webui/src:/app/src # Mount source code for easier development + ports: + - "3001:80" # Avoid conflict with dev on 3000 + depends_on: + friend-backend-test: + condition: service_healthy + mongo-test: + condition: service_healthy + qdrant-test: + condition: service_started + redis-test: + condition: service_started + + qdrant-test: + image: qdrant/qdrant:latest + ports: + - "6337:6333" # gRPC - avoid conflict with dev 6333 + - "6338:6334" # HTTP - avoid conflict with dev 6334 + volumes: + - ./data/test_qdrant_data:/qdrant/storage + + mongo-test: + image: mongo:8.0.14 + ports: + - "27018:27017" # Avoid conflict with dev on 27017 + volumes: + - ./data/test_mongo_data:/data/db + # Use test database name to ensure isolation + command: mongod --dbpath /data/db --bind_ip_all + healthcheck: + test: ["CMD", "mongosh", "--eval", "db.runCommand('ping').ok", "--quiet"] + interval: 5s + timeout: 5s + retries: 10 + start_period: 10s + + redis-test: + image: redis:7-alpine + ports: + - "6380:6379" # Avoid conflict with dev on 6379 + volumes: + - ./data/test_redis_data:/data + command: redis-server --appendonly yes + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 3s + retries: 5 + + workers-test: + build: + context: . 
+ dockerfile: Dockerfile + command: ./start-workers.sh + volumes: + # No src mount for CI - use built image + - ./data/test_audio_chunks:/app/audio_chunks + - ./data/test_debug_dir:/app/debug_dir + - ./data/test_data:/app/data + environment: + # Same environment as backend + - MONGODB_URI=mongodb://mongo-test:27017/test_db + - QDRANT_BASE_URL=qdrant-test + - QDRANT_PORT=6333 + - REDIS_URL=redis://redis-test:6379/0 + - DEBUG_DIR=/app/debug_dir + - DEEPGRAM_API_KEY=${DEEPGRAM_API_KEY} + - OPENAI_API_KEY=${OPENAI_API_KEY} + - LLM_PROVIDER=${LLM_PROVIDER:-openai} + - OPENAI_BASE_URL=${OPENAI_BASE_URL:-https://api.openai.com/v1} + - OPENAI_MODEL=${OPENAI_MODEL:-gpt-4o-mini} + - AUTH_SECRET_KEY=test-jwt-signing-key-for-integration-tests + - ADMIN_PASSWORD=test-admin-password-123 + - ADMIN_EMAIL=test-admin@example.com + - TRANSCRIPTION_PROVIDER=${TRANSCRIPTION_PROVIDER:-deepgram} + - MEMORY_PROVIDER=${MEMORY_PROVIDER:-friend_lite} + - OPENMEMORY_MCP_URL=${OPENMEMORY_MCP_URL:-http://host.docker.internal:8765} + - OPENMEMORY_USER_ID=${OPENMEMORY_USER_ID:-openmemory} + - DISABLE_SPEAKER_RECOGNITION=false + - SPEAKER_SERVICE_URL=https://localhost:8085 + # Set low inactivity timeout for tests (2 seconds instead of 60) + - SPEECH_INACTIVITY_THRESHOLD_SECONDS=2 + # Wait for audio queue to drain before timing out (test mode) + - WAIT_FOR_AUDIO_QUEUE_DRAIN=true + depends_on: + friend-backend-test: + condition: service_healthy + mongo-test: + condition: service_healthy + redis-test: + condition: service_started + qdrant-test: + condition: service_started + restart: unless-stopped + + # caddy: + # image: caddy:2-alpine + # ports: + # - "443:443" + # - "80:80" # HTTP redirect to HTTPS + # volumes: + # - ./Caddyfile-test:/etc/caddy/Caddyfile:ro + # - ./data/caddy_data:/data + # - ./data/caddy_config:/config + # depends_on: + # webui-test: + # condition: service_started + # friend-backend-test: + # condition: service_healthy + # restart: unless-stopped + +# CI Considerations (for 
future implementation): +# - GitHub Actions can run these services in isolated containers +# - Port conflicts won't exist in CI since each job runs in isolation +# - For CI, we could add: +# - --build flag for fresh builds +# - --force-recreate for clean state +# - Volume cleanup between test runs +# - Environment variables can be injected via GitHub secrets +# - Health checks ensure services are ready before tests run \ No newline at end of file diff --git a/backends/advanced/docker-compose-test.yml b/backends/advanced/docker-compose-test.yml index 7f2bb942..029d0238 100644 --- a/backends/advanced/docker-compose-test.yml +++ b/backends/advanced/docker-compose-test.yml @@ -42,6 +42,10 @@ services: - DISABLE_SPEAKER_RECOGNITION=false - SPEAKER_SERVICE_URL=https://localhost:8085 - CORS_ORIGINS=http://localhost:3001,http://localhost:8001,https://localhost:3001,https://localhost:8001 + # Set low inactivity timeout for tests (2 seconds instead of 60) + - SPEECH_INACTIVITY_THRESHOLD_SECONDS=2 + # Wait for audio queue to drain before timing out (test mode) + - WAIT_FOR_AUDIO_QUEUE_DRAIN=true depends_on: qdrant-test: condition: service_started @@ -144,6 +148,10 @@ services: - OPENMEMORY_USER_ID=${OPENMEMORY_USER_ID:-openmemory} - DISABLE_SPEAKER_RECOGNITION=false - SPEAKER_SERVICE_URL=https://localhost:8085 + # Set low inactivity timeout for tests (2 seconds instead of 60) + - SPEECH_INACTIVITY_THRESHOLD_SECONDS=2 + # Wait for audio queue to drain before timing out (test mode) + - WAIT_FOR_AUDIO_QUEUE_DRAIN=true depends_on: friend-backend-test: condition: service_healthy diff --git a/backends/advanced/pyproject.toml b/backends/advanced/pyproject.toml index 5f635cbb..5af2ec2e 100644 --- a/backends/advanced/pyproject.toml +++ b/backends/advanced/pyproject.toml @@ -7,6 +7,7 @@ requires-python = ">=3.12" dependencies = [ "easy-audio-interfaces>=0.7.1", # we need to add local-audio for scripts/local-audio.py | If we don't need that, we can remove this, and then remove 
portaudio19-dev from Dockerfile "fastapi>=0.115.12", + "fastmcp>=0.5.0", # MCP server for conversation access "mem0ai", # Using main branch with PR #3250 AsyncMemory fix "langchain_neo4j", "motor>=3.7.1", @@ -24,6 +25,7 @@ dependencies = [ "redis>=5.0.0", "rq>=1.16.0", "soundfile>=0.12.1", + "websockets>=12.0", ] [project.optional-dependencies] diff --git a/backends/advanced/src/advanced_omi_backend/app_factory.py b/backends/advanced/src/advanced_omi_backend/app_factory.py index 4d879301..52a48093 100644 --- a/backends/advanced/src/advanced_omi_backend/app_factory.py +++ b/backends/advanced/src/advanced_omi_backend/app_factory.py @@ -73,10 +73,6 @@ async def lifespan(app: FastAPI): application_logger.error(f"Failed to create admin user: {e}") # Don't raise here as this is not critical for startup - # Initialize task manager - task_manager = init_task_manager() - await task_manager.start() - application_logger.info("Task manager started") # Initialize Redis connection for RQ try: @@ -156,11 +152,6 @@ async def lifespan(app: FastAPI): except Exception as e: application_logger.error(f"Error closing Redis audio streaming client: {e}") - # Shutdown task manager - task_manager = get_task_manager() - await task_manager.shutdown() - application_logger.info("Task manager shut down") - # Stop metrics collection and save final report application_logger.info("Metrics collection stopped") diff --git a/backends/advanced/src/advanced_omi_backend/auth.py b/backends/advanced/src/advanced_omi_backend/auth.py index e47a3b9e..a39637f1 100644 --- a/backends/advanced/src/advanced_omi_backend/auth.py +++ b/backends/advanced/src/advanced_omi_backend/auth.py @@ -119,9 +119,38 @@ def get_jwt_strategy() -> JWTStrategy: # User dependencies for protecting endpoints current_active_user = fastapi_users.current_user(active=True) +current_active_user_optional = fastapi_users.current_user(active=True, optional=True) current_superuser = fastapi_users.current_user(active=True, superuser=True) +async 
def get_user_from_token_param(token: str) -> Optional[User]: + """ + Get user from JWT token string (for query parameter authentication). + + This is useful for endpoints that need to support token-based auth via query params, + such as HTML audio elements that can't set custom headers. + + Args: + token: JWT token string + + Returns: + User object if token is valid and user is active, None otherwise + """ + if not token: + return None + try: + strategy = get_jwt_strategy() + user_db_gen = get_user_db() + user_db = await user_db_gen.__anext__() + user_manager = UserManager(user_db) + user = await strategy.read_token(token, user_manager) + if user and user.is_active: + return user + except Exception: + pass + return None + + def get_accessible_user_ids(user: User) -> list[str] | None: """ Get list of user IDs that the current user can access data for. diff --git a/backends/advanced/src/advanced_omi_backend/client.py b/backends/advanced/src/advanced_omi_backend/client.py index 0cf6a1e2..be92716e 100644 --- a/backends/advanced/src/advanced_omi_backend/client.py +++ b/backends/advanced/src/advanced_omi_backend/client.py @@ -169,10 +169,6 @@ async def disconnect(self): # Close current conversation await self.close_current_conversation() - # Cancel any tasks for this client - task_manager = get_task_manager() - await task_manager.cancel_tasks_for_client(self.client_id) - # Clean up state self.speech_segments.clear() self.current_speech_start.clear() diff --git a/backends/advanced/src/advanced_omi_backend/clients/__init__.py b/backends/advanced/src/advanced_omi_backend/clients/__init__.py new file mode 100644 index 00000000..099f3c45 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/clients/__init__.py @@ -0,0 +1,11 @@ +"""Client implementations for Friend-Lite backend. 
+ +This module provides reusable client implementations that can be used for: +- Integration testing +- CLI tools +- External integrations +""" + +from advanced_omi_backend.clients.audio_stream_client import AudioStreamClient + +__all__ = ["AudioStreamClient"] diff --git a/backends/advanced/src/advanced_omi_backend/clients/audio_stream_client.py b/backends/advanced/src/advanced_omi_backend/clients/audio_stream_client.py new file mode 100644 index 00000000..af89fd51 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/clients/audio_stream_client.py @@ -0,0 +1,556 @@ +"""WebSocket client for audio streaming using Wyoming protocol. + +This client mirrors the protocol implementation in websocket_controller.py +and can be used for integration testing and external integrations. + +Protocol flow: +1. Connect to WebSocket with token and device_name +2. Receive "ready" message from server (PCM endpoint only) +3. Send "audio-start" with format and mode +4. Send audio chunks (Wyoming protocol or raw binary) +5. Send "audio-stop" to finalize session + +Example usage (blocking): + ```python + import asyncio + from advanced_omi_backend.clients import AudioStreamClient + + async def main(): + client = AudioStreamClient("http://localhost:8000", "your-jwt-token") + await client.connect() + await client.stream_wav_file("/path/to/audio.wav") + await client.close() + + asyncio.run(main()) + ``` + +Example usage (non-blocking for testing): + ```python + from advanced_omi_backend.clients.audio_stream_client import StreamManager + + manager = StreamManager() + stream_id = manager.start_stream("http://localhost:8000", "token", "device") + manager.send_chunks_from_file(stream_id, "/path/to/audio.wav", num_chunks=10) + # ... do other things while stream is open ... 
+ manager.stop_stream(stream_id) + ``` +""" + +import asyncio +import json +import logging +import threading +import uuid +import wave +from pathlib import Path +from typing import Dict, Optional, Union + +import websockets +from websockets.client import WebSocketClientProtocol + +from advanced_omi_backend.constants import OMI_CHANNELS, OMI_SAMPLE_RATE, OMI_SAMPLE_WIDTH + +logger = logging.getLogger(__name__) + + +class AudioStreamClient: + """WebSocket client for streaming audio using Wyoming protocol. + + This client implements the same protocol as the server expects in + websocket_controller.py, ensuring consistency between client and server. + """ + + def __init__( + self, + base_url: str, + token: str, + device_name: str = "python-client", + endpoint: str = "ws_pcm", + ): + """Initialize the audio stream client. + + Args: + base_url: Base URL of the backend (e.g., "http://localhost:8000") + token: JWT authentication token + device_name: Device name for client identification + endpoint: WebSocket endpoint ("ws_pcm" or "ws_omi") + """ + self.base_url = base_url + self.token = token + self.device_name = device_name + self.endpoint = endpoint + self.ws: Optional[WebSocketClientProtocol] = None + self.chunk_count = 0 + self.total_bytes = 0 + + @property + def ws_url(self) -> str: + """Build WebSocket URL from base URL.""" + url = self.base_url.replace("http://", "ws://").replace("https://", "wss://") + return f"{url}/{self.endpoint}?token={self.token}&device_name={self.device_name}" + + async def connect(self, wait_for_ready: bool = True) -> WebSocketClientProtocol: + """Connect to the WebSocket endpoint. 
+ + Args: + wait_for_ready: If True, wait for "ready" message from server (PCM endpoint) + + Returns: + The WebSocket connection + + Raises: + RuntimeError: If connection fails or ready message not received + """ + logger.info(f"Connecting to {self.ws_url}") + self.ws = await websockets.connect(self.ws_url) + logger.info("WebSocket connected") + + if wait_for_ready and self.endpoint == "ws_pcm": + # PCM endpoint sends "ready" message after auth (line 261-268 in websocket_controller.py) + ready_msg = await self.ws.recv() + ready = json.loads(ready_msg.strip() if isinstance(ready_msg, str) else ready_msg.decode().strip()) + if ready.get("type") != "ready": + raise RuntimeError(f"Expected 'ready' message, got: {ready}") + logger.info("Received ready message from server") + + return self.ws + + async def send_audio_start( + self, + recording_mode: str = "streaming", + sample_rate: int = OMI_SAMPLE_RATE, + sample_width: int = OMI_SAMPLE_WIDTH, + channels: int = OMI_CHANNELS, + ) -> None: + """Send Wyoming audio-start event. + + Args: + recording_mode: "streaming" or "batch" + sample_rate: Audio sample rate in Hz (default: 16000) + sample_width: Bytes per sample (default: 2 for 16-bit) + channels: Number of audio channels (default: 1) + + Note: + The mode is inside the "data" dict, matching _handle_audio_session_start + in websocket_controller.py (line 618). + """ + if not self.ws: + raise RuntimeError("Not connected. 
Call connect() first.") + + header = { + "type": "audio-start", + "data": { + "rate": sample_rate, + "width": sample_width, + "channels": channels, + "mode": recording_mode, + }, + "payload_length": None, + } + await self.ws.send(json.dumps(header) + "\n") + logger.info(f"Sent audio-start with mode={recording_mode}") + + async def send_audio_chunk_wyoming( + self, + audio_data: bytes, + sample_rate: int = OMI_SAMPLE_RATE, + sample_width: int = OMI_SAMPLE_WIDTH, + channels: int = OMI_CHANNELS, + ) -> None: + """Send audio chunk using Wyoming protocol (JSON header + binary payload). + + This matches the handler at lines 979-1007 in websocket_controller.py. + + Args: + audio_data: Raw PCM audio bytes + sample_rate: Audio sample rate in Hz + sample_width: Bytes per sample + channels: Number of audio channels + """ + if not self.ws: + raise RuntimeError("Not connected. Call connect() first.") + + header = { + "type": "audio-chunk", + "payload_length": len(audio_data), + "data": { + "rate": sample_rate, + "width": sample_width, + "channels": channels, + }, + } + # Send JSON header followed by binary payload + await self.ws.send(json.dumps(header) + "\n") + await self.ws.send(audio_data) + + self.chunk_count += 1 + self.total_bytes += len(audio_data) + + if self.chunk_count <= 3 or self.chunk_count % 100 == 0: + logger.debug(f"Sent audio chunk #{self.chunk_count}: {len(audio_data)} bytes") + + async def send_audio_chunk_raw(self, audio_data: bytes) -> None: + """Send raw binary audio without Wyoming header (legacy mode). + + This matches the handler at lines 1016-1035 in websocket_controller.py. + + Args: + audio_data: Raw PCM audio bytes + """ + if not self.ws: + raise RuntimeError("Not connected. 
Call connect() first.") + + await self.ws.send(audio_data) + + self.chunk_count += 1 + self.total_bytes += len(audio_data) + + async def send_audio_stop(self) -> None: + """Send Wyoming audio-stop event to finalize the session.""" + if not self.ws: + raise RuntimeError("Not connected. Call connect() first.") + + header = {"type": "audio-stop"} + await self.ws.send(json.dumps(header) + "\n") + logger.info(f"Sent audio-stop (total: {self.chunk_count} chunks, {self.total_bytes} bytes)") + + async def send_ping(self) -> None: + """Send keepalive ping.""" + if not self.ws: + raise RuntimeError("Not connected. Call connect() first.") + + header = {"type": "ping", "payload_length": None} + await self.ws.send(json.dumps(header) + "\n") + logger.debug("Sent ping") + + async def stream_wav_file( + self, + wav_path: Union[str, Path], + chunk_duration_ms: int = 100, + use_wyoming: bool = True, + recording_mode: str = "streaming", + realtime_factor: float = 0.1, + ) -> int: + """Stream a WAV file in chunks, simulating real-time audio. 
+ + Args: + wav_path: Path to the WAV file + chunk_duration_ms: Duration of each chunk in milliseconds + use_wyoming: If True, use Wyoming protocol; if False, send raw binary + recording_mode: "streaming" or "batch" + realtime_factor: Fraction of real-time to simulate (0.1 = 10x speed) + + Returns: + Number of chunks sent + """ + wav_path = Path(wav_path) + if not wav_path.exists(): + raise FileNotFoundError(f"WAV file not found: {wav_path}") + + with wave.open(str(wav_path), "rb") as wav: + sample_rate = wav.getframerate() + channels = wav.getnchannels() + sample_width = wav.getsampwidth() + + logger.info( + f"Streaming {wav_path.name}: {sample_rate}Hz, {channels}ch, {sample_width * 8}-bit" + ) + + # Calculate chunk size + bytes_per_sample = sample_width * channels + samples_per_chunk = int(sample_rate * chunk_duration_ms / 1000) + + # Send audio-start + await self.send_audio_start( + recording_mode=recording_mode, + sample_rate=sample_rate, + sample_width=sample_width, + channels=channels, + ) + + # Reset counters + self.chunk_count = 0 + self.total_bytes = 0 + + # Stream chunks + while True: + chunk = wav.readframes(samples_per_chunk) + if not chunk: + break + + if use_wyoming: + await self.send_audio_chunk_wyoming( + chunk, + sample_rate=sample_rate, + sample_width=sample_width, + channels=channels, + ) + else: + await self.send_audio_chunk_raw(chunk) + + # Simulate real-time delay + if realtime_factor > 0: + await asyncio.sleep(chunk_duration_ms / 1000 * realtime_factor) + + # Send audio-stop + await self.send_audio_stop() + + logger.info(f"Finished streaming: {self.chunk_count} chunks, {self.total_bytes} bytes") + return self.chunk_count + + async def close(self) -> None: + """Close the WebSocket connection.""" + if self.ws: + await self.ws.close() + self.ws = None + logger.info("WebSocket connection closed") + + async def __aenter__(self) -> "AudioStreamClient": + """Async context manager entry.""" + await self.connect() + return self + + async def 
__aexit__(self, exc_type, exc_val, exc_tb) -> None: + """Async context manager exit.""" + await self.close() + + +# Synchronous wrapper for Robot Framework and other sync contexts +def stream_audio_file( + base_url: str, + token: str, + wav_path: str, + device_name: str = "robot-test", + recording_mode: str = "streaming", + use_wyoming: bool = True, +) -> int: + """Synchronous wrapper for streaming audio file. + + This function is designed for use with Robot Framework or other + synchronous test frameworks. + + Args: + base_url: Base URL of the backend + token: JWT authentication token + wav_path: Path to WAV file + device_name: Device name for client identification + recording_mode: "streaming" or "batch" + use_wyoming: If True, use Wyoming protocol + + Returns: + Number of chunks sent + """ + + async def _run() -> int: + async with AudioStreamClient(base_url, token, device_name) as client: + return await client.stream_wav_file( + wav_path, + use_wyoming=use_wyoming, + recording_mode=recording_mode, + ) + + return asyncio.run(_run()) + + +class StreamSession: + """Holds state for an active streaming session.""" + + def __init__( + self, + stream_id: str, + client: AudioStreamClient, + loop: asyncio.AbstractEventLoop, + thread: threading.Thread, + ): + self.stream_id = stream_id + self.client = client + self.loop = loop + self.thread = thread + self.connected = False + self.audio_started = False + self.chunk_count = 0 + self.error: Optional[str] = None + + +class StreamManager: + """Manages multiple non-blocking audio streams for testing. + + This allows tests to start a stream, perform checks while streaming, + and then stop the stream - mimicking real client behavior. + + Example: + manager = StreamManager() + stream_id = manager.start_stream(base_url, token, "test-device") + manager.send_chunks_from_file(stream_id, "audio.wav", num_chunks=10) + # ... check jobs, verify state ... 
+ manager.stop_stream(stream_id) + """ + + def __init__(self): + self._sessions: Dict[str, StreamSession] = {} + + def start_stream( + self, + base_url: str, + token: str, + device_name: str = "robot-test", + recording_mode: str = "streaming", + ) -> str: + """Start a new audio stream (non-blocking). + + Args: + base_url: Backend URL + token: JWT token + device_name: Device name for client ID + recording_mode: "streaming" or "batch" + + Returns: + stream_id: Unique ID for this stream session + """ + stream_id = str(uuid.uuid4())[:8] + + # Create event loop for this stream's thread + loop = asyncio.new_event_loop() + + def run_loop(): + asyncio.set_event_loop(loop) + loop.run_forever() + + thread = threading.Thread(target=run_loop, daemon=True) + thread.start() + + # Create client + client = AudioStreamClient(base_url, token, device_name) + + session = StreamSession(stream_id, client, loop, thread) + self._sessions[stream_id] = session + + # Connect and send audio-start + async def _connect_and_start(): + try: + await client.connect() + session.connected = True + await client.send_audio_start(recording_mode=recording_mode) + session.audio_started = True + logger.info(f"Stream {stream_id} started for {device_name}") + except Exception as e: + session.error = str(e) + logger.error(f"Stream {stream_id} failed to start: {e}") + + future = asyncio.run_coroutine_threadsafe(_connect_and_start(), loop) + future.result(timeout=10) # Wait for connection + + if session.error: + raise RuntimeError(f"Failed to start stream: {session.error}") + + return stream_id + + def send_chunks_from_file( + self, + stream_id: str, + wav_path: str, + num_chunks: Optional[int] = None, + chunk_duration_ms: int = 100, + realtime_pacing: bool = False, + ) -> int: + """Send audio chunks from a WAV file. 
+ + Args: + stream_id: Stream session ID + wav_path: Path to WAV file + num_chunks: Number of chunks to send (None = all) + chunk_duration_ms: Duration per chunk in ms + realtime_pacing: If True, sleep between chunks to simulate real-time streaming + + Returns: + Number of chunks sent + """ + session = self._sessions.get(stream_id) + if not session: + raise ValueError(f"Unknown stream_id: {stream_id}") + + if not session.audio_started: + raise RuntimeError("Stream not started") + + wav_path = Path(wav_path) + if not wav_path.exists(): + raise FileNotFoundError(f"WAV file not found: {wav_path}") + + async def _send_chunks() -> int: + with wave.open(str(wav_path), "rb") as wav: + sample_rate = wav.getframerate() + channels = wav.getnchannels() + sample_width = wav.getsampwidth() + + samples_per_chunk = int(sample_rate * chunk_duration_ms / 1000) + chunks_sent = 0 + chunk_duration_seconds = chunk_duration_ms / 1000.0 + + while True: + if num_chunks is not None and chunks_sent >= num_chunks: + break + + chunk = wav.readframes(samples_per_chunk) + if not chunk: + break + + await session.client.send_audio_chunk_wyoming( + chunk, + sample_rate=sample_rate, + sample_width=sample_width, + channels=channels, + ) + chunks_sent += 1 + session.chunk_count += 1 + + # Optional: Sleep to maintain real-time pacing + if realtime_pacing: + await asyncio.sleep(chunk_duration_seconds) + + return chunks_sent + + future = asyncio.run_coroutine_threadsafe(_send_chunks(), session.loop) + return future.result(timeout=60) + + def stop_stream(self, stream_id: str) -> int: + """Stop a stream and close the connection. 
+ + Args: + stream_id: Stream session ID + + Returns: + Total chunks sent during this session + """ + session = self._sessions.get(stream_id) + if not session: + raise ValueError(f"Unknown stream_id: {stream_id}") + + async def _stop(): + if session.audio_started: + await session.client.send_audio_stop() + await session.client.close() + + future = asyncio.run_coroutine_threadsafe(_stop(), session.loop) + future.result(timeout=10) + + # Stop the event loop + session.loop.call_soon_threadsafe(session.loop.stop) + session.thread.join(timeout=5) + + total_chunks = session.chunk_count + del self._sessions[stream_id] + + logger.info(f"Stream {stream_id} stopped, sent {total_chunks} chunks") + return total_chunks + + def get_session(self, stream_id: str) -> Optional[StreamSession]: + """Get session info for a stream.""" + return self._sessions.get(stream_id) + + def cleanup_all(self): + """Stop all active streams.""" + for stream_id in list(self._sessions.keys()): + try: + self.stop_stream(stream_id) + except Exception as e: + logger.warning(f"Error stopping stream {stream_id}: {e}") diff --git a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py index 4ca72ca0..da884eb6 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py @@ -39,6 +39,7 @@ async def upload_and_process_audio_files( files: list[UploadFile], device_name: str = "upload", auto_generate_client: bool = True, + folder: str = None, ) -> dict: """ Upload audio files and process them directly. @@ -47,6 +48,13 @@ async def upload_and_process_audio_files( 1. Validate and read WAV file 2. Write audio file and create AudioSession immediately 3. 
Enqueue transcription job (same as WebSocket path) + + Args: + user: Authenticated user + files: List of uploaded audio files + device_name: Device identifier + auto_generate_client: Whether to auto-generate client ID + folder: Optional subfolder for audio storage (e.g., 'fixtures') """ try: if not files: @@ -77,15 +85,24 @@ async def upload_and_process_audio_files( audio_uuid = str(uuid.uuid4()) timestamp = int(time.time() * 1000) + # Determine output directory (with optional subfolder) + from advanced_omi_backend.config import CHUNK_DIR + if folder: + chunk_dir = CHUNK_DIR / folder + chunk_dir.mkdir(parents=True, exist_ok=True) + else: + chunk_dir = CHUNK_DIR + # Validate, write audio file and create AudioSession (all in one) try: - wav_filename, file_path, duration = await write_audio_file( + relative_audio_path, file_path, duration = await write_audio_file( raw_audio_data=content, audio_uuid=audio_uuid, client_id=client_id, user_id=user.user_id, user_email=user.email, timestamp=timestamp, + chunk_dir=chunk_dir, validate=True # Validate WAV format, convert stereo→mono ) except AudioValidationError as e: @@ -97,7 +114,7 @@ async def upload_and_process_audio_files( continue audio_logger.info( - f"📊 {file.filename}: {duration:.1f}s → {wav_filename}" + f"📊 {file.filename}: {duration:.1f}s → {relative_audio_path}" ) # Create conversation immediately for uploaded files (conversation_id auto-generated) @@ -113,8 +130,8 @@ async def upload_and_process_audio_files( title=title, summary="Processing uploaded audio file..." 
) - # Set audio_path so the frontend can play the audio - conversation.audio_path = wav_filename + # Use the relative path returned by write_audio_file (already includes folder prefix if applicable) + conversation.audio_path = relative_audio_path await conversation.insert() conversation_id = conversation.conversation_id # Get the auto-generated ID @@ -128,7 +145,8 @@ async def upload_and_process_audio_files( audio_uuid=audio_uuid, audio_file_path=file_path, user_id=user.user_id, - post_transcription=True # Run batch transcription for uploads + post_transcription=True, # Run batch transcription for uploads + client_id=client_id # Pass client_id for UI tracking ) processed_files.append({ @@ -192,6 +210,50 @@ async def upload_and_process_audio_files( ) +async def get_conversation_audio_path(conversation_id: str, user: User, cropped: bool = False) -> Path: + """ + Get the file path for a conversation's audio file. + + Args: + conversation_id: The conversation ID + user: The authenticated user + cropped: If True, return cropped audio path; if False, return original audio path + + Returns: + Path object for the audio file + + Raises: + ValueError: If conversation not found, access denied, or audio file not available + """ + # Get conversation by conversation_id (UUID field, not _id) + conversation = await Conversation.find_one(Conversation.conversation_id == conversation_id) + + if not conversation: + raise ValueError("Conversation not found") + + # Check ownership (admins can access all files) + if not user.is_superuser and conversation.user_id != str(user.user_id): + raise ValueError("Access denied") + + # Get the appropriate audio path + audio_path = conversation.cropped_audio_path if cropped else conversation.audio_path + + if not audio_path: + audio_type = "cropped" if cropped else "original" + raise ValueError(f"No {audio_type} audio file available for this conversation") + + # Build full file path + from advanced_omi_backend.app_config import get_audio_chunk_dir + 
audio_dir = get_audio_chunk_dir() + file_path = audio_dir / audio_path + + # Check if file exists + if not file_path.exists() or not file_path.is_file(): + raise ValueError("Audio file not found on disk") + + return file_path + + async def get_cropped_audio_info(audio_uuid: str, user: User): """ Get audio cropping metadata from the conversation. diff --git a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py index 09befebb..b9533391 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py @@ -96,28 +96,36 @@ async def get_conversation(conversation_id: str, user: User): if not user.is_superuser and conversation.user_id != str(user.user_id): return JSONResponse(status_code=403, content={"error": "Access forbidden"}) - # Format conversation for API response - clean, no duplication - formatted_conversation = conversation.model_dump( - mode='json', - exclude={'id'} # Exclude MongoDB internal _id - ) - - # Clean up transcript versions - remove heavy metadata.words - if 'transcript_versions' in formatted_conversation: - for version in formatted_conversation['transcript_versions']: - if 'metadata' in version and version['metadata']: - # Remove words array - not needed by frontend - version['metadata'].pop('words', None) - # Remove redundant speaker_recognition counts (derivable from segments) - if 'speaker_recognition' in version['metadata']: - sr = version['metadata']['speaker_recognition'] - sr.pop('total_segments', None) # Derivable from len(segments) - sr.pop('speaker_count', None) # Derivable from identified_speakers - - # Add minimal computed fields - formatted_conversation['has_memory'] = len(conversation.memory_versions) > 0 + # Build response with explicit curated fields + response = { + "conversation_id": 
conversation.conversation_id, + "audio_uuid": conversation.audio_uuid, + "user_id": conversation.user_id, + "client_id": conversation.client_id, + "audio_path": conversation.audio_path, + "cropped_audio_path": conversation.cropped_audio_path, + "created_at": conversation.created_at.isoformat() if conversation.created_at else None, + "deleted": conversation.deleted, + "deletion_reason": conversation.deletion_reason, + "deleted_at": conversation.deleted_at.isoformat() if conversation.deleted_at else None, + "end_reason": conversation.end_reason.value if conversation.end_reason else None, + "completed_at": conversation.completed_at.isoformat() if conversation.completed_at else None, + "title": conversation.title, + "summary": conversation.summary, + "detailed_summary": conversation.detailed_summary, + # Computed fields + "transcript": conversation.transcript, + "segments": [s.model_dump() for s in conversation.segments], + "segment_count": conversation.segment_count, + "memory_count": conversation.memory_count, + "has_memory": conversation.has_memory, + "active_transcript_version": conversation.active_transcript_version, + "active_memory_version": conversation.active_memory_version, + "transcript_version_count": conversation.transcript_version_count, + "memory_version_count": conversation.memory_version_count, + } - return {"conversation": formatted_conversation} + return {"conversation": response} except Exception as e: logger.error(f"Error fetching conversation {conversation_id}: {e}") @@ -137,30 +145,33 @@ async def get_conversations(user: User): # Admins see all conversations user_conversations = await Conversation.find_all().sort(-Conversation.created_at).to_list() - # Convert conversations to API format - minimal for list view + # Build response with explicit curated fields - minimal for list view conversations = [] for conv in user_conversations: - # Format conversation for list - exclude heavy version data - conv_dict = conv.model_dump( - mode='json', - 
exclude={'id', 'transcript_versions', 'memory_versions'} # Exclude large version arrays - ) - - # Add computed fields - # segment_count - count from active transcript version - segment_count = 0 - if conv.active_transcript: - segment_count = len(conv.active_transcript.segments) if conv.active_transcript.segments else 0 - - conv_dict.update({ - "segment_count": segment_count, - "has_memory": len(conv.memory_versions) > 0, - "transcript_version_count": len(conv.transcript_versions), - "memory_version_count": len(conv.memory_versions) + conversations.append({ + "conversation_id": conv.conversation_id, + "audio_uuid": conv.audio_uuid, + "user_id": conv.user_id, + "client_id": conv.client_id, + "audio_path": conv.audio_path, + "cropped_audio_path": conv.cropped_audio_path, + "created_at": conv.created_at.isoformat() if conv.created_at else None, + "deleted": conv.deleted, + "deletion_reason": conv.deletion_reason, + "deleted_at": conv.deleted_at.isoformat() if conv.deleted_at else None, + "title": conv.title, + "summary": conv.summary, + "detailed_summary": conv.detailed_summary, + "active_transcript_version": conv.active_transcript_version, + "active_memory_version": conv.active_memory_version, + # Computed fields (counts only, no heavy data) + "segment_count": conv.segment_count, + "has_memory": conv.has_memory, + "memory_count": conv.memory_count, + "transcript_version_count": conv.transcript_version_count, + "memory_version_count": conv.memory_version_count, }) - conversations.append(conv_dict) - return {"conversations": conversations} except Exception as e: @@ -324,7 +335,6 @@ async def reprocess_transcript(conversation_id: str, user: User): audio_uuid, str(full_audio_path), version_id, - str(user.user_id), "reprocess", job_timeout=600, result_ttl=JOB_RESULT_TTL, diff --git a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py index ced15fc7..91773756 100644 --- 
a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py @@ -93,14 +93,22 @@ def get_job_stats() -> Dict[str, Any]: } -def get_jobs(limit: int = 20, offset: int = 0, queue_name: str = None) -> Dict[str, Any]: +def get_jobs( + limit: int = 20, + offset: int = 0, + queue_name: str = None, + job_type: str = None, + client_id: str = None +) -> Dict[str, Any]: """ - Get jobs from a specific queue or all queues. + Get jobs from a specific queue or all queues with optional filtering. Args: limit: Maximum number of jobs to return offset: Number of jobs to skip queue_name: Specific queue name or None for all queues + job_type: Filter by job type (matches func_name, e.g., "speech_detection") + client_id: Filter by client_id in job meta (partial match) Returns: Dict with jobs list and pagination metadata matching frontend expectations @@ -130,11 +138,21 @@ def get_jobs(limit: int = 20, offset: int = 0, queue_name: str = None) -> Dict[s user_id = job.kwargs.get("user_id", "") if job.kwargs else "" # Extract just the function name (e.g., "listen_for_speech_job" from "module.listen_for_speech_job") - job_type = job.func_name.split('.')[-1] if job.func_name else "unknown" + func_name = job.func_name.split('.')[-1] if job.func_name else "unknown" + + # Apply job_type filter + if job_type and job_type not in func_name: + continue + + # Apply client_id filter (partial match in meta) + if client_id: + job_client_id = job.meta.get("client_id", "") if job.meta else "" + if client_id not in job_client_id: + continue all_jobs.append({ "job_id": job.id, - "job_type": job_type, + "job_type": func_name, "user_id": user_id, "status": status, "priority": "normal", # RQ doesn't track priority in metadata @@ -315,7 +333,8 @@ def start_post_conversation_jobs( user_id: str, post_transcription: bool = True, transcript_version_id: Optional[str] = None, - depends_on_job = None + depends_on_job = 
None, + client_id: Optional[str] = None ) -> Dict[str, str]: """ Start post-conversation processing jobs after conversation is created. @@ -348,6 +367,11 @@ def start_post_conversation_jobs( version_id = transcript_version_id or str(uuid.uuid4()) + # Build job metadata (include client_id if provided for UI tracking) + job_meta = {'audio_uuid': audio_uuid, 'conversation_id': conversation_id} + if client_id: + job_meta['client_id'] = client_id + # Step 1: Batch transcription job (ALWAYS run to get correct conversation-relative timestamps) # Even for streaming, we need batch transcription before cropping to fix cumulative timestamps transcribe_job_id = f"transcribe_{conversation_id[:12]}" @@ -365,7 +389,7 @@ def start_post_conversation_jobs( depends_on=depends_on_job, job_id=transcribe_job_id, description=f"Transcribe conversation {conversation_id[:8]}", - meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id} + meta=job_meta ) logger.info(f"📥 RQ: Enqueued transcription job {transcription_job.id}, meta={transcription_job.meta}") crop_depends_on = transcription_job @@ -383,7 +407,7 @@ def start_post_conversation_jobs( depends_on=crop_depends_on, job_id=crop_job_id, description=f"Crop audio for conversation {conversation_id[:8]}", - meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id} + meta=job_meta ) logger.info(f"📥 RQ: Enqueued cropping job {cropping_job.id}, meta={cropping_job.meta}") @@ -406,7 +430,7 @@ def start_post_conversation_jobs( depends_on=speaker_depends_on, job_id=speaker_job_id, description=f"Speaker recognition for conversation {conversation_id[:8]}", - meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id} + meta=job_meta ) logger.info(f"📥 RQ: Enqueued speaker recognition job {speaker_job.id}, meta={speaker_job.meta} (depends on {speaker_depends_on.id})") @@ -422,7 +446,7 @@ def start_post_conversation_jobs( depends_on=speaker_job, job_id=memory_job_id, description=f"Memory extraction for conversation 
{conversation_id[:8]}", - meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id} + meta=job_meta ) logger.info(f"📥 RQ: Enqueued memory extraction job {memory_job.id}, meta={memory_job.meta} (depends on {speaker_job.id})") @@ -439,7 +463,7 @@ def start_post_conversation_jobs( depends_on=speaker_job, # Depends on speaker job, NOT memory job job_id=title_job_id, description=f"Generate title and summary for conversation {conversation_id[:8]}", - meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id} + meta=job_meta ) logger.info(f"📥 RQ: Enqueued title/summary job {title_summary_job.id}, meta={title_summary_job.meta} (depends on {speaker_job.id})") diff --git a/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py index b75f22c8..5bc0b35d 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py @@ -55,47 +55,6 @@ async def get_auth_config(): } -async def get_processor_status(): - """Get RQ worker and queue status.""" - try: - # Get RQ queue health (new architecture) - from advanced_omi_backend.controllers.queue_controller import get_queue_health - queue_health = get_queue_health() - - status = { - "architecture": "rq_workers", # New RQ-based architecture - "timestamp": int(time.time()), - "workers": { - "total": queue_health.get("total_workers", 0), - "active": queue_health.get("active_workers", 0), - "idle": queue_health.get("idle_workers", 0), - "details": queue_health.get("workers", []) - }, - "queues": { - "transcription": queue_health.get("queues", {}).get("transcription", {}), - "memory": queue_health.get("queues", {}).get("memory", {}), - "default": queue_health.get("queues", {}).get("default", {}) - } - } - - # Get task manager status if available - try: - task_manager = get_task_manager() - if task_manager: - task_status = 
task_manager.get_health_status() - status["task_manager"] = task_status - except Exception as e: - status["task_manager"] = {"error": str(e)} - - return status - - except Exception as e: - logger.error(f"Error getting processor status: {e}", exc_info=True) - return JSONResponse( - status_code=500, content={"error": f"Failed to get processor status: {str(e)}"} - ) - - # Audio file processing functions moved to audio_controller.py diff --git a/backends/advanced/src/advanced_omi_backend/controllers/user_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/user_controller.py index 224695d0..ba7dd753 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/user_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/user_controller.py @@ -59,13 +59,13 @@ async def create_user(user_data: UserCreate): # Create the user through the user manager user = await user_manager.create(user_data) + # Return the full user object (serialized via UserRead schema) + from advanced_omi_backend.models.user import UserRead + user_read = UserRead.model_validate(user) + return JSONResponse( status_code=201, - content={ - "message": f"User {user.email} created successfully", - "user_id": str(user.id), - "user_email": user.email, - }, + content=user_read.model_dump(mode='json'), ) except Exception as e: @@ -81,7 +81,6 @@ async def create_user(user_data: UserCreate): async def update_user(user_id: str, user_data: UserUpdate): """Update an existing user.""" - print("DEBUG: New update_user function is being called!") try: # Validate ObjectId format try: @@ -109,16 +108,17 @@ async def update_user(user_id: str, user_data: UserUpdate): # Convert to User object for the manager user_obj = User(**existing_user) - # Update the user using the fastapi-users manager (now with fix for missing method) - updated_user = await user_manager.update(user_obj, user_data) + # Update the user using the fastapi-users manager + # Note: signature is update(user_update, 
user) - update data first, then user object + updated_user = await user_manager.update(user_data, user_obj) + + # Return the full user object (serialized via UserRead schema) + from advanced_omi_backend.models.user import UserRead + user_read = UserRead.model_validate(updated_user) return JSONResponse( status_code=200, - content={ - "message": f"User {updated_user.email} updated successfully", - "user_id": str(updated_user.id), - "user_email": updated_user.email, - }, + content=user_read.model_dump(mode='json'), ) except Exception as e: diff --git a/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py index 2448bfbf..a4338f2b 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py @@ -716,7 +716,7 @@ async def _process_batch_audio_complete( timestamp = int(time.time() * 1000) # Write audio file and create AudioFile entry - wav_filename, file_path, duration = await write_audio_file( + relative_audio_path, file_path, duration = await write_audio_file( raw_audio_data=complete_audio, audio_uuid=audio_uuid, client_id=client_id, @@ -727,7 +727,7 @@ async def _process_batch_audio_complete( ) application_logger.info( - f"✅ Batch mode: Wrote audio file {wav_filename} ({duration:.1f}s)" + f"✅ Batch mode: Wrote audio file {relative_audio_path} ({duration:.1f}s)" ) # Create conversation immediately for batch audio (conversation_id auto-generated) @@ -740,6 +740,7 @@ async def _process_batch_audio_complete( title="Batch Recording", summary="Processing batch audio..." 
) + conversation.audio_path = relative_audio_path await conversation.insert() conversation_id = conversation.conversation_id # Get the auto-generated ID @@ -753,7 +754,8 @@ async def _process_batch_audio_complete( audio_uuid=audio_uuid, audio_file_path=file_path, user_id=None, # Will be read from conversation in DB by jobs - post_transcription=True # Run batch transcription for uploads + post_transcription=True, # Run batch transcription for uploads + client_id=client_id # Pass client_id for UI tracking ) application_logger.info( diff --git a/backends/advanced/src/advanced_omi_backend/main.py b/backends/advanced/src/advanced_omi_backend/main.py index a9d9d47c..df51e1cc 100644 --- a/backends/advanced/src/advanced_omi_backend/main.py +++ b/backends/advanced/src/advanced_omi_backend/main.py @@ -44,6 +44,6 @@ host=host, port=port, reload=False, # Set to True for development - access_log=True, + access_log=False, # Disabled - using custom RequestLoggingMiddleware instead log_level="info" ) diff --git a/backends/advanced/src/advanced_omi_backend/middleware/app_middleware.py b/backends/advanced/src/advanced_omi_backend/middleware/app_middleware.py index f886d31f..be2f2705 100644 --- a/backends/advanced/src/advanced_omi_backend/middleware/app_middleware.py +++ b/backends/advanced/src/advanced_omi_backend/middleware/app_middleware.py @@ -4,17 +4,21 @@ Centralizes CORS configuration and global exception handlers. 
""" +import json import logging +import time from typing import Optional from fastapi import FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import JSONResponse +from fastapi.responses import JSONResponse, StreamingResponse from pymongo.errors import ConnectionFailure, PyMongoError +from starlette.middleware.base import BaseHTTPMiddleware from advanced_omi_backend.app_config import get_app_config logger = logging.getLogger(__name__) +request_logger = logging.getLogger("api.requests") def setup_cors_middleware(app: FastAPI) -> None: @@ -34,6 +38,147 @@ def setup_cors_middleware(app: FastAPI) -> None: ) +class RequestLoggingMiddleware(BaseHTTPMiddleware): + """ + Middleware to log API requests and JSON responses. + + Excludes: + - Authentication endpoints (login, logout) + - WebSocket connections + - Binary file responses (audio, images) + - Streaming responses + """ + + # Paths to exclude from logging + EXCLUDED_PATHS = { + "/auth/jwt/login", + "/auth/cookie/login", + "/auth/jwt/logout", + "/auth/cookie/logout", + "/ws", + "/ws_omi", + "/ws_pcm", + "/mcp", + "/health", + "/auth/health", + "/readiness", + } + + # Binary content types to exclude + BINARY_CONTENT_TYPES = { + "audio/", + "image/", + "video/", + "application/octet-stream", + } + + def should_log_request(self, path: str) -> bool: + """Determine if request should be logged.""" + # Exclude exact path matches + if path in self.EXCLUDED_PATHS: + return False + + # Exclude paths starting with excluded prefixes + for excluded in self.EXCLUDED_PATHS: + if path.startswith(excluded): + return False + + # Exclude audio file serving + if path.startswith("/audio/"): + return False + + return True + + def should_log_response_body(self, content_type: str) -> bool: + """Determine if response body should be logged.""" + if not content_type: + return True + + # Exclude binary content types + for binary_type in self.BINARY_CONTENT_TYPES: + if 
content_type.startswith(binary_type): + return False + + return True + + async def dispatch(self, request: Request, call_next): + """Process request and log request/response information.""" + path = request.url.path + + # Skip logging for excluded paths + if not self.should_log_request(path): + return await call_next(request) + + # Start timing + start_time = time.time() + + # Log request + request_logger.info(f"→ {request.method} {path}") + + # Process request + response = await call_next(request) + + # Calculate duration + duration_ms = (time.time() - start_time) * 1000 + + # Check if we should log response body + content_type = response.headers.get("content-type", "") + should_log_body = self.should_log_response_body(content_type) + + # Skip body logging for streaming responses + if isinstance(response, StreamingResponse): + request_logger.info( + f"← {request.method} {path} - {response.status_code} " + f"(streaming response) - {duration_ms:.2f}ms" + ) + return response + + # For non-streaming responses, try to extract and log JSON body + if should_log_body and response.status_code != 204: # No content + try: + # Read response body + response_body = b"" + async for chunk in response.body_iterator: + response_body += chunk + + # Try to parse as JSON for pretty printing + try: + json_body = json.loads(response_body) + formatted_json = json.dumps(json_body, indent=2) + request_logger.info( + f"← {request.method} {path} - {response.status_code} - {duration_ms:.2f}ms\n" + f"Response body:\n{formatted_json}" + ) + except (json.JSONDecodeError, UnicodeDecodeError): + # Not JSON or not UTF-8, just log the status + request_logger.info( + f"← {request.method} {path} - {response.status_code} - {duration_ms:.2f}ms " + f"(non-JSON response)" + ) + + # Recreate response with the body we consumed + from starlette.responses import Response + return Response( + content=response_body, + status_code=response.status_code, + headers=dict(response.headers), + 
media_type=response.media_type, + ) + except Exception as e: + # If anything goes wrong, just log basic info + request_logger.warning( + f"← {request.method} {path} - {response.status_code} - {duration_ms:.2f}ms " + f"(error reading response: {e})" + ) + return response + else: + # Just log status for responses without body + request_logger.info( + f"← {request.method} {path} - {response.status_code} - {duration_ms:.2f}ms" + ) + return response + + def setup_exception_handlers(app: FastAPI) -> None: """Configure global exception handlers for the FastAPI application.""" @@ -87,5 +232,9 @@ async def http_exception_handler(request: Request, exc: HTTPException): def setup_middleware(app: FastAPI) -> None: """Set up all middleware for the FastAPI application.""" + # Add request logging middleware + app.add_middleware(RequestLoggingMiddleware) + logger.info("📝 Request logging middleware enabled") + setup_cors_middleware(app) setup_exception_handlers(app) \ No newline at end of file diff --git a/backends/advanced/src/advanced_omi_backend/models/conversation.py b/backends/advanced/src/advanced_omi_backend/models/conversation.py index 56f55ede..7caf8a55 100644 --- a/backends/advanced/src/advanced_omi_backend/models/conversation.py +++ b/backends/advanced/src/advanced_omi_backend/models/conversation.py @@ -7,7 +7,7 @@ from datetime import datetime from typing import Dict, List, Optional, Any, Union -from pydantic import BaseModel, Field, model_validator +from pydantic import BaseModel, Field, model_validator, computed_field from enum import Enum import uuid @@ -37,6 +37,15 @@ class ConversationStatus(str, Enum): COMPLETED = "completed" # All jobs succeeded FAILED = "failed" # One or more jobs failed + class EndReason(str, Enum): + """Reason for conversation ending.""" + USER_STOPPED = "user_stopped" # User manually stopped recording + INACTIVITY_TIMEOUT = "inactivity_timeout" # No speech detected for threshold period + WEBSOCKET_DISCONNECT = "websocket_disconnect" # 
Connection lost (Bluetooth, network, etc.) + MAX_DURATION = "max_duration" # Hit maximum conversation duration + ERROR = "error" # Processing error forced conversation end + UNKNOWN = "unknown" # Unknown or legacy reason + # Nested Models class SpeakerSegment(BaseModel): """Individual speaker segment in a transcript.""" @@ -86,6 +95,10 @@ class MemoryVersion(BaseModel): deletion_reason: Optional[str] = Field(None, description="Reason for deletion (no_meaningful_speech, audio_file_not_ready, etc.)") deleted_at: Optional[datetime] = Field(None, description="When the conversation was marked as deleted") + # Conversation completion tracking + end_reason: Optional["Conversation.EndReason"] = Field(None, description="Reason why the conversation ended") + completed_at: Optional[datetime] = Field(None, description="When the conversation was completed/closed") + # Summary fields (auto-generated from transcript) title: Optional[str] = Field(None, description="Auto-generated conversation title") summary: Optional[str] = Field(None, description="Auto-generated short summary (1-2 sentences)") @@ -146,6 +159,7 @@ def clean_legacy_data(cls, data: Any) -> Any: return data + @computed_field @property def active_transcript(self) -> Optional["Conversation.TranscriptVersion"]: """Get the currently active transcript version.""" @@ -157,6 +171,7 @@ def active_transcript(self) -> Optional["Conversation.TranscriptVersion"]: return version return None + @computed_field @property def active_memory(self) -> Optional["Conversation.MemoryVersion"]: """Get the currently active memory version.""" @@ -169,16 +184,48 @@ def active_memory(self) -> Optional["Conversation.MemoryVersion"]: return None # Convenience properties that return data from active transcript version + @computed_field @property def transcript(self) -> Optional[str]: """Get transcript text from active transcript version.""" return self.active_transcript.transcript if self.active_transcript else None + @computed_field @property 
def segments(self) -> List["Conversation.SpeakerSegment"]: """Get segments from active transcript version.""" return self.active_transcript.segments if self.active_transcript else [] + @computed_field + @property + def segment_count(self) -> int: + """Get segment count from active transcript version.""" + return len(self.segments) if self.segments else 0 + + @computed_field + @property + def memory_count(self) -> int: + """Get memory count from active memory version.""" + return self.active_memory.memory_count if self.active_memory else 0 + + @computed_field + @property + def has_memory(self) -> bool: + """Check if conversation has any memory versions.""" + return len(self.memory_versions) > 0 + + @computed_field + @property + def transcript_version_count(self) -> int: + """Get count of transcript versions.""" + return len(self.transcript_versions) + + @computed_field + @property + def memory_version_count(self) -> int: + """Get count of memory versions.""" + return len(self.memory_versions) + def add_transcript_version( self, version_id: str, diff --git a/backends/advanced/src/advanced_omi_backend/models/user.py b/backends/advanced/src/advanced_omi_backend/models/user.py index a3779021..b0ced195 100644 --- a/backends/advanced/src/advanced_omi_backend/models/user.py +++ b/backends/advanced/src/advanced_omi_backend/models/user.py @@ -33,19 +33,16 @@ class UserUpdate(BaseUserUpdate): display_name: Optional[str] = None is_superuser: Optional[bool] = None + def create_update_dict(self): + """Create update dictionary for regular user operations.""" + update_dict = super().create_update_dict() + if self.display_name is not None: + update_dict["display_name"] = self.display_name + return update_dict + def create_update_dict_superuser(self): """Create update dictionary for superuser operations.""" - update_dict = {} - if self.email is not None: - update_dict["email"] = self.email - if self.password is not None: - update_dict["password"] = self.password - if self.is_active 
is not None: - update_dict["is_active"] = self.is_active - if self.is_verified is not None: - update_dict["is_verified"] = self.is_verified - if self.is_superuser is not None: - update_dict["is_superuser"] = self.is_superuser + update_dict = super().create_update_dict_superuser() if self.display_name is not None: update_dict["display_name"] = self.display_name return update_dict diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py index 4c0f756b..5fb24a1a 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py @@ -4,24 +4,86 @@ Handles audio file uploads, processing job management, and audio file serving. """ -from pathlib import Path +from typing import Optional from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile from fastapi.responses import FileResponse -from advanced_omi_backend.auth import current_superuser, current_active_user +from advanced_omi_backend.auth import current_superuser, current_active_user_optional, get_user_from_token_param from advanced_omi_backend.controllers import audio_controller from advanced_omi_backend.models.user import User -from advanced_omi_backend.app_config import get_audio_chunk_dir router = APIRouter(prefix="/audio", tags=["audio"]) +@router.get("/get_audio/{conversation_id}") +async def get_conversation_audio( + conversation_id: str, + cropped: bool = Query(default=False, description="Serve cropped (speech-only) audio instead of original"), + token: Optional[str] = Query(default=None, description="JWT token for audio element access"), + current_user: Optional[User] = Depends(current_active_user_optional), +): + """ + Serve audio file for a conversation. 
+ + This endpoint uses conversation_id for direct lookup and ownership verification, + which is more efficient than querying by filename. + + Supports both header-based auth (Authorization: Bearer) and query param token + for