diff --git a/src/schematic/event_buffer.py b/src/schematic/event_buffer.py index 0c60c0d..475a3fe 100644 --- a/src/schematic/event_buffer.py +++ b/src/schematic/event_buffer.py @@ -1,6 +1,8 @@ import asyncio import logging +import random import threading +import time from typing import List, Optional from .events.client import AsyncEventsClient, EventsClient @@ -8,6 +10,8 @@ DEFAULT_MAX_EVENTS = 100 # Default maximum number of events DEFAULT_EVENT_BUFFER_PERIOD = 5 # 5 seconds +DEFAULT_MAX_RETRIES = 3 # Default maximum number of retry attempts +DEFAULT_INITIAL_RETRY_DELAY = 1 # Initial retry delay in seconds class EventBuffer: @@ -17,12 +21,16 @@ def __init__( logger: logging.Logger, period: Optional[int] = None, max_events: int = DEFAULT_MAX_EVENTS, + max_retries: int = DEFAULT_MAX_RETRIES, + initial_retry_delay: float = DEFAULT_INITIAL_RETRY_DELAY, ): self.events: List[CreateEventRequestBody] = [] self.events_api = events_api self.interval = period or DEFAULT_EVENT_BUFFER_PERIOD self.logger = logger self.max_events = max_events + self.max_retries = max_retries + self.initial_retry_delay = initial_retry_delay self.flush_lock = threading.Lock() self.push_lock = threading.Lock() self.shutdown = threading.Event() @@ -39,10 +47,48 @@ def _flush(self): return events = [event for event in self.events if event is not None] - try: - self.events_api.create_event_batch(events=events) - except Exception as e: - self.logger.error(e) + + # Initialize retry counter and success flag + retry_count = 0 + success = False + last_exception = None + + # Try with retries and exponential backoff + while retry_count <= self.max_retries and not success: + try: + if retry_count > 0: + # Log retry attempt + self.logger.info(f"Retrying event batch submission (attempt {retry_count} of {self.max_retries})") + + # Attempt to send events + self.events_api.create_event_batch(events=events) + success = True + + except Exception as e: + last_exception = e + retry_count += 1 + + if retry_count <= 
self.max_retries: + # Calculate backoff with jitter + delay = self.initial_retry_delay * (2 ** (retry_count - 1)) + jitter = random.uniform(0, 0.1 * delay) # 10% jitter + wait_time = delay + jitter + + self.logger.warning( + f"Event batch submission failed: {e}. " + f"Retrying in {wait_time:.2f} seconds..." + ) + + # Wait before retry + time.sleep(wait_time) + + # After all retries, if still not successful, log the error + if not success: + self.logger.error( + f"Event batch submission failed after {self.max_retries} retries: {last_exception}" + ) + elif retry_count > 0: + self.logger.info(f"Event batch submission succeeded after {retry_count} retries") self.events.clear() @@ -78,12 +124,16 @@ def __init__( logger: logging.Logger, period: Optional[int] = None, max_events: int = DEFAULT_MAX_EVENTS, + max_retries: int = DEFAULT_MAX_RETRIES, + initial_retry_delay: float = DEFAULT_INITIAL_RETRY_DELAY, ): self.events: List[CreateEventRequestBody] = [] self.events_api = events_api self.interval = period or DEFAULT_EVENT_BUFFER_PERIOD self.logger = logger self.max_events = max_events + self.max_retries = max_retries + self.initial_retry_delay = initial_retry_delay self.shutdown_event = asyncio.Event() self.stopped = False self.flush_lock = asyncio.Lock() @@ -98,10 +148,48 @@ async def _flush(self): return events = [event for event in self.events if event is not None] - try: - await self.events_api.create_event_batch(events=events) - except Exception as e: - self.logger.error(e) + + # Initialize retry counter and success flag + retry_count = 0 + success = False + last_exception = None + + # Try with retries and exponential backoff + while retry_count <= self.max_retries and not success: + try: + if retry_count > 0: + # Log retry attempt + self.logger.info(f"Retrying event batch submission (attempt {retry_count} of {self.max_retries})") + + # Attempt to send events + await self.events_api.create_event_batch(events=events) + success = True + + except Exception as e: + 
last_exception = e + retry_count += 1 + + if retry_count <= self.max_retries: + # Calculate backoff with jitter + delay = self.initial_retry_delay * (2 ** (retry_count - 1)) + jitter = random.uniform(0, 0.1 * delay) # 10% jitter + wait_time = delay + jitter + + self.logger.warning( + f"Event batch submission failed: {e}. " + f"Retrying in {wait_time:.2f} seconds..." + ) + + # Wait before retry (asyncio sleep) + await asyncio.sleep(wait_time) + + # After all retries, if still not successful, log the error + if not success: + self.logger.error( + f"Event batch submission failed after {self.max_retries} retries: {last_exception}" + ) + elif retry_count > 0: + self.logger.info(f"Event batch submission succeeded after {retry_count} retries") self.events.clear() diff --git a/tests/custom/test_event_buffer.py b/tests/custom/test_event_buffer.py index 9449095..bd71ff2 100644 --- a/tests/custom/test_event_buffer.py +++ b/tests/custom/test_event_buffer.py @@ -1,7 +1,8 @@ import unittest -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, patch, call import pytest +import asyncio from schematic.event_buffer import AsyncEventBuffer, EventBuffer from schematic.types import CreateEventRequestBody @@ -45,6 +46,7 @@ def test_flush(self): self.mock_api.create_event_batch.assert_called_once_with(events=[event]) self.assertEqual(len(self.event_buffer.events), 0) + def test_stop(self): with patch.object(self.event_buffer.flush_thread, "join"): self.event_buffer.stop() @@ -54,46 +56,106 @@ def test_stop(self): @pytest.mark.asyncio class TestAsyncEventBuffer: - @pytest.fixture(autouse=True) - async def setup_and_teardown(self): - self.mock_api = MagicMock() - self.mock_logger = MagicMock() - self.async_event_buffer = AsyncEventBuffer( - events_api=self.mock_api, logger=self.mock_logger, period=1, max_events=5 - ) - yield - await self.async_event_buffer.stop() - async def test_push_event(self): - event = MagicMock(spec=CreateEventRequestBody) + # Create a 
def _suppress_periodic_flush():
    """Return a patcher that keeps AsyncEventBuffer from starting background work.

    While the patcher is active, ``asyncio.create_task`` hands back an inert
    ``MagicMock`` instead of scheduling anything on the event loop.

    Unlike a bare ``return_value=...`` patch, the object passed to
    ``create_task`` is closed when it is a coroutine, so Python does not emit
    a "coroutine ... was never awaited" RuntimeWarning for it.
    NOTE(review): presumably the constructor passes ``self._periodic_flush()``
    here -- confirm against ``AsyncEventBuffer.__init__``.
    """
    placeholder_task = MagicMock()

    def _discard(coro, *args, **kwargs):
        close = getattr(coro, "close", None)
        if callable(close):
            close()
        return placeholder_task

    return patch("asyncio.create_task", side_effect=_discard)


