Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
927 changes: 34 additions & 893 deletions CLAUDE.md

Large diffs are not rendered by default.

69 changes: 66 additions & 3 deletions backends/advanced/src/advanced_omi_backend/audio_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
import logging
import os
import time
import wave
import io
import numpy as np
from pathlib import Path

# Type import to avoid circular imports
from typing import TYPE_CHECKING, Optional
Expand Down Expand Up @@ -83,9 +87,68 @@ async def process_audio_chunk(

await processor_manager.queue_audio(processing_item)

# Update client state if provided
if client_state is not None:
client_state.update_audio_received(chunk)

async def load_audio_file_as_chunk(audio_path: Path) -> AudioChunk:
    """Load an existing audio file into Wyoming AudioChunk format for reprocessing.

    Reads a WAV file from disk, downmixes 16-bit stereo to mono if needed,
    validates that the result matches the pipeline's expected format
    (16kHz, mono, 16-bit PCM), and wraps the raw PCM in an AudioChunk.

    Args:
        audio_path: Path to the audio file on disk

    Returns:
        AudioChunk object ready for processing

    Raises:
        FileNotFoundError: If audio file doesn't exist
        ValueError: If audio file format is invalid or unsupported
    """
    try:
        # Read the audio file
        with open(audio_path, 'rb') as f:
            file_content = f.read()

        # Process WAV file using existing pattern from system_controller.py
        with wave.open(io.BytesIO(file_content), "rb") as wav_file:
            sample_rate = wav_file.getframerate()
            sample_width = wav_file.getsampwidth()
            channels = wav_file.getnchannels()
            audio_data = wav_file.readframes(wav_file.getnframes())

        # Convert to mono if stereo. Accumulate in int32 before averaging:
        # np.mean(..., dtype=np.int16) sums in int16 and can overflow,
        # corrupting the PCM samples. Clip back to the int16 range.
        if channels == 2:
            if sample_width == 2:
                audio_array = np.frombuffer(audio_data, dtype=np.int16).reshape(-1, 2)
                mono = audio_array.astype(np.int32).sum(axis=1) // 2
                audio_data = np.clip(mono, -32768, 32767).astype(np.int16).tobytes()
                channels = 1
            else:
                raise ValueError(f"Unsupported sample width for stereo: {sample_width}")

        # Validate format matches expected (16kHz, mono, 16-bit)
        if sample_rate != 16000:
            raise ValueError(f"Audio file has sample rate {sample_rate}Hz, expected 16kHz")
        if channels != 1:
            raise ValueError(f"Audio file has {channels} channels, expected mono")
        if sample_width != 2:
            raise ValueError(f"Audio file has {sample_width}-byte samples, expected 2 bytes")

        # Create AudioChunk stamped with the current wall-clock time (ms)
        chunk = AudioChunk(
            audio=audio_data,
            rate=sample_rate,
            width=sample_width,
            channels=channels,
            timestamp=int(time.time() * 1000)
        )

        logger.info(f"Loaded audio file {audio_path} as AudioChunk ({len(audio_data)} bytes)")
        return chunk

    except FileNotFoundError:
        logger.error(f"Audio file not found: {audio_path}")
        raise
    except ValueError:
        # Propagate our own validation errors unchanged instead of
        # re-wrapping them in a generic "Invalid audio file format" message.
        raise
    except Exception as e:
        logger.error(f"Error loading audio file {audio_path}: {e}")
        raise ValueError(f"Invalid audio file format: {e}") from e


async def _process_audio_cropping_with_relative_timestamps(
Expand Down
38 changes: 1 addition & 37 deletions backends/advanced/src/advanced_omi_backend/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,18 @@
application level by the ProcessorManager.
"""

import asyncio
import logging
import os
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple

from advanced_omi_backend.conversation_manager import get_conversation_manager
from advanced_omi_backend.database import AudioChunksRepository
from advanced_omi_backend.task_manager import get_task_manager
from wyoming.audio import AudioChunk

# Get loggers
audio_logger = logging.getLogger("audio_processing")

# Configuration constants
NEW_CONVERSATION_TIMEOUT_MINUTES = float(os.getenv("NEW_CONVERSATION_TIMEOUT_MINUTES", "1.5"))


class ClientState:
"""Manages conversation state for a single client connection."""
Expand Down Expand Up @@ -67,11 +61,6 @@ def __init__(

audio_logger.info(f"Created client state for {client_id}")

def update_audio_received(self, chunk: AudioChunk):
"""Update state when audio is received."""
# Check if we should start a new conversation
if self.should_start_new_conversation():
asyncio.create_task(self.start_new_conversation())

def set_current_audio_uuid(self, audio_uuid: str):
"""Set the current audio UUID when processor creates a new file."""
Expand Down Expand Up @@ -104,20 +93,9 @@ def record_speech_end(self, audio_uuid: str, timestamp: float):
audio_logger.warning(f"Speech end recorded for {audio_uuid} but no start time found")

def update_transcript_received(self):
"""Update timestamp when transcript is received (for timeout detection)."""
"""Update timestamp when transcript is received."""
self.last_transcript_time = time.time()

def should_start_new_conversation(self) -> bool:
"""Check if we should start a new conversation based on timeout."""
if self.last_transcript_time is None:
return False

current_time = time.time()
time_since_last_transcript = current_time - self.last_transcript_time
timeout_seconds = NEW_CONVERSATION_TIMEOUT_MINUTES * 60

return time_since_last_transcript > timeout_seconds

async def close_current_conversation(self):
"""Close the current conversation and queue necessary processing."""
# Prevent double closure
Expand Down Expand Up @@ -161,20 +139,6 @@ async def close_current_conversation(self):
else:
audio_logger.warning(f"⚠️ Conversation closure had issues for {self.current_audio_uuid}")

async def start_new_conversation(self):
"""Start a new conversation by closing current and resetting state."""
await self.close_current_conversation()

# Reset conversation state
self.current_audio_uuid = None
self.conversation_start_time = time.time()
self.last_transcript_time = None
self.conversation_closed = False

audio_logger.info(
f"Client {self.client_id}: Started new conversation due to "
f"{NEW_CONVERSATION_TIMEOUT_MINUTES}min timeout"
)

async def disconnect(self):
"""Clean disconnect of client state."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,23 @@
import asyncio
import hashlib
import logging
import os
import time
from pathlib import Path
from typing import Optional

from advanced_omi_backend.audio_utils import (
_process_audio_cropping_with_relative_timestamps,
load_audio_file_as_chunk,
)
from advanced_omi_backend.client_manager import (
ClientManager,
client_belongs_to_user,
get_user_clients_all,
)
from advanced_omi_backend.database import AudioChunksRepository, ProcessingRunsRepository, chunks_col, processing_runs_col, conversations_col, ConversationsRepository
from advanced_omi_backend.users import User
from advanced_omi_backend.processors import get_processor_manager, TranscriptionItem, MemoryProcessingItem
from advanced_omi_backend.users import User, get_user_by_id
from fastapi.responses import JSONResponse

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -585,9 +588,10 @@ async def reprocess_transcript(conversation_id: str, user: User):
)

# Generate configuration hash for duplicate detection
transcription_provider = os.getenv("TRANSCRIPTION_PROVIDER", "deepgram")
config_data = {
"audio_path": str(full_audio_path),
"transcription_provider": "deepgram", # This would come from settings
"transcription_provider": transcription_provider,
"trigger": "manual_reprocess"
}
config_hash = hashlib.sha256(str(config_data).encode()).hexdigest()[:16]
Expand All @@ -613,18 +617,37 @@ async def reprocess_transcript(conversation_id: str, user: User):
status_code=500, content={"error": "Failed to create transcript version"}
)

