Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion application/single_app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
EXECUTOR_TYPE = 'thread'
EXECUTOR_MAX_WORKERS = 30
SESSION_TYPE = 'filesystem'
VERSION = "0.236.011"
VERSION = "0.236.012"


SECRET_KEY = os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production')
Expand Down
66 changes: 54 additions & 12 deletions application/single_app/functions_retention_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
This module handles automated deletion of aged conversations and documents
based on configurable retention policies for personal, group, and public workspaces.

Version: 0.234.067
Version: 0.236.012
Implemented in: 0.234.067
Updated in: 0.236.012 - Fixed race condition handling for NotFound errors during deletion
"""

from config import *
Expand Down Expand Up @@ -565,10 +566,21 @@ def delete_aged_conversations(retention_days, workspace_type='personal', user_id
conversation_title = conv.get('title', 'Untitled')

# Read full conversation for archiving/logging
conversation_item = container.read_item(
item=conversation_id,
partition_key=conversation_id
)
try:
conversation_item = container.read_item(
item=conversation_id,
partition_key=conversation_id
)
except CosmosResourceNotFoundError:
# Conversation was already deleted (race condition) - this is fine, skip to next
debug_print(f"Conversation {conversation_id} already deleted (not found during read), skipping")
deleted_details.append({
'id': conversation_id,
'title': conversation_title,
'last_activity_at': conv.get('last_activity_at'),
'already_deleted': True
})
continue

# Archive if enabled
if archiving_enabled:
Expand Down Expand Up @@ -613,7 +625,11 @@ def delete_aged_conversations(retention_days, workspace_type='personal', user_id
archived_msg["archived_by_retention_policy"] = True
cosmos_archived_messages_container.upsert_item(archived_msg)

messages_container.delete_item(msg['id'], partition_key=conversation_id)
try:
messages_container.delete_item(msg['id'], partition_key=conversation_id)
except CosmosResourceNotFoundError:
# Message was already deleted - this is fine, continue
debug_print(f"Message {msg['id']} already deleted (not found), skipping")

# Log deletion
log_conversation_deletion(
Expand All @@ -631,10 +647,14 @@ def delete_aged_conversations(retention_days, workspace_type='personal', user_id
)

# Delete conversation
container.delete_item(
item=conversation_id,
partition_key=conversation_id
)
try:
container.delete_item(
item=conversation_id,
partition_key=conversation_id
)
except CosmosResourceNotFoundError:
# Conversation was already deleted after we read it (race condition) - this is fine
debug_print(f"Conversation {conversation_id} already deleted (not found during delete)")

deleted_details.append({
'id': conversation_id,
Expand Down Expand Up @@ -730,10 +750,21 @@ def delete_aged_documents(retention_days, workspace_type='personal', user_id=Non
doc_user_id = doc.get('user_id') or deletion_user_id

# Delete document chunks from search index
delete_document_chunks(document_id, group_id, public_workspace_id)
try:
delete_document_chunks(document_id, group_id, public_workspace_id)
except CosmosResourceNotFoundError:
# Document chunks already deleted - this is fine
debug_print(f"Document chunks for {document_id} already deleted (not found)")
except Exception as chunk_error:
# Log chunk deletion errors but continue with document deletion
debug_print(f"Error deleting chunks for document {document_id}: {chunk_error}")

# Delete document from Cosmos DB and blob storage
delete_document(doc_user_id, document_id, group_id, public_workspace_id)
try:
delete_document(doc_user_id, document_id, group_id, public_workspace_id)
except CosmosResourceNotFoundError:
# Document was already deleted (race condition) - this is fine
debug_print(f"Document {document_id} already deleted (not found)")

deleted_details.append({
'id': document_id,
Expand All @@ -744,6 +775,17 @@ def delete_aged_documents(retention_days, workspace_type='personal', user_id=Non

debug_print(f"Deleted document {document_id} ({file_name}) due to retention policy")

except CosmosResourceNotFoundError:
# Document was already deleted - count as success
doc_id = doc.get('id', 'unknown') if doc else 'unknown'
debug_print(f"Document {doc_id} already deleted (not found)")
deleted_details.append({
'id': doc_id,
'file_name': doc.get('file_name', 'Unknown'),
'title': doc.get('title', doc.get('file_name', 'Unknown')),
'last_updated': doc.get('last_updated'),
'already_deleted': True
})
except Exception as e:
doc_id = doc.get('id', 'unknown') if doc else 'unknown'
log_event("delete_aged_documents_deletion_error", {"error": str(e), "document_id": doc_id, "workspace_type": workspace_type})
Expand Down
95 changes: 95 additions & 0 deletions docs/explanation/fixes/v0.236.012/RETENTION_POLICY_NOTFOUND_FIX.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Retention Policy NotFound Error Fix

## Issue Description

The retention policy deletion process was logging errors when attempting to delete conversations or documents that had already been deleted (e.g., by another process or user action between the query and delete operations).

### Error Observed
```
DEBUG: [Log] delete_aged_conversations_deletion_error -- {'error': '(NotFound) Entity with the specified id does not exist in the system.
```

### Root Cause

This is a **race condition** scenario where:
1. The retention policy queries for aged conversations/documents
2. Between the query and the delete operation, the item is deleted by another process (user action, concurrent retention execution, etc.)
3. The delete operation fails with `CosmosResourceNotFoundError` (404 NotFound)

## Fix Applied

**Version: 0.236.012**

The fix adds specific handling for `CosmosResourceNotFoundError` in both conversation and document deletion loops:

### Conversations
- When reading a conversation before archiving: If not found, log debug message and count as already deleted
- When deleting messages: Catch NotFound and continue (message already gone)
- When deleting conversation: Catch NotFound and continue (conversation already gone)

### Documents
- When deleting document chunks: Catch NotFound and continue
- When deleting document: Catch NotFound and continue
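- Outer `try`/`except`: also catches NotFound and records the document with `already_deleted: True`, so it is counted as a successful deletion rather than logged as an error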

## Files Modified

- [functions_retention_policy.py](../../../application/single_app/functions_retention_policy.py)
- `delete_aged_conversations()` - Added CosmosResourceNotFoundError handling
- `delete_aged_documents()` - Added CosmosResourceNotFoundError handling

## Technical Details

### Before Fix
```python
# Read would throw exception if item was deleted between query and read
conversation_item = container.read_item(
item=conversation_id,
partition_key=conversation_id
)
# Delete would throw exception if item was deleted
container.delete_item(
item=conversation_id,
partition_key=conversation_id
)
```

### After Fix
```python
try:
conversation_item = container.read_item(
item=conversation_id,
partition_key=conversation_id
)
except CosmosResourceNotFoundError:
# Already deleted - this is fine, count as success
debug_print(f"Conversation {conversation_id} already deleted (not found during read), skipping")
deleted_details.append({...})
continue

# ... archiving and message deletion ...

try:
container.delete_item(
item=conversation_id,
partition_key=conversation_id
)
except CosmosResourceNotFoundError:
# Already deleted between read and delete - this is fine
debug_print(f"Conversation {conversation_id} already deleted (not found during delete)")
```

## Benefits

1. **No false error logs**: Items that are already deleted no longer generate error entries
2. **Accurate counts**: Already-deleted items are properly counted as successful deletions
3. **Graceful handling**: Race conditions are handled without disrupting the overall retention process
4. **Better debugging**: Debug messages clearly indicate when items were already deleted

## Testing

Test by:
1. Enabling the retention policy with a short retention period
2. Running the retention policy execution
3. Verifying that no NotFound errors are logged
4. Verifying that the deletion counts accurately reflect the processed items
180 changes: 180 additions & 0 deletions functional_tests/test_retention_policy_notfound_handling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
#!/usr/bin/env python3
"""
Functional test for Retention Policy NotFound Error Handling.
Version: 0.236.012
Implemented in: 0.236.012

This test ensures that the retention policy correctly handles CosmosResourceNotFoundError
when attempting to delete conversations or documents that have already been deleted.
This prevents false error logging for race condition scenarios.
"""

import sys
import os

# Resolve repository locations relative to this file so the test behaves the
# same regardless of the directory it is launched from.
_TESTS_DIR = os.path.dirname(os.path.abspath(__file__))
_APP_DIR = os.path.abspath(os.path.join(_TESTS_DIR, '..', 'application', 'single_app'))

# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(_TESTS_DIR))
# The tests below import `config` and `functions_retention_policy`, which live in
# application/single_app. Changing the working directory does not make modules
# importable when this file is run as a script (sys.path[0] is then the script's
# own directory), so the application directory is added to sys.path explicitly.
sys.path.insert(0, _APP_DIR)
# The working directory is still switched to the app directory, as before.
os.chdir(_APP_DIR)

def test_notfound_exception_import():
    """Check that ``CosmosResourceNotFoundError`` can be imported from ``config``.

    Returns:
        bool: True when the import succeeds, False when it raises ImportError.
    """
    print("🔍 Testing CosmosResourceNotFoundError import...")

    try:
        from config import CosmosResourceNotFoundError  # noqa: F401 - import is the check itself
    except ImportError as import_error:
        print(f"❌ Failed to import CosmosResourceNotFoundError: {import_error}")
        return False
    else:
        print("✅ CosmosResourceNotFoundError imported successfully from config")
        return True


def test_retention_policy_function_definitions():
    """Check the source text of both retention functions for NotFound handling.

    The check is textual: each function's source (via ``inspect.getsource``) must
    contain ``CosmosResourceNotFoundError`` and the phrase ``already deleted``.
    The conversations function is inspected first, then the documents function.

    Returns:
        bool: True when every marker is present in both functions, False on the
        first missing marker or when the import/inspection raises.
    """
    print("\n🔍 Testing retention policy function definitions...")

    try:
        import inspect
        from functions_retention_policy import delete_aged_conversations, delete_aged_documents

        # (marker to look for, text printed on success, text printed on failure)
        marker_checks = (
            (
                'CosmosResourceNotFoundError',
                "handles CosmosResourceNotFoundError",
                "does not handle CosmosResourceNotFoundError",
            ),
            (
                'already deleted',
                "has 'already deleted' debug messaging",
                "missing 'already deleted' debug messaging",
            ),
        )
        targets = (
            ("delete_aged_conversations", delete_aged_conversations),
            ("delete_aged_documents", delete_aged_documents),
        )

        for label, func in targets:
            # Source is fetched lazily so the documents function is only
            # inspected once the conversations function has passed.
            source_text = inspect.getsource(func)
            for marker, ok_text, fail_text in marker_checks:
                if marker not in source_text:
                    print(f"❌ {label} {fail_text}")
                    return False
                print(f"✅ {label} {ok_text}")

        return True

    except Exception as e:
        print(f"❌ Failed to verify function definitions: {e}")
        import traceback
        traceback.print_exc()
        return False


def test_already_deleted_flag_in_details():
    """Check that both retention functions set an ``already_deleted`` flag.

    The check is textual: each function's source must contain the literal
    ``'already_deleted': True`` in either single- or double-quoted form.

    Returns:
        bool: True when both functions contain the flag, False on the first
        function that lacks it or when the import/inspection raises.
    """
    print("\n🔍 Testing 'already_deleted' flag in response details...")

    try:
        import inspect
        from functions_retention_policy import delete_aged_conversations, delete_aged_documents

        # Both sources are fetched up front, before any check is reported.
        source_by_name = {
            "delete_aged_conversations": inspect.getsource(delete_aged_conversations),
            "delete_aged_documents": inspect.getsource(delete_aged_documents),
        }
        accepted_spellings = ("'already_deleted': True", '"already_deleted": True')

        for label, source_text in source_by_name.items():
            if not any(spelling in source_text for spelling in accepted_spellings):
                print(f"❌ {label} missing 'already_deleted' flag in details")
                return False
            print(f"✅ {label} includes 'already_deleted' flag in details")

        return True

    except Exception as e:
        print(f"❌ Failed to verify already_deleted flag: {e}")
        import traceback
        traceback.print_exc()
        return False


def test_version_number():
    """Check that ``config.VERSION`` is at least 0.236.012.

    The version string is split on ``.`` and its first three components are
    parsed as integers.

    Returns:
        bool: True when the version meets the minimum, False when it is lower or
        when importing/parsing the version raises.
    """
    print("\n🔍 Testing version number...")

    try:
        from config import VERSION

        # Version should be at least 0.236.012
        pieces = VERSION.split('.')
        major, minor, patch = int(pieces[0]), int(pieces[1]), int(pieces[2])

        if major == 0 and minor >= 236 and patch >= 12:
            verdict = "(>= 0.236.012)"
        elif major > 0 or minor > 236:
            # Covers later minor/major releases whose patch number restarted below 12.
            verdict = "(later version)"
        else:
            print(f"❌ Version {VERSION} is lower than expected 0.236.012")
            return False

        print(f"✅ Version {VERSION} is correct {verdict}")
        return True

    except Exception as e:
        print(f"❌ Failed to verify version: {e}")
        import traceback
        traceback.print_exc()
        return False


if __name__ == "__main__":
    # Script entry point: run every check, print a summary, and exit with
    # status 0 only when all checks returned a truthy result.
    banner = "=" * 60
    print(banner)
    print("Retention Policy NotFound Error Handling Test")
    print(banner)

    tests = [
        test_notfound_exception_import,
        test_retention_policy_function_definitions,
        test_already_deleted_flag_in_details,
        test_version_number
    ]

    results = []
    for test in tests:
        try:
            outcome = test()
        except Exception as e:
            # A check that raises is recorded as a failure instead of aborting the run.
            print(f"❌ Test {test.__name__} failed with exception: {e}")
            import traceback
            traceback.print_exc()
            outcome = False
        results.append(outcome)

    print("\n" + banner)
    print(f"📊 Results: {sum(results)}/{len(results)} tests passed")
    print(banner)

    if not all(results):
        print("\n❌ Some tests failed. Please review the implementation.")
        sys.exit(1)

    print("\n✅ All tests passed! NotFound error handling is correctly implemented.")
    sys.exit(0)
Loading