def _make_buffer(**overrides):
    """Build an AsyncEventBuffer wired to fresh mocks.

    Args:
        **overrides: extra keyword arguments forwarded to ``AsyncEventBuffer``
            (for example ``max_retries``).

    Returns:
        A ``(buffer, mock_api, mock_logger)`` tuple.

    The API double is an ``AsyncMock`` because ``AsyncEventBuffer._flush``
    awaits ``events_api.create_event_batch(...)``. With a plain ``MagicMock``
    the await raises ``TypeError``, which ``_flush`` catches and treats as a
    failed submission, so the tests would only ever exercise the error path.
    """
    # Local import: this module's import block may not provide AsyncMock.
    from unittest.mock import AsyncMock

    mock_api = AsyncMock()
    mock_logger = MagicMock()
    buffer = AsyncEventBuffer(
        events_api=mock_api,
        logger=mock_logger,
        period=1,
        max_events=5,
        **overrides,
    )
    return buffer, mock_api, mock_logger


@pytest.mark.asyncio
class TestAsyncEventBuffer:
    """Behavioural tests for AsyncEventBuffer with the periodic flush disabled."""

    async def test_push_event(self):
        """A pushed event is stored in the buffer."""
        with _suppress_periodic_flush():
            buffer, _, _ = _make_buffer()

            await buffer.push(MagicMock(spec=CreateEventRequestBody))

            assert len(buffer.events) == 1

            await buffer.stop()

    async def test_push_event_exceeding_max_events(self):
        """Pushing beyond ``max_events`` triggers exactly one flush."""
        with _suppress_periodic_flush():
            buffer, _, _ = _make_buffer()
            event = MagicMock(spec=CreateEventRequestBody)

            with patch.object(buffer, "_flush") as mock_flush:
                # Fill the buffer up to its capacity.
                for _ in range(5):
                    await buffer.push(event)
                assert len(buffer.events) == 5

                # One more event than the buffer holds must force a flush.
                await buffer.push(event)
                mock_flush.assert_called_once()

            await buffer.stop()

    async def test_flush(self):
        """A successful flush sends the batch once and empties the buffer."""
        with _suppress_periodic_flush():
            # Default retry settings are kept on purpose: with an awaitable
            # API double the first attempt succeeds, so no retry can start.
            buffer, mock_api, mock_logger = _make_buffer()
            event = MagicMock(spec=CreateEventRequestBody)
            buffer.events = [event]

            await buffer._flush()

            mock_api.create_event_batch.assert_awaited_once_with(events=[event])
            # Guards against the submission silently going through the
            # swallowed-exception path again.
            mock_logger.error.assert_not_called()
            assert len(buffer.events) == 0

            await buffer.stop()

    async def test_stop(self):
        """Stopping the buffer sets the shutdown event and the stopped flag."""
        with _suppress_periodic_flush():
            buffer, _, _ = _make_buffer()

            await buffer.stop()

            assert buffer.shutdown_event.is_set()
            assert buffer.stopped is True
