Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 96 additions & 8 deletions src/schematic/event_buffer.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import asyncio
import logging
import random
import threading
import time
from typing import List, Optional

from .events.client import AsyncEventsClient, EventsClient
from .types import CreateEventRequestBody

DEFAULT_MAX_EVENTS = 100 # Default maximum number of events
DEFAULT_EVENT_BUFFER_PERIOD = 5 # 5 seconds
DEFAULT_MAX_RETRIES = 3 # Default maximum number of retry attempts
DEFAULT_INITIAL_RETRY_DELAY = 1 # Initial retry delay in seconds


class EventBuffer:
Expand All @@ -17,12 +21,16 @@ def __init__(
logger: logging.Logger,
period: Optional[int] = None,
max_events: int = DEFAULT_MAX_EVENTS,
max_retries: int = DEFAULT_MAX_RETRIES,
initial_retry_delay: float = DEFAULT_INITIAL_RETRY_DELAY,
):
self.events: List[CreateEventRequestBody] = []
self.events_api = events_api
self.interval = period or DEFAULT_EVENT_BUFFER_PERIOD
self.logger = logger
self.max_events = max_events
self.max_retries = max_retries
self.initial_retry_delay = initial_retry_delay
self.flush_lock = threading.Lock()
self.push_lock = threading.Lock()
self.shutdown = threading.Event()
Expand All @@ -39,10 +47,48 @@ def _flush(self):
return

events = [event for event in self.events if event is not None]
try:
self.events_api.create_event_batch(events=events)
except Exception as e:
self.logger.error(e)

# Initialize retry counter and success flag
retry_count = 0
success = False
last_exception = None

# Try with retries and exponential backoff
while retry_count <= self.max_retries and not success:
try:
if retry_count > 0:
# Log retry attempt
self.logger.info(f"Retrying event batch submission (attempt {retry_count} of {self.max_retries})")

# Attempt to send events
self.events_api.create_event_batch(events=events)
success = True

except Exception as e:
last_exception = e
retry_count += 1

if retry_count <= self.max_retries:
# Calculate backoff with jitter
delay = self.initial_retry_delay * (2 ** (retry_count - 1))
jitter = random.uniform(0, 0.1 * delay) # 10% jitter
wait_time = delay + jitter

self.logger.warning(
f"Event batch submission failed: {e}. "
f"Retrying in {wait_time:.2f} seconds..."
)

# Wait before retry
time.sleep(wait_time)

# After all retries, if still not successful, log the error
if not success:
self.logger.error(
f"Event batch submission failed after {self.max_retries} retries: {last_exception}"
)
elif retry_count > 0:
self.logger.info(f"Event batch submission succeeded after {retry_count} retries")

self.events.clear()

Expand Down Expand Up @@ -78,12 +124,16 @@ def __init__(
logger: logging.Logger,
period: Optional[int] = None,
max_events: int = DEFAULT_MAX_EVENTS,
max_retries: int = DEFAULT_MAX_RETRIES,
initial_retry_delay: float = DEFAULT_INITIAL_RETRY_DELAY,
):
self.events: List[CreateEventRequestBody] = []
self.events_api = events_api
self.interval = period or DEFAULT_EVENT_BUFFER_PERIOD
self.logger = logger
self.max_events = max_events
self.max_retries = max_retries
self.initial_retry_delay = initial_retry_delay
self.shutdown_event = asyncio.Event()
self.stopped = False
self.flush_lock = asyncio.Lock()
Expand All @@ -98,10 +148,48 @@ async def _flush(self):
return

events = [event for event in self.events if event is not None]
try:
await self.events_api.create_event_batch(events=events)
except Exception as e:
self.logger.error(e)

# Initialize retry counter and success flag
retry_count = 0
success = False
last_exception = None

# Try with retries and exponential backoff
while retry_count <= self.max_retries and not success:
try:
if retry_count > 0:
# Log retry attempt
self.logger.info(f"Retrying event batch submission (attempt {retry_count} of {self.max_retries})")

# Attempt to send events
await self.events_api.create_event_batch(events=events)
success = True

except Exception as e:
last_exception = e
retry_count += 1

if retry_count <= self.max_retries:
# Calculate backoff with jitter
delay = self.initial_retry_delay * (2 ** (retry_count - 1))
jitter = random.uniform(0, 0.1 * delay) # 10% jitter
wait_time = delay + jitter

self.logger.warning(
f"Event batch submission failed: {e}. "
f"Retrying in {wait_time:.2f} seconds..."
)

# Wait before retry (asyncio sleep)
await asyncio.sleep(wait_time)

# After all retries, if still not successful, log the error
if not success:
self.logger.error(
f"Event batch submission failed after {self.max_retries} retries: {last_exception}"
)
elif retry_count > 0:
self.logger.info(f"Event batch submission succeeded after {retry_count} retries")

self.events.clear()