# TODO: Queue audio for reprocessing with ProcessorManager
# This is where we would integrate with the existing processor
# For now, we'll return the version ID for the caller to handle
# NEW: Load audio file and queue for transcription processing
try:
# Load audio file as AudioChunk
audio_chunk = await load_audio_file_as_chunk(full_audio_path)

# Create TranscriptionItem for reprocessing
transcription_item = TranscriptionItem(
client_id=f"reprocess-{conversation_id}",
user_id=str(user.user_id),
audio_uuid=audio_uuid,
audio_chunk=audio_chunk
)

# Queue for transcription processing
processor_manager = get_processor_manager()
await processor_manager.queue_transcription(transcription_item)

logger.info(f"Queued transcript reprocessing job {run_id} (version {version_id}) for conversation {conversation_id}")

logger.info(f"Created transcript reprocessing job {run_id} (version {version_id}) for conversation {conversation_id}")
except Exception as e:
logger.error(f"Error queuing transcript reprocessing: {e}")
return JSONResponse(
status_code=500, content={"error": f"Failed to queue reprocessing: {str(e)}"}
)

return JSONResponse(content={
"message": f"Transcript reprocessing started for conversation {conversation_id}",
"run_id": run_id,
"version_id": version_id,
"config_hash": config_hash,
"status": "PENDING"
"status": "QUEUED"
})

