From c3b6a1e87c2b32594b193e7466e13db417aa42ba Mon Sep 17 00:00:00 2001 From: Phil Stephens Date: Thu, 22 May 2025 00:57:36 +0000 Subject: [PATCH 1/5] feat: Update to support python 3.12 - add case statement to use the asyncio.Queue.shutdown method for 3.13+ - add special handling to allow for similar semantics as asyncio.Queue.shutdown for 3.12 Tested on multiple samples in the a2a repo and some examples in this repo --- pyproject.toml | 4 ++++ src/a2a/server/events/event_consumer.py | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 9bb3814a..a01a9014 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,10 +22,14 @@ classifiers = [ "Intended Audience :: Developers", "Programming Language :: Python", "Programming Language :: Python :: 3", +<<<<<<< HEAD "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.13", +======= + "Programming Language :: Python :: 3.12", +>>>>>>> 8ec734c (feat: Update to support python 3.12) "Operating System :: OS Independent", "Topic :: Software Development :: Libraries :: Python Modules", "License :: OSI Approved :: Apache Software License", diff --git a/src/a2a/server/events/event_consumer.py b/src/a2a/server/events/event_consumer.py index 6fc91856..89ba90d2 100644 --- a/src/a2a/server/events/event_consumer.py +++ b/src/a2a/server/events/event_consumer.py @@ -15,6 +15,12 @@ from a2a.utils.errors import ServerError from a2a.utils.telemetry import SpanKind, trace_class +# This is an alias to the execption for closed queue +QueueClosed = asyncio.QueueEmpty + +# When using python 3.13 or higher, the closed queue signal is QueueShutdown +if sys.version_info >= (3, 13): + QueueClosed = asyncio.QueueShutDown # This is an alias to the exception for closed queue QueueClosed = asyncio.QueueEmpty From 61c89a8c50c8ac785e80955369b030a51f0cb3d3 Mon Sep 17 00:00:00 2001 From: Phil Stephens Date: Thu, 22 May 2025 15:08:38 +0000 Subject: [PATCH 2/5] Change to 3.10 and provided detailed description about event queue usage --- pyproject.toml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index a01a9014..9bb3814a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,14 +22,10 @@ classifiers = [ "Intended Audience :: Developers", "Programming Language :: Python", "Programming Language :: Python :: 3", -<<<<<<< HEAD "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.13", -======= - "Programming Language :: Python :: 3.12", ->>>>>>> 8ec734c (feat: Update to support python 3.12) "Operating System :: OS Independent", "Topic :: Software Development :: Libraries :: Python Modules", "License :: OSI Approved :: Apache Software License", From 2c1849edfceaa2b31690e75619c7668cbe15112d Mon Sep 17 00:00:00 2001 From: Phil Stephens Date: Thu, 22 May 2025 23:42:17 +0000 Subject: [PATCH 3/5] Fix race condition in python <= 3.12 --- src/a2a/server/events/event_consumer.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/a2a/server/events/event_consumer.py b/src/a2a/server/events/event_consumer.py index 89ba90d2..2a96eca2 100644 --- a/src/a2a/server/events/event_consumer.py +++ b/src/a2a/server/events/event_consumer.py @@ -101,7 +101,6 @@ async def consume_all(self) -> AsyncGenerator[Event]: logger.debug( f'Dequeued event of type: {type(event)} in consume_all.' ) - yield event self.queue.task_done() logger.debug( 'Marked task as done in event queue in consume_all' @@ -123,10 +122,16 @@ async def consume_all(self) -> AsyncGenerator[Event]: ) ) + # Make sure the yield is after the close events, otherwise + # the caller may end up in a blocked state where this + # generator isn't called again to close things out and the + # other part is waiting for an event or a closed queue. if is_final_event: logger.debug('Stopping event consumption in consume_all.') await self.queue.close() + yield event break + yield event except TimeoutError: # continue polling until there is a final event continue From e4dd6051d8f0c1b04d0edc7fd0f24cdecbcbc32d Mon Sep 17 00:00:00 2001 From: Phil Stephens Date: Thu, 22 May 2025 23:44:47 +0000 Subject: [PATCH 4/5] fix merge conflict --- src/a2a/server/events/event_consumer.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/a2a/server/events/event_consumer.py b/src/a2a/server/events/event_consumer.py index 2a96eca2..a5c31317 100644 --- a/src/a2a/server/events/event_consumer.py +++ b/src/a2a/server/events/event_consumer.py @@ -15,14 +15,8 @@ from a2a.utils.errors import ServerError from a2a.utils.telemetry import SpanKind, trace_class -# This is an alias to the execption for closed queue -QueueClosed = asyncio.QueueEmpty -# When using python 3.13 or higher, the closed queue signal is QueueShutdown -if sys.version_info >= (3, 13): - QueueClosed = asyncio.QueueShutDown - -# This is an alias to the exception for closed queue +# This is an alias to the execption for closed queue QueueClosed = asyncio.QueueEmpty # When using python 3.13 or higher, the closed queue signal is QueueShutdown From d1703ed143b6bb16c6b4ee512d156ff782d7950a Mon Sep 17 00:00:00 2001 From: Phil Stephens Date: Thu, 22 May 2025 23:45:50 +0000 Subject: [PATCH 5/5] Fix typo --- src/a2a/server/events/event_consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/a2a/server/events/event_consumer.py b/src/a2a/server/events/event_consumer.py index a5c31317..51868069 100644 --- a/src/a2a/server/events/event_consumer.py +++ b/src/a2a/server/events/event_consumer.py @@ -16,7 +16,7 @@ from a2a.utils.telemetry import SpanKind, trace_class -# This is an alias to the execption for closed queue +# This is an alias to the exception for closed queue QueueClosed = asyncio.QueueEmpty # When using python 3.13 or higher, the closed queue signal is QueueShutdown