Expand Down
124 changes: 93 additions & 31 deletions tests/custom/test_event_buffer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import unittest
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, patch, call

import pytest
import asyncio

from schematic.event_buffer import AsyncEventBuffer, EventBuffer
from schematic.types import CreateEventRequestBody
Expand Down Expand Up @@ -45,6 +46,7 @@ def test_flush(self):
self.mock_api.create_event_batch.assert_called_once_with(events=[event])
self.assertEqual(len(self.event_buffer.events), 0)


def test_stop(self):
with patch.object(self.event_buffer.flush_thread, "join"):
self.event_buffer.stop()
Expand All @@ -54,46 +56,106 @@ def test_stop(self):
@pytest.mark.asyncio
class TestAsyncEventBuffer:

@pytest.fixture(autouse=True)
async def setup_and_teardown(self):
self.mock_api = MagicMock()
self.mock_logger = MagicMock()
self.async_event_buffer = AsyncEventBuffer(
events_api=self.mock_api, logger=self.mock_logger, period=1, max_events=5
)
yield
await self.async_event_buffer.stop()

async def test_push_event(self):
event = MagicMock(spec=CreateEventRequestBody)
# Create a separate mock and buffer instance just for this test
mock_api = MagicMock()
mock_logger = MagicMock()
task_mock = MagicMock()

await self.async_event_buffer.push(event)
assert len(self.async_event_buffer.events) == 1
# First patch create_task to avoid running periodic flush
with patch('asyncio.create_task', return_value=task_mock):
# Then create the buffer, which uses create_task
buffer = AsyncEventBuffer(
events_api=mock_api, logger=mock_logger, period=1, max_events=5
)

async def test_push_event_exceeding_max_events(self):
event = MagicMock(spec=CreateEventRequestBody)
# Test push event
event = MagicMock(spec=CreateEventRequestBody)
await buffer.push(event)

with patch.object(self.async_event_buffer, "_flush") as mock_flush:
for _ in range(5):
await self.async_event_buffer.push(event)
assert len(self.async_event_buffer.events) == 5
# Verify event was added
assert len(buffer.events) == 1

# Pushing one more event should trigger a flush
await self.async_event_buffer.push(event)
mock_flush.assert_called_once()
# Clean up
await buffer.stop()

async def test_push_event_exceeding_max_events(self):
# Create a separate mock and buffer instance just for this test
mock_api = MagicMock()
mock_logger = MagicMock()
task_mock = MagicMock()

# First patch create_task to avoid running periodic flush
with patch('asyncio.create_task', return_value=task_mock):
# Then create the buffer, which uses create_task
buffer = AsyncEventBuffer(
events_api=mock_api, logger=mock_logger, period=1, max_events=5
)

# Setup test
event = MagicMock(spec=CreateEventRequestBody)

with patch.object(buffer, "_flush") as mock_flush:
# Add events up to max
for _ in range(5):
await buffer.push(event)
assert len(buffer.events) == 5

# Pushing one more event should trigger a flush
await buffer.push(event)
mock_flush.assert_called_once()

# Clean up
await buffer.stop()

async def test_flush(self):
event = MagicMock(spec=CreateEventRequestBody)
self.async_event_buffer.events = [event]
# Create a separate mock and buffer instance just for this test
mock_api = MagicMock()
mock_logger = MagicMock()
task_mock = MagicMock()

# First patch create_task to avoid running periodic flush
with patch('asyncio.create_task', return_value=task_mock):
# Also patch the max_retries to 0 to disable retry behavior for this test
buffer = AsyncEventBuffer(
events_api=mock_api, logger=mock_logger, period=1, max_events=5,
max_retries=0 # Disable retries for this specific test
)

# Setup test
event = MagicMock(spec=CreateEventRequestBody)
buffer.events = [event]

# Execute the method under test
await buffer._flush()

# Verify expectations
mock_api.create_event_batch.assert_called_once_with(events=[event])
assert len(buffer.events) == 0

# Clean up
await buffer.stop()

await self.async_event_buffer._flush()
self.mock_api.create_event_batch.assert_called_once_with(events=[event])
assert len(self.async_event_buffer.events) == 0

async def test_stop(self):
with patch.object(self.async_event_buffer.flush_task, "cancel"):
await self.async_event_buffer.stop()
assert self.async_event_buffer.shutdown_event.is_set()
# Create a separate mock and buffer instance just for this test
mock_api = MagicMock()
mock_logger = MagicMock()
task_mock = MagicMock()

# First patch create_task to avoid running periodic flush
with patch('asyncio.create_task', return_value=task_mock):
# Then create the buffer, which uses create_task
buffer = AsyncEventBuffer(
events_api=mock_api, logger=mock_logger, period=1, max_events=5
)

# Test stopping the buffer
await buffer.stop()

# Verify shutdown was set
assert buffer.shutdown_event.is_set()
assert buffer.stopped is True


if __name__ == "__main__":
Expand Down
Loading