"""Tests for the retry and backoff behaviour of EventBuffer and AsyncEventBuffer."""

import asyncio
import unittest
from contextlib import asynccontextmanager
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from schematic.event_buffer import (
    DEFAULT_INITIAL_RETRY_DELAY,
    DEFAULT_MAX_RETRIES,
    AsyncEventBuffer,
    EventBuffer,
)
from schematic.types import CreateEventRequestBody

# Upper bound on the random jitter that ``_flush`` adds to each backoff delay,
# expressed as a fraction of the un-jittered delay.
MAX_JITTER_FRACTION = 0.1


def _assert_exponential_backoff(
    sleep_mock, expected_sleeps, initial_delay=DEFAULT_INITIAL_RETRY_DELAY
):
    """Check that the recorded sleeps follow the documented backoff schedule.

    Args:
        sleep_mock: the mock that replaced ``time.sleep`` / ``asyncio.sleep``.
        expected_sleeps: how many waits the flush is expected to have made.
        initial_delay: delay before the first retry, in seconds.

    Raises:
        AssertionError: if the number of waits is wrong, or a wait falls
            outside ``[base, base + 10% jitter]`` where ``base`` doubles on
            every retry.
    """
    waits = [recorded.args[0] for recorded in sleep_mock.call_args_list]
    assert len(waits) == expected_sleeps
    for attempt, waited in enumerate(waits, start=1):
        base = initial_delay * 2 ** (attempt - 1)
        assert base <= waited <= base + MAX_JITTER_FRACTION * base


class TestEventBufferRetry(unittest.TestCase):
    """Test the retry mechanism in the synchronous EventBuffer."""

    def setUp(self):
        self.mock_api = MagicMock()
        self.mock_logger = MagicMock()
        self.event_buffer = EventBuffer(
            events_api=self.mock_api, logger=self.mock_logger, period=1, max_events=5
        )

    def tearDown(self):
        self.event_buffer.stop()

    def test_flush_with_retry(self):
        """Test that the EventBuffer retries failed API calls."""
        event = MagicMock(spec=CreateEventRequestBody)
        self.event_buffer.events = [event]

        # Fail twice, then succeed on the third attempt.
        self.mock_api.create_event_batch.side_effect = [
            Exception("API failure 1"),
            Exception("API failure 2"),
            None,
        ]

        with patch("time.sleep") as mock_sleep:  # no real waiting in tests
            self.event_buffer._flush()

        self.assertEqual(self.mock_api.create_event_batch.call_count, 3)
        # One wait between each pair of attempts, doubling every time.
        _assert_exponential_backoff(mock_sleep, expected_sleeps=2)

        # Events are cleared after success.
        self.assertEqual(len(self.event_buffer.events), 0)

        self.mock_logger.warning.assert_called()
        self.mock_logger.info.assert_called_with(
            "Event batch submission succeeded after 2 retries"
        )

    def test_flush_with_max_retries_exhausted(self):
        """Test that the EventBuffer gives up after max_retries attempts."""
        event = MagicMock(spec=CreateEventRequestBody)
        self.event_buffer.events = [event]

        # Every attempt fails.
        self.mock_api.create_event_batch.side_effect = Exception("API failure")

        with patch("time.sleep") as mock_sleep:  # no real waiting in tests
            self.event_buffer._flush()

        # One initial attempt plus DEFAULT_MAX_RETRIES retries.
        self.assertEqual(
            self.mock_api.create_event_batch.call_count, DEFAULT_MAX_RETRIES + 1
        )
        _assert_exponential_backoff(mock_sleep, expected_sleeps=DEFAULT_MAX_RETRIES)

        # Events are cleared even after failure.
        self.assertEqual(len(self.event_buffer.events), 0)

        self.mock_logger.error.assert_called()


