Skip to content

fix(task-sdk): add greenback fallback for Variable access in triggerer#63387

Open
YoannAbriel wants to merge 1 commit intoapache:mainfrom
YoannAbriel:fix/issue-61676
Open

fix(task-sdk): add greenback fallback for Variable access in triggerer#63387
YoannAbriel wants to merge 1 commit intoapache:mainfrom
YoannAbriel:fix/issue-61676

Conversation

@YoannAbriel
Copy link
Copy Markdown
Contributor

ExecutionAPISecretsBackend.get_variable() is missing the RuntimeError + greenback fallback that get_connection() already has (added in #57154). When a deferrable operator's trigger calls Variable.get() inside the triggerer's async event loop, SUPERVISOR_COMMS.send() raises RuntimeError from async_to_sync. The bare except Exception swallows it and returns None, so every backend returns nothing and the variable is reported as not found.

Added the same greenback portal detection and aget_variable() fallback that get_connection() uses. Added a matching unit test.

Closes: #61676


Was generative AI tooling used to co-author this PR?
  • Yes — Claude Code (Opus 4, claude-opus-4-6)

Generated-by: Claude Code (Opus 4, claude-opus-4-6) following the guidelines


  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

@YoannAbriel YoannAbriel force-pushed the fix/issue-61676 branch 4 times, most recently from e6164e1 to 9c34355 Compare March 14, 2026 13:06
@eladkal eladkal added this to the Airflow 3.1.9 milestone Mar 15, 2026
@YoannAbriel YoannAbriel force-pushed the fix/issue-61676 branch 2 times, most recently from d5e75ba to a1323e5 Compare March 23, 2026 13:09
@potiuk potiuk added the ready for maintainer review Set after triaging when all criteria pass. label Apr 2, 2026
@eladkal eladkal added backport-to-v3-2-test Mark PR with this label to backport to v3-2-test branch and removed backport-to-v3-1-test labels Apr 6, 2026
@eladkal eladkal modified the milestones: Airflow 3.1.9, Airflow 3.2.1 Apr 6, 2026
@YoannAbriel YoannAbriel force-pushed the fix/issue-61676 branch 9 times, most recently from 0c37a69 to d7f004a Compare April 10, 2026 09:04
@kaxil kaxil requested a review from Copilot April 10, 2026 19:55
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Adds the missing greenback fallback path to ExecutionAPISecretsBackend.get_variable() so triggerer usage of Variable.get() doesn’t incorrectly return None when SUPERVISOR_COMMS.send() hits the async_to_sync RuntimeError in an async event loop.

Changes:

  • Add RuntimeError detection + greenback portal fallback to get_variable() (mirroring get_connection() behavior).
  • Add a unit test covering the new fallback path for variable access.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.

File Description
task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py Adds greenback portal detection + async fallback for get_variable() when async_to_sync fails in triggerer contexts.
task-sdk/tests/task_sdk/execution_time/test_secrets.py Adds a regression test ensuring get_variable() uses the greenback fallback when SUPERVISOR_COMMS.send() raises the known RuntimeError.

Comment on lines +116 to +135
except RuntimeError as e:
# TriggerCommsDecoder.send() uses async_to_sync internally, which raises RuntimeError
# when called within an async event loop. In greenback portal contexts (triggerer),
# we catch this and use greenback to call the async version instead.
if str(e).startswith("You cannot use AsyncToSync in the same thread as an async event loop"):
import asyncio

import greenback

task = asyncio.current_task()
if greenback.has_portal(task):
import warnings

warnings.warn(
"You should not use sync calls here -- use `await aget_variable` instead",
stacklevel=2,
)
return greenback.await_(self.aget_variable(key))
# Fall through to the general exception handler for other RuntimeErrors
return None
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import greenback is executed inside the except RuntimeError handler. If greenback is not installed/available in some client contexts, this will raise ImportError while handling the original RuntimeError and will escape the function (i.e., it will no longer return None and allow other backends to be tried). Wrap the greenback import/usage in a local try/except ImportError (and return None on failure), so the fallback remains best-effort and behavior stays consistent with the outer 'return None to allow fallback' intent.

Copilot uses AI. Check for mistakes.
stacklevel=2,
)
return greenback.await_(self.aget_variable(key))
# Fall through to the general exception handler for other RuntimeErrors
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment says 'Fall through to the general exception handler', but the code returns None immediately and cannot fall through to the except Exception block. Update the comment to match the control flow (e.g., 'For other RuntimeErrors, return None to allow fallback to other backends') or remove it to avoid misleading future readers.