except Exception as e:
Expand Down Expand Up @@ -673,9 +696,10 @@ async def reprocess_memory(conversation_id: str, transcript_version_id: str, use
)

# Generate configuration hash for duplicate detection
memory_provider = os.getenv("MEMORY_PROVIDER", "friend_lite")
config_data = {
"transcript_version_id": transcript_version_id,
"memory_provider": "friend_lite", # This would come from settings
"memory_provider": memory_provider,
"trigger": "manual_reprocess"
}
config_hash = hashlib.sha256(str(config_data).encode()).hexdigest()[:16]
Expand All @@ -702,18 +726,42 @@ async def reprocess_memory(conversation_id: str, transcript_version_id: str, use
status_code=500, content={"error": "Failed to create memory version"}
)

# TODO: Queue memory extraction for processing
# This is where we would integrate with the existing memory processor
# NEW: Queue memory processing
try:
# Get user email for memory processing
user_obj = await get_user_by_id(str(user.user_id))
if not user_obj:
return JSONResponse(
status_code=500, content={"error": "User not found for memory processing"}
)

# Create MemoryProcessingItem for reprocessing
memory_item = MemoryProcessingItem(
client_id=f"reprocess-{conversation_id}",
user_id=str(user.user_id),
user_email=user_obj.email,
conversation_id=conversation_id
)

Comment on lines +739 to +745
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

🧩 Analysis chain

Bug: transcript_version_id is not passed to memory reprocess job

Queue item lacks transcript_version_id, so the worker may process the wrong transcript version.

Apply:

             memory_item = MemoryProcessingItem(
                 client_id=f"reprocess-{conversation_id}",
                 user_id=str(user.user_id),
                 user_email=user_obj.email,
-                conversation_id=conversation_id
+                conversation_id=conversation_id,
+                transcript_version_id=transcript_version_id
             )

🏁 Script executed:

#!/bin/bash
# Verify MemoryProcessingItem supports transcript_version_id, and where it is consumed.
rg -nP --type=py -C3 '\bclass\s+MemoryProcessingItem\b|@dataclass'
rg -nP --type=py -C3 '\bMemoryProcessingItem\('
rg -nP --type=py -C5 '\bdef\s+queue_memory\b'

Length of output: 9376


🏁 Script executed:

#!/bin/bash
set -euo pipefail

proc_file="backends/advanced/src/advanced_omi_backend/processors.py"
trans_file="backends/advanced/src/advanced_omi_backend/transcription.py"
conv_file="backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py"

echo "=== processors.py: MemoryProcessingItem area ==="
line=$(rg -nP --line-number 'class\s+MemoryProcessingItem' "$proc_file" | head -n1 | cut -d: -f1 || true)
if [ -z "$line" ]; then
  echo "MemoryProcessingItem class not found in $proc_file"
