-
Notifications
You must be signed in to change notification settings - Fork 25
Reprocess fix #116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reprocess fix #116
Changes from all commits
2b1ef2e
4daf3cd
20cfe81
32cd0df
9903b88
38c7c36
38a4ba1
515fb81
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,20 +5,23 @@ | |
| import asyncio | ||
| import hashlib | ||
| import logging | ||
| import os | ||
| import time | ||
| from pathlib import Path | ||
| from typing import Optional | ||
|
|
||
| from advanced_omi_backend.audio_utils import ( | ||
| _process_audio_cropping_with_relative_timestamps, | ||
| load_audio_file_as_chunk, | ||
| ) | ||
| from advanced_omi_backend.client_manager import ( | ||
| ClientManager, | ||
| client_belongs_to_user, | ||
| get_user_clients_all, | ||
| ) | ||
| from advanced_omi_backend.database import AudioChunksRepository, ProcessingRunsRepository, chunks_col, processing_runs_col, conversations_col, ConversationsRepository | ||
| from advanced_omi_backend.users import User | ||
| from advanced_omi_backend.processors import get_processor_manager, TranscriptionItem, MemoryProcessingItem | ||
| from advanced_omi_backend.users import User, get_user_by_id | ||
| from fastapi.responses import JSONResponse | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
@@ -585,9 +588,10 @@ async def reprocess_transcript(conversation_id: str, user: User): | |
| ) | ||
|
|
||
| # Generate configuration hash for duplicate detection | ||
| transcription_provider = os.getenv("TRANSCRIPTION_PROVIDER", "deepgram") | ||
| config_data = { | ||
| "audio_path": str(full_audio_path), | ||
| "transcription_provider": "deepgram", # This would come from settings | ||
| "transcription_provider": transcription_provider, | ||
| "trigger": "manual_reprocess" | ||
| } | ||
| config_hash = hashlib.sha256(str(config_data).encode()).hexdigest()[:16] | ||
|
|
@@ -613,18 +617,37 @@ async def reprocess_transcript(conversation_id: str, user: User): | |
| status_code=500, content={"error": "Failed to create transcript version"} | ||
| ) | ||
|
|
||
| # TODO: Queue audio for reprocessing with ProcessorManager | ||
| # This is where we would integrate with the existing processor | ||
| # For now, we'll return the version ID for the caller to handle | ||
| # NEW: Load audio file and queue for transcription processing | ||
| try: | ||
| # Load audio file as AudioChunk | ||
| audio_chunk = await load_audio_file_as_chunk(full_audio_path) | ||
|
|
||
| # Create TranscriptionItem for reprocessing | ||
| transcription_item = TranscriptionItem( | ||
| client_id=f"reprocess-{conversation_id}", | ||
| user_id=str(user.user_id), | ||
| audio_uuid=audio_uuid, | ||
| audio_chunk=audio_chunk | ||
| ) | ||
|
|
||
| # Queue for transcription processing | ||
| processor_manager = get_processor_manager() | ||
| await processor_manager.queue_transcription(transcription_item) | ||
|
|
||
| logger.info(f"Queued transcript reprocessing job {run_id} (version {version_id}) for conversation {conversation_id}") | ||
|
|
||
| logger.info(f"Created transcript reprocessing job {run_id} (version {version_id}) for conversation {conversation_id}") | ||
| except Exception as e: | ||
| logger.error(f"Error queuing transcript reprocessing: {e}") | ||
| return JSONResponse( | ||
| status_code=500, content={"error": f"Failed to queue reprocessing: {str(e)}"} | ||
| ) | ||
|
|
||
| return JSONResponse(content={ | ||
| "message": f"Transcript reprocessing started for conversation {conversation_id}", | ||
| "run_id": run_id, | ||
| "version_id": version_id, | ||
| "config_hash": config_hash, | ||
| "status": "PENDING" | ||
| "status": "QUEUED" | ||
| }) | ||
|
|
||
| except Exception as e: | ||
|
|
@@ -673,9 +696,10 @@ async def reprocess_memory(conversation_id: str, transcript_version_id: str, use | |
| ) | ||
|
|
||
| # Generate configuration hash for duplicate detection | ||
| memory_provider = os.getenv("MEMORY_PROVIDER", "friend_lite") | ||
| config_data = { | ||
| "transcript_version_id": transcript_version_id, | ||
| "memory_provider": "friend_lite", # This would come from settings | ||
| "memory_provider": memory_provider, | ||
| "trigger": "manual_reprocess" | ||
| } | ||
| config_hash = hashlib.sha256(str(config_data).encode()).hexdigest()[:16] | ||
|
|
@@ -702,18 +726,42 @@ async def reprocess_memory(conversation_id: str, transcript_version_id: str, use | |
| status_code=500, content={"error": "Failed to create memory version"} | ||
| ) | ||
|
|
||
| # TODO: Queue memory extraction for processing | ||
| # This is where we would integrate with the existing memory processor | ||
| # NEW: Queue memory processing | ||
| try: | ||
| # Get user email for memory processing | ||
| user_obj = await get_user_by_id(str(user.user_id)) | ||
| if not user_obj: | ||
| return JSONResponse( | ||
| status_code=500, content={"error": "User not found for memory processing"} | ||
| ) | ||
|
|
||
| # Create MemoryProcessingItem for reprocessing | ||
| memory_item = MemoryProcessingItem( | ||
| client_id=f"reprocess-{conversation_id}", | ||
| user_id=str(user.user_id), | ||
| user_email=user_obj.email, | ||
| conversation_id=conversation_id | ||
| ) | ||
|
|
||
|
Comment on lines
+739
to
+745
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
Bug: `transcript_version_id` is not passed to memory reprocess job. Queue item lacks `transcript_version_id`. Apply:
memory_item = MemoryProcessingItem(
client_id=f"reprocess-{conversation_id}",
user_id=str(user.user_id),
user_email=user_obj.email,
- conversation_id=conversation_id
+ conversation_id=conversation_id,
+ transcript_version_id=transcript_version_id
)🏁 Script executed: #!/bin/bash
# Verify MemoryProcessingItem supports transcript_version_id, and where it is consumed.
rg -nP --type=py -C3 '\bclass\s+MemoryProcessingItem\b|@dataclass'
rg -nP --type=py -C3 '\bMemoryProcessingItem\('
rg -nP --type=py -C5 '\bdef\s+queue_memory\b'Length of output: 9376 🏁 Script executed: #!/bin/bash
set -euo pipefail
proc_file="backends/advanced/src/advanced_omi_backend/processors.py"
trans_file="backends/advanced/src/advanced_omi_backend/transcription.py"
conv_file="backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py"
echo "=== processors.py: MemoryProcessingItem area ==="
line=$(rg -nP --line-number 'class\s+MemoryProcessingItem' "$proc_file" | head -n1 | cut -d: -f1 || true)
if [ -z "$line" ]; then
echo "MemoryProcessingItem class not found in $proc_file"
else
start=$(( line - 10 ))
if [ $start -lt 1 ]; then start=1; fi
end=$(( line + 60 ))
sed -n "${start},${end}p" "$proc_file" || true
fi
echo
echo "=== processors.py: queue_memory function ==="
line_q=$(rg -nP --line-number 'def\s+queue_memory\s*\(' "$proc_file" | head -n1 | cut -d: -f1 || true)
if [ -z "$line_q" ]; then
echo "queue_memory not found in $proc_file"
else
start=$(( line_q - 8 ))
if [ $start -lt 1 ]; then start=1; fi
end=$(( line_q + 80 ))
sed -n "${start},${end}p" "$proc_file" || true
fi
echo
echo "=== transcription.py: MemoryProcessingItem instantiation area ==="
line_t=$(rg -nP --line-number 'MemoryProcessingItem\(' "$trans_file" | head -n1 | cut -d: -f1 || true)
if [ -z "$line_t" ]; then
echo "MemoryProcessingItem instantiation not found in $trans_file"
else
start=$(( line_t - 8 ))
if [ $start -lt 1 ]; then start=1; fi
end=$(( line_t + 40 ))
sed -n "${start},${end}p" "$trans_file" || true
fi
echo
echo "=== conversation_controller.py: reprocess area ==="
sed -n '720,760p' "$conv_file" || trueLength of output: 10658 🏁 Script executed: #!/bin/bash
set -euo pipefail
echo "=== search for transcript_version_id ==="
rg -n --hidden --no-ignore --type=py 'transcript_version_id' || true
echo
echo "=== search for transcript_version (broader) ==="
rg -n --hidden --no-ignore --type=py 'transcript_version' || true
echo
echo "=== search for attribute access .transcript_version_id ==="
rg -n --hidden --no-ignore --type=py '\.transcript_version_id\b' || true
echo
echo "=== search for MemoryProcessingItem instantiations ==="
rg -n --hidden --no-ignore --type=py 'MemoryProcessingItem\(' || true
echo
echo "=== search for memory queue consumer / processor ==="
rg -n --hidden --no-ignore --type=py 'memory_queue' || true
rg -n --hidden --no-ignore --type=py 'memory_processor' || true
rg -n --hidden --no-ignore --type=py 'process_memory' || true
rg -n --hidden --no-ignore --type=py 'def\s+.*memory.*\(' -n --pcre2 || trueLength of output: 23917 🏁 Script executed: #!/bin/bash
set -euo pipefail
proc="backends/advanced/src/advanced_omi_backend/processors.py"
mem_service="backends/advanced/src/advanced_omi_backend/memory/memory_service.py"
echo "=== processors.py: _memory_processor and _process_memory_item ==="
rg -n --line-number 'async def _memory_processor\(|async def _process_memory_item\(|async def _should_process_memory\(' "$proc" || true
start_line=$(rg -n --line-number 'async def _memory_processor\(' "$proc" | head -n1 | cut -d: -f1 || true)
if [ -n "$start_line" ]; then
sed -n "$((start_line-6)),$((start_line+120))p" "$proc"
fi
echo
echo "=== processors.py: _process_memory_item exact area ==="
line_proc=$(rg -n --line-number 'async def _process_memory_item\(' "$proc" | head -n1 | cut -d: -f1 || true)
if [ -n "$line_proc" ]; then
sed -n "$((line_proc-4)),$((line_proc+200))p" "$proc"
fi
echo
echo "=== memory_service.py: functions referencing transcript_version_id ==="
rg -n --line-number 'transcript_version_id' "$mem_service" || true
rg -n --line-number 'def\s+_process_memory_updates\(' "$mem_service" || true
line_ms=$(rg -n --line-number 'def\s+_process_memory_updates\(' "$mem_service" | head -n1 | cut -d: -f1 || true)
if [ -n "$line_ms" ]; then
sed -n "$((line_ms-6)),$((line_ms+120))p" "$mem_service"
fi
echo
echo "=== memory_service.py: top area ==="
sed -n '1,240p' "$mem_service" || trueLength of output: 30988 🏁 Script executed: #!/bin/bash
set -euo pipefail
cc="backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py"
db="backends/advanced/src/advanced_omi_backend/database.py"
echo "=== conversation_controller.reprocess_memory ==="
rg -n --line-number 'async def reprocess_memory\(' "$cc" || true
start=$(rg -n --line-number 'async def reprocess_memory\(' "$cc" | head -n1 | cut -d: -f1 || true)
if [ -n "$start" ]; then
sed -n "$((start-4)),$((start+240))p" "$cc" || true
fi
echo
echo "=== database.create_transcript_version ==="
rg -n --line-number 'def\s+create_transcript_version\(' "$db" || true
startdb=$(rg -n --line-number 'def\s+create_transcript_version\(' "$db" | head -n1 | cut -d: -f1 || true)
if [ -n "$startdb" ]; then
sed -n "$((startdb-4)),$((startdb+180))p" "$db" || true
fi
echo
echo "=== search for ConversationsRepository class/definition ==="
rg -n --hidden --no-ignore --type=py 'class\s+ConversationsRepository\b' || true
rg -n --hidden --no-ignore --type=py 'def\s+get_conversation\(' || true
rg -n --hidden --no-ignore --type=py 'class\s+Conversation' || true
Length of output: 16938
Critical: pass `transcript_version_id` through the memory queue and use it in the worker.
Memory reprocess currently creates a memory_version tied to a specific transcript_version_id but queues a MemoryProcessingItem without that id; the memory worker reads conversation["transcript"] (the active transcript) and can therefore process the wrong version.
🤖 Prompt for AI Agents |
||
| # Queue for memory processing | ||
| processor_manager = get_processor_manager() | ||
| await processor_manager.queue_memory(memory_item) | ||
|
|
||
| logger.info(f"Created memory reprocessing job {run_id} (version {version_id}) for conversation {conversation_id}") | ||
| logger.info(f"Queued memory reprocessing job {run_id} (version {version_id}) for conversation {conversation_id}") | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Error queuing memory reprocessing: {e}") | ||
| return JSONResponse( | ||
| status_code=500, content={"error": f"Failed to queue memory reprocessing: {str(e)}"} | ||
| ) | ||
|
|
||
| return JSONResponse(content={ | ||
| "message": f"Memory reprocessing started for conversation {conversation_id}", | ||
| "run_id": run_id, | ||
| "version_id": version_id, | ||
| "transcript_version_id": transcript_version_id, | ||
| "config_hash": config_hash, | ||
| "status": "PENDING" | ||
| "status": "QUEUED" | ||
| }) | ||
|
|
||
| except Exception as e: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix stereo→mono conversion: current code writes float bytes, corrupting PCM.
`np.mean(..., dtype=np.int16).tobytes()` yields float array bytes, not int16 PCM. Use integer accumulate/average, clip, and cast to int16 before `.tobytes()`. Apply this diff:
📝 Committable suggestion
🧰 Tools
🪛 Ruff (0.13.1)
124-124: Abstract `raise` to an inner function (TRY301)
124-124: Avoid specifying long messages outside the exception class (TRY003)
🤖 Prompt for AI Agents