Suggested change
# Fall through to the general exception handler for other RuntimeErrors
# For other RuntimeErrors, return None to allow fallback to other backends.

Copilot uses AI. Check for mistakes.
Comment on lines +116 to +133
except RuntimeError as e:
# TriggerCommsDecoder.send() uses async_to_sync internally, which raises RuntimeError
# when called within an async event loop. In greenback portal contexts (triggerer),
# we catch this and use greenback to call the async version instead.
if str(e).startswith("You cannot use AsyncToSync in the same thread as an async event loop"):
import asyncio

import greenback

task = asyncio.current_task()
if greenback.has_portal(task):
import warnings

warnings.warn(
"You should not use sync calls here -- use `await aget_variable` instead",
stacklevel=2,
)
return greenback.await_(self.aget_variable(key))
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The greenback portal detection + warning + await_ fallback logic is now duplicated between get_connection() and get_variable(). Consider extracting this into a small private helper (e.g., _greenback_sync_fallback(kind, coro) or similar) to reduce duplication and keep the warning text / detection logic consistent across the two code paths.

Copilot uses AI. Check for mistakes.
Comment on lines +181 to +222
def test_get_variable_runtime_error_triggers_greenback_fallback(self, mocker, mock_supervisor_comms):
"""
Test that RuntimeError from async_to_sync triggers greenback fallback for variables.

Same as the connection test but for get_variable — verifies the fix for #61676:
triggers calling Variable.get() fail because SUPERVISOR_COMMS.send() raises
RuntimeError in the async event loop, but the greenback fallback was missing.
"""
expected_value = "10"

# Simulate the RuntimeError that triggers greenback fallback
mock_supervisor_comms.send.side_effect = RuntimeError(
"You cannot use AsyncToSync in the same thread as an async event loop"
)

# Mock the greenback and asyncio modules
mocker.patch("greenback.has_portal", return_value=True)
mocker.patch("asyncio.current_task")

import asyncio

def greenback_await_side_effect(coro):
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(coro)
finally:
loop.close()

mock_greenback_await = mocker.patch("greenback.await_", side_effect=greenback_await_side_effect)

# Mock aget_variable to return the expected value
async def mock_aget_variable(self, key):
return expected_value

mocker.patch.object(ExecutionAPISecretsBackend, "aget_variable", mock_aget_variable)

backend = ExecutionAPISecretsBackend()
result = backend.get_variable("retries")

assert result == expected_value
mock_greenback_await.assert_called_once()
mock_supervisor_comms.send.assert_called_once()
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test exercises the greenback fallback, but it doesn’t assert the warnings.warn(...) behavior introduced by the fallback path. Adding an assertion (e.g., using pytest.warns and matching the message) would ensure the warning remains present and correct, and prevent future regressions where the fallback works but the guidance to use await aget_variable is accidentally removed/changed.

Copilot uses AI. Check for mistakes.
@YoannAbriel YoannAbriel force-pushed the fix/issue-61676 branch 3 times, most recently from a01808a to 8008073 Compare April 13, 2026 06:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:task-sdk backport-to-v3-2-test Mark PR with this label to backport to v3-2-test branch ready for maintainer review Set after triaging when all criteria pass. type:bug-fix Changelog: Bug Fixes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Triggerer can not access Variables

5 participants