else
  start=$(( line - 10 ))
  if [ $start -lt 1 ]; then start=1; fi
  end=$(( line + 60 ))
  sed -n "${start},${end}p" "$proc_file" || true
fi

echo
echo "=== processors.py: queue_memory function ==="
line_q=$(rg -nP --line-number 'def\s+queue_memory\s*\(' "$proc_file" | head -n1 | cut -d: -f1 || true)
if [ -z "$line_q" ]; then
  echo "queue_memory not found in $proc_file"
else
  start=$(( line_q - 8 ))
  if [ $start -lt 1 ]; then start=1; fi
  end=$(( line_q + 80 ))
  sed -n "${start},${end}p" "$proc_file" || true
fi

echo
echo "=== transcription.py: MemoryProcessingItem instantiation area ==="
line_t=$(rg -nP --line-number 'MemoryProcessingItem\(' "$trans_file" | head -n1 | cut -d: -f1 || true)
if [ -z "$line_t" ]; then
  echo "MemoryProcessingItem instantiation not found in $trans_file"
else
  start=$(( line_t - 8 ))
  if [ $start -lt 1 ]; then start=1; fi
  end=$(( line_t + 40 ))
  sed -n "${start},${end}p" "$trans_file" || true
fi

echo
echo "=== conversation_controller.py: reprocess area ==="
sed -n '720,760p' "$conv_file" || true

Length of output: 10658


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== search for transcript_version_id ==="
rg -n --hidden --no-ignore --type=py 'transcript_version_id' || true

echo
echo "=== search for transcript_version (broader) ==="
rg -n --hidden --no-ignore --type=py 'transcript_version' || true

echo
echo "=== search for attribute access .transcript_version_id ==="
rg -n --hidden --no-ignore --type=py '\.transcript_version_id\b' || true

echo
echo "=== search for MemoryProcessingItem instantiations ==="
rg -n --hidden --no-ignore --type=py 'MemoryProcessingItem\(' || true

echo
echo "=== search for memory queue consumer / processor ==="
rg -n --hidden --no-ignore --type=py 'memory_queue' || true
rg -n --hidden --no-ignore --type=py 'memory_processor' || true
rg -n --hidden --no-ignore --type=py 'process_memory' || true
rg -n --hidden --no-ignore --type=py 'def\s+.*memory.*\(' -n --pcre2 || true

Length of output: 23917


🏁 Script executed:

#!/bin/bash
set -euo pipefail

proc="backends/advanced/src/advanced_omi_backend/processors.py"
mem_service="backends/advanced/src/advanced_omi_backend/memory/memory_service.py"

echo "=== processors.py: _memory_processor and _process_memory_item ==="
rg -n --line-number 'async def _memory_processor\(|async def _process_memory_item\(|async def _should_process_memory\(' "$proc" || true
start_line=$(rg -n --line-number 'async def _memory_processor\(' "$proc" | head -n1 | cut -d: -f1 || true)
if [ -n "$start_line" ]; then
  sed -n "$((start_line-6)),$((start_line+120))p" "$proc"
fi

echo
echo "=== processors.py: _process_memory_item exact area ==="
line_proc=$(rg -n --line-number 'async def _process_memory_item\(' "$proc" | head -n1 | cut -d: -f1 || true)
if [ -n "$line_proc" ]; then
  sed -n "$((line_proc-4)),$((line_proc+200))p" "$proc"
fi

echo
echo "=== memory_service.py: functions referencing transcript_version_id ==="
rg -n --line-number 'transcript_version_id' "$mem_service" || true
rg -n --line-number 'def\s+_process_memory_updates\(' "$mem_service" || true
line_ms=$(rg -n --line-number 'def\s+_process_memory_updates\(' "$mem_service" | head -n1 | cut -d: -f1 || true)
if [ -n "$line_ms" ]; then
  sed -n "$((line_ms-6)),$((line_ms+120))p" "$mem_service"
