diff --git a/src/a2a/server/events/event_consumer.py b/src/a2a/server/events/event_consumer.py index 6fc91856..51868069 100644 --- a/src/a2a/server/events/event_consumer.py +++ b/src/a2a/server/events/event_consumer.py @@ -95,7 +95,6 @@ async def consume_all(self) -> AsyncGenerator[Event]: logger.debug( f'Dequeued event of type: {type(event)} in consume_all.' ) - yield event self.queue.task_done() logger.debug( 'Marked task as done in event queue in consume_all' @@ -117,10 +116,16 @@ async def consume_all(self) -> AsyncGenerator[Event]: ) ) + # Make sure the yield is after the close events, otherwise + # the caller may end up in a blocked state where this + # generator isn't called again to close things out and the + # other part is waiting for an event or a closed queue. if is_final_event: logger.debug('Stopping event consumption in consume_all.') await self.queue.close() + yield event break + yield event except TimeoutError: # continue polling until there is a final event continue