fix(task-sdk): add greenback fallback for Variable access in triggerer#63387
fix(task-sdk): add greenback fallback for Variable access in triggerer#63387YoannAbriel wants to merge 1 commit intoapache:mainfrom
Conversation
e6164e1 to
9c34355
Compare
d5e75ba to
a1323e5
Compare
a1323e5 to
1b12d6b
Compare
1b12d6b to
d7a1109
Compare
0c37a69 to
d7f004a
Compare
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds the missing greenback fallback path to ExecutionAPISecretsBackend.get_variable() so triggerer usage of Variable.get() doesn’t incorrectly return None when SUPERVISOR_COMMS.send() hits the async_to_sync RuntimeError in an async event loop.
Changes:
- Add
RuntimeErrordetection + greenback portal fallback toget_variable()(mirroringget_connection()behavior). - Add a unit test covering the new fallback path for variable access.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py | Adds greenback portal detection + async fallback for get_variable() when async_to_sync fails in triggerer contexts. |
| task-sdk/tests/task_sdk/execution_time/test_secrets.py | Adds a regression test ensuring get_variable() uses the greenback fallback when SUPERVISOR_COMMS.send() raises the known RuntimeError. |
| except RuntimeError as e: | ||
| # TriggerCommsDecoder.send() uses async_to_sync internally, which raises RuntimeError | ||
| # when called within an async event loop. In greenback portal contexts (triggerer), | ||
| # we catch this and use greenback to call the async version instead. | ||
| if str(e).startswith("You cannot use AsyncToSync in the same thread as an async event loop"): | ||
| import asyncio | ||
|
|
||
| import greenback | ||
|
|
||
| task = asyncio.current_task() | ||
| if greenback.has_portal(task): | ||
| import warnings | ||
|
|
||
| warnings.warn( | ||
| "You should not use sync calls here -- use `await aget_variable` instead", | ||
| stacklevel=2, | ||
| ) | ||
| return greenback.await_(self.aget_variable(key)) | ||
| # Fall through to the general exception handler for other RuntimeErrors | ||
| return None |
There was a problem hiding this comment.
import greenback is executed inside the except RuntimeError handler. If greenback is not installed/available in some client contexts, this will raise ImportError while handling the original RuntimeError and will escape the function (i.e., it will no longer return None and allow other backends to be tried). Wrap the greenback import/usage in a local try/except ImportError (and return None on failure), so the fallback remains best-effort and behavior stays consistent with the outer 'return None to allow fallback' intent.
| stacklevel=2, | ||
| ) | ||
| return greenback.await_(self.aget_variable(key)) | ||
| # Fall through to the general exception handler for other RuntimeErrors |
There was a problem hiding this comment.
The comment says 'Fall through to the general exception handler', but the code returns None immediately and cannot fall through to the except Exception block. Update the comment to match the control flow (e.g., 'For other RuntimeErrors, return None to allow fallback to other backends') or remove it to avoid misleading future readers.
| # Fall through to the general exception handler for other RuntimeErrors | |
| # For other RuntimeErrors, return None to allow fallback to other backends. |
| except RuntimeError as e: | ||
| # TriggerCommsDecoder.send() uses async_to_sync internally, which raises RuntimeError | ||
| # when called within an async event loop. In greenback portal contexts (triggerer), | ||
| # we catch this and use greenback to call the async version instead. | ||
| if str(e).startswith("You cannot use AsyncToSync in the same thread as an async event loop"): | ||
| import asyncio | ||
|
|
||
| import greenback | ||
|
|
||
| task = asyncio.current_task() | ||
| if greenback.has_portal(task): | ||
| import warnings | ||
|
|
||
| warnings.warn( | ||
| "You should not use sync calls here -- use `await aget_variable` instead", | ||
| stacklevel=2, | ||
| ) | ||
| return greenback.await_(self.aget_variable(key)) |
There was a problem hiding this comment.
The greenback portal detection + warning + await_ fallback logic is now duplicated between get_connection() and get_variable(). Consider extracting this into a small private helper (e.g., _greenback_sync_fallback(kind, coro) or similar) to reduce duplication and keep the warning text / detection logic consistent across the two code paths.
| def test_get_variable_runtime_error_triggers_greenback_fallback(self, mocker, mock_supervisor_comms): | ||
| """ | ||
| Test that RuntimeError from async_to_sync triggers greenback fallback for variables. | ||
|
|
||
| Same as the connection test but for get_variable — verifies the fix for #61676: | ||
| triggers calling Variable.get() fail because SUPERVISOR_COMMS.send() raises | ||
| RuntimeError in the async event loop, but the greenback fallback was missing. | ||
| """ | ||
| expected_value = "10" | ||
|
|
||
| # Simulate the RuntimeError that triggers greenback fallback | ||
| mock_supervisor_comms.send.side_effect = RuntimeError( | ||
| "You cannot use AsyncToSync in the same thread as an async event loop" | ||
| ) | ||
|
|
||
| # Mock the greenback and asyncio modules | ||
| mocker.patch("greenback.has_portal", return_value=True) | ||
| mocker.patch("asyncio.current_task") | ||
|
|
||
| import asyncio | ||
|
|
||
| def greenback_await_side_effect(coro): | ||
| loop = asyncio.new_event_loop() | ||
| try: | ||
| return loop.run_until_complete(coro) | ||
| finally: | ||
| loop.close() | ||
|
|
||
| mock_greenback_await = mocker.patch("greenback.await_", side_effect=greenback_await_side_effect) | ||
|
|
||
| # Mock aget_variable to return the expected value | ||
| async def mock_aget_variable(self, key): | ||
| return expected_value | ||
|
|
||
| mocker.patch.object(ExecutionAPISecretsBackend, "aget_variable", mock_aget_variable) | ||
|
|
||
| backend = ExecutionAPISecretsBackend() | ||
| result = backend.get_variable("retries") | ||
|
|
||
| assert result == expected_value | ||
| mock_greenback_await.assert_called_once() | ||
| mock_supervisor_comms.send.assert_called_once() |
There was a problem hiding this comment.
This test exercises the greenback fallback, but it doesn’t assert the warnings.warn(...) behavior introduced by the fallback path. Adding an assertion (e.g., using pytest.warns and matching the message) would ensure the warning remains present and correct, and prevent future regressions where the fallback works but the guidance to use await aget_variable is accidentally removed/changed.
a01808a to
8008073
Compare
8008073 to
ed828f4
Compare
ExecutionAPISecretsBackend.get_variable()is missing theRuntimeError+ greenback fallback thatget_connection()already has (added in #57154). When a deferrable operator's trigger callsVariable.get()inside the triggerer's async event loop,SUPERVISOR_COMMS.send()raisesRuntimeErrorfromasync_to_sync. The bareexcept Exceptionswallows it and returnsNone, so every backend returns nothing and the variable is reported as not found.Added the same greenback portal detection and
aget_variable()fallback thatget_connection()uses. Added a matching unit test.Closes: #61676
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (Opus 4, claude-opus-4-6) following the guidelines
{pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.