fi

echo
echo "=== memory_service.py: top area ==="
sed -n '1,240p' "$mem_service" || true

Length of output: 30988


🏁 Script executed:

#!/bin/bash
set -euo pipefail

cc="backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py"
db="backends/advanced/src/advanced_omi_backend/database.py"

echo "=== conversation_controller.reprocess_memory ==="
rg -n --line-number 'async def reprocess_memory\(' "$cc" || true
start=$(rg -n --line-number 'async def reprocess_memory\(' "$cc" | head -n1 | cut -d: -f1 || true)
if [ -n "$start" ]; then
  sed -n "$((start-4)),$((start+240))p" "$cc" || true
fi

echo
echo "=== database.create_transcript_version ==="
rg -n --line-number 'def\s+create_transcript_version\(' "$db" || true
startdb=$(rg -n --line-number 'def\s+create_transcript_version\(' "$db" | head -n1 | cut -d: -f1 || true)
if [ -n "$startdb" ]; then
  sed -n "$((startdb-4)),$((startdb+180))p" "$db" || true
fi

echo
echo "=== search for ConversationsRepository class/definition ==="
rg -n --hidden --no-ignore --type=py 'class\s+ConversationsRepository\b' || true
rg -n --hidden --no-ignore --type=py 'def\s+get_conversation\(' || true
rg -n --hidden --no-ignore --type=py 'class\s+Conversation' || true

Length of output: 16938


Critical: pass transcript_version_id through the memory queue and use it in the worker

Memory reprocess currently creates a memory_version tied to a specific transcript_version_id but queues a MemoryProcessingItem without that id; the memory worker reads conversation["transcript"] (the active transcript) and can therefore process the wrong version.

  • Add transcript_version_id: Optional[str] = None to MemoryProcessingItem in backends/advanced/src/advanced_omi_backend/processors.py (import Optional).
  • Update _process_memory_item in the same file to prefer item.transcript_version_id: lookup the matching entry in conversation["transcript_versions"] and use its "segments" to build the transcript; fall back to conversation["transcript"] if not found.
  • Pass transcript_version_id when creating the item in backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (and optionally include it when queuing from transcription.py for consistency).
  • Verify end-to-end: queued item contains the version id and the memory worker extracts the correct segments for that version.
🤖 Prompt for AI Agents
In
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
around lines 739-745, the MemoryProcessingItem is queued without the
transcript_version_id causing the memory worker to possibly reprocess the wrong
transcript; add transcript_version_id to the queued item by passing the correct
version id used to create the memory_version. Also update
backends/advanced/src/advanced_omi_backend/processors.py to: (1) add
"transcript_version_id: Optional[str] = None" to the MemoryProcessingItem
dataclass (import Optional), and (2) modify _process_memory_item to prefer
item.transcript_version_id — locate the matching entry in
conversation["transcript_versions"] and use its "segments" to build the
transcript, falling back to conversation["transcript"] if no match; optionally
propagate transcript_version_id when enqueueing from transcription.py for
consistency and verify end-to-end that the queued item contains the id and the
worker extracts the correct segments.

# Queue for memory processing
processor_manager = get_processor_manager()
await processor_manager.queue_memory(memory_item)

logger.info(f"Created memory reprocessing job {run_id} (version {version_id}) for conversation {conversation_id}")
logger.info(f"Queued memory reprocessing job {run_id} (version {version_id}) for conversation {conversation_id}")

except Exception as e:
logger.error(f"Error queuing memory reprocessing: {e}")
return JSONResponse(
status_code=500, content={"error": f"Failed to queue memory reprocessing: {str(e)}"}
)

return JSONResponse(content={
"message": f"Memory reprocessing started for conversation {conversation_id}",
"run_id": run_id,
"version_id": version_id,
"transcript_version_id": transcript_version_id,
"config_hash": config_hash,
"status": "PENDING"
"status": "QUEUED"
})

except Exception as e:
Expand Down
Loading