@asynccontextmanager
async def _buffer_without_periodic_flush():
    """Yield ``(buffer, mock_api, mock_logger)`` with the background flush neutralised.

    Written as an async context manager rather than an async-generator
    ``@pytest.fixture`` so that setup and teardown do not depend on which
    fixture mode pytest-asyncio is configured with, and so that teardown is
    guaranteed by ``finally`` even when the test body raises.
    """
    mock_api = AsyncMock()
    mock_logger = MagicMock()
    buffer = AsyncEventBuffer(
        events_api=mock_api, logger=mock_logger, period=1, max_events=5
    )

    async def dummy_periodic_flush():
        pass

    with patch.object(buffer, "_periodic_flush", dummy_periodic_flush):
        # Cancel the task started by the constructor and wait for it to
        # finish so it cannot interfere with the test.
        buffer.flush_task.cancel()
        try:
            await buffer.flush_task
        except asyncio.CancelledError:
            pass

        # Give the buffer a task that returns immediately instead.
        buffer.flush_task = asyncio.create_task(dummy_periodic_flush())

        try:
            yield buffer, mock_api, mock_logger
        finally:
            buffer.flush_task.cancel()
            await buffer.stop()


@pytest.mark.asyncio
class TestAsyncEventBufferRetry:
    """Test the retry mechanism in the asynchronous AsyncEventBuffer."""

    async def test_flush_with_retry(self):
        """Test that the AsyncEventBuffer retries failed API calls."""
        async with _buffer_without_periodic_flush() as (buffer, mock_api, mock_logger):
            event = MagicMock(spec=CreateEventRequestBody)
            buffer.events = [event]

            # Fail twice, then succeed on the third attempt.
            mock_api.create_event_batch.side_effect = [
                Exception("API failure 1"),
                Exception("API failure 2"),
                None,
            ]

            with patch("asyncio.sleep", return_value=None) as mock_sleep:
                await buffer._flush()

            assert mock_api.create_event_batch.call_count == 3
            # One wait between each pair of attempts, doubling every time.
            _assert_exponential_backoff(mock_sleep, expected_sleeps=2)

            # Events are cleared after success.
            assert len(buffer.events) == 0

            mock_logger.warning.assert_called()
            mock_logger.info.assert_called_with(
                "Event batch submission succeeded after 2 retries"
            )

    async def test_flush_with_max_retries_exhausted(self):
        """Test that the AsyncEventBuffer gives up after max_retries attempts."""
        async with _buffer_without_periodic_flush() as (buffer, mock_api, mock_logger):
            event = MagicMock(spec=CreateEventRequestBody)
            buffer.events = [event]

            # Every attempt fails.
            mock_api.create_event_batch.side_effect = Exception("API failure")

            with patch("asyncio.sleep", return_value=None) as mock_sleep:
                await buffer._flush()

            # One initial attempt plus DEFAULT_MAX_RETRIES retries.
            assert mock_api.create_event_batch.call_count == DEFAULT_MAX_RETRIES + 1
            _assert_exponential_backoff(mock_sleep, expected_sleeps=DEFAULT_MAX_RETRIES)

            # Events are cleared even after failure.
            assert len(buffer.events) == 0

            mock_logger.error.assert_called()


if __name__ == "__main__":
    unittest.main()