fix(databricks): avoid AsyncToSync error in triggerer for deferrable operators#63775
fix(databricks): avoid AsyncToSync error in triggerer for deferrable operators#63775techcodie wants to merge 4 commits intoapache:mainfrom
Conversation
|
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
|
639cf05 to
edbf224
Compare
|
@potiuk Done! Merged latest |
|
@techcodie - I have a similar PR in #63611 nearly ready to go. Could I enlist your help verify/test? |
|
@bob-skowron Sure — happy to help verify. I can reproduce the triggerer crash with |
@techcodie awesome! I tested using the |
|
@bob-skowron Thanks! I’ll take a look at #63611 as well. From what I can see, both approaches target the same root cause around avoiding the sync connection path inside the triggerer event loop. In this PR, the async connection is resolved upfront so the cached_property never hits the sync path in async context. Happy to align if needed, but this approach keeps the change minimal and avoids impacting existing sync behaviour. |
|
Agreed, it is much simpler. I would have to defer to @mwojtyczka or @moomindani on their thoughts though regarding the tradeoffs. |
|
Thanks for the discussion! This PR fixes the issue by resolving the async connection upfront, so the cached property doesn’t hit the sync path inside the triggerer event loop. Tested with deferrable operators — the AsyncToSync error is gone and triggers run fine. |
|
Thanks for the PR and the clear root-cause analysis! I took a look at both this PR and #63611 which targets the same issue. A few thoughts: The "pre-warm the cache" approach here is clever but fragile. It works because The import of #63611 takes a more comprehensive approach — it replaces every I'd suggest either:
What do you think? |
f4161f4 to
65b21e8
Compare
eaf9a3c to
7b3b956
Compare
…operators - Refactored BaseDatabricksHook and DatabricksHook to resolve connections asynchronously. - Implemented cache pre-warming in __aenter__ for the @cached_property databricks_conn to satisfy sync lookups in async loops. - Added async mirror methods for all connection-dependent paths. - Adopted get_async_connection() from common.compat for Airflow 2/3 compatibility. - Verified with 229+ unit tests and real Databricks workspace validation. closes: apache#63775
c864e4b to
9859075
Compare
|
Thanks for the review, @jscheffl! I've removed the newsfragment for core since this is a provider bugfix, but I've kept a dedicated newsfragment for the Databricks provider to help users. The PR now follows the comprehensive async refactor requested by @moomindani, is correctly based on the latest upstream main, and passes all 567 local tests (including 229 hooks/triggers tests). All verification in a real Databricks workspace was successful. Ready for final review and merge! |
|
Sorry, you had a bad re-base. Needs to be fixed. |
…operators - Refactored BaseDatabricksHook and DatabricksHook to resolve connections asynchronously. - Implemented cache pre-warming in __aenter__ for the @cached_property databricks_conn to satisfy sync lookups in async loops. - Added async mirror methods for all connection-dependent paths. - Adopted get_async_connection() from common.compat for Airflow 2/3 compatibility. - Verified with 229+ unit tests and real Databricks workspace validation. closes: apache#63775
9859075 to
4a95d78
Compare
|
@jscheffl Fixed the rebase by pruning all unrelated changes, moving the newsfragment to the provider directory, and squashing to a single clean commit. Ready for final review! |
There is no concept about "newsfragments" from providers. All changes go into changelogs or into RST documentation. |
…operators - Refactored BaseDatabricksHook and DatabricksHook to resolve connections asynchronously. - Implemented cache pre-warming in __aenter__ for the @cached_property databricks_conn to satisfy sync lookups in async loops. - Added async mirror methods for all connection-dependent paths. - Adopted get_async_connection() from common.compat for Airflow 2/3 compatibility. - Verified with 229+ unit tests and real Databricks workspace validation. closes: apache#63775
4a95d78 to
8b8de3b
Compare
|
Fixed the re-base by squashing to a single clean commit on the latest main. Removed all unrelated changes and newsfragments, updated the provider's changelog.rst directly, and refreshed the operator documentation to match the actual code. Ready for final review! |
There was a problem hiding this comment.
Changelogs are adjusted by release manager based on merged commits. New version numbers and the bullet points are created at point of release. Please only add content if there is a major change (e.g. breaking change).
…operators - Refactored BaseDatabricksHook and DatabricksHook to resolve connections asynchronously. - Implemented cache pre-warming in __aenter__ for the @cached_property databricks_conn to satisfy sync lookups in async loops. - Added async mirror methods for all connection-dependent paths. - Adopted get_async_connection() from common.compat for Airflow 2/3 compatibility. - Verified with 229+ unit tests and real Databricks workspace validation. closes: apache#63775
b12058c to
ff11cb4
Compare
|
Thanks for the pointer, @jscheffl! I've reverted the manual changelog edits and returned it to its previous state. I've kept the small updates to the operator docs that remove references to the deleted legacy deferrable classes to keep the documentation accurate. |
|
@jscheffl I have made all the changes as per your request, and am happy to fix anything else which is also currently broken or you want improved if not can you please merge it. |
jscheffl
left a comment
There was a problem hiding this comment.
Looks good but I have not much insight or understanding with databricks and API backends so kindly ask a second pair of maintainer eyes before merge.
| auth=aiohttp.BasicAuth( | ||
| self._get_connection_attr("login"), self.databricks_conn.password | ||
| ), | ||
| auth=aiohttp.BasicAuth(self._get_connection_attr("login", conn), conn.password or ""), |
There was a problem hiding this comment.
conn.password or "" silently converts a None password to an empty string. The sync counterpart (_get_sp_token) passes self.databricks_conn.password directly to HTTPBasicAuth, which would fail with a TypeError if password is None. The async version swallows the problem and sends empty credentials to Azure, producing a confusing 401.
Same pattern at line 440 (client_secret=conn.password or "") and in the _a_do_api_call basic auth fallback.
Consider raising explicitly if conn.password is None in these paths, or at minimum adding a comment explaining why or "" is intentional (e.g. aiohttp.BasicAuth rejects None).
| :param resource: resource to issue token to | ||
| :return: AAD token, or raise an exception | ||
| """ | ||
| await self.a_databricks_conn() |
There was a problem hiding this comment.
This await self.a_databricks_conn() call is discarded (no conn = ...). This method (_a_get_aad_token_for_default_az_credential) never accesses the connection object -- it only uses self.oauth_tokens and DefaultAzureCredential. The call adds an unnecessary network round-trip to fetch the connection on first entry into this code path. If this is meant as a "warm the cache" call, __aenter__ already does that.
| ) | ||
|
|
||
| async def run(self): | ||
| statement_state = SQLStatementState(state="PENDING") |
There was a problem hiding this comment.
Pre-initializing statement_state = SQLStatementState(state="PENDING") prevents the UnboundLocalError when the timeout is already expired before the first poll -- good fix.
But the emitted TriggerEvent will contain a fabricated "PENDING" state that never came from the Databricks API. Consider handling the immediate-timeout case before the loop instead:
if self.end_time <= time.time():
await self.hook.a_cancel_sql_statement(self.statement_id)
yield TriggerEvent({...})
return|
|
||
| from airflow.providers.common.compat.openlineage.check import require_openlineage_version | ||
| from airflow.utils import timezone | ||
| from airflow.utils import timezone # type: ignore[attr-defined] |
There was a problem hiding this comment.
This # type: ignore[attr-defined] is unrelated to the async-to-sync fix. Suppressing mypy errors on imports can mask real breakage if this module moves. Should probably be in a separate commit/PR.
| schema = self.databricks_conn.schema or "https" | ||
| return f"{schema}://{self.host}{port}/{endpoint}" | ||
|
|
||
| async def _a_endpoint_url(self, endpoint): |
There was a problem hiding this comment.
_a_endpoint_url is missing a return type annotation. The sync _endpoint_url also lacks one, but new async code should set the bar higher. -> str would match.
|
@techcodie This PR has been converted to draft because it does not yet meet our Pull Request quality criteria. Issues found:
What to do next:
Converting a PR to draft is not a rejection — it is an invitation to bring the PR up to the project's standards so that maintainer review time is spent productively. There is no rush — take your time and work at your own pace. We appreciate your contribution and are happy to wait for updates. If you have questions, feel free to ask on the Airflow Slack. |
There was a problem hiding this comment.
Pull request overview
Fixes an Airflow 3 triggerer crash for deferrable Databricks operators by ensuring async trigger/hook code paths don’t hit the synchronous connection retrieval logic while running inside the triggerer event loop.
Changes:
- Add async-safe Databricks connection/host retrieval and use it in async hook paths to avoid sync
get_connection()during trigger execution. - Add async cancel APIs to
DatabricksHookand update the SQL statement trigger to use async cancellation on timeout. - Update/add unit tests to cover the new async cancellation behavior and trigger timeout behavior; adjust a typing-related timezone import.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py | Introduces async connection/host access and refactors async token/auth flows to avoid sync connection access in event-loop contexts. |
| providers/databricks/src/airflow/providers/databricks/hooks/databricks.py | Adds async cancellation methods used by triggers (a_cancel_run, a_cancel_sql_statement). |
| providers/databricks/src/airflow/providers/databricks/triggers/databricks.py | Updates SQL statement trigger to initialize state and await async cancellation on timeout. |
| providers/databricks/src/airflow/providers/databricks/utils/openlineage.py | Adds a typing ignore to the timezone import (flagged for convention consistency). |
| providers/databricks/tests/unit/databricks/hooks/test_databricks.py | Updates sync cancel expectations and adds async cancel tests using aiohttp patching. |
| providers/databricks/tests/unit/databricks/triggers/test_databricks.py | Adds a timeout test verifying a timeout event payload and that async cancellation is invoked. |
Comments suppressed due to low confidence (1)
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:531
_a_get_aad_token_for_default_az_credential()instantiatesAsyncDefaultAzureCredential()inline and never closes it. Other async credential usages in this hook useasync with ... as credentialto ensure transports are closed. Consider usingasync with AsyncDefaultAzureCredential() as credential(or explicitly callingawait credential.close()in afinally) to avoid leaking network resources across retries.
from azure.identity.aio import (
DefaultAzureCredential as AsyncDefaultAzureCredential,
)
async for attempt in self._a_get_retry_object():
with attempt:
# This only works in an Azure Kubernetes Service Cluster given the following environment variables:
# AZURE_TENANT_ID, AZURE_CLIENT_ID, AZURE_FEDERATED_TOKEN_FILE
#
# While there is a WorkloadIdentityCredential class, the below class is advised by Microsoft
# https://learn.microsoft.com/en-us/azure/aks/workload-identity-overview
token = await AsyncDefaultAzureCredential().get_token(f"{resource}/.default")
| aad_headers = await self._a_get_aad_headers() | ||
| headers = {**self.user_agent_header, **aad_headers} | ||
|
|
There was a problem hiding this comment.
headers is already computed as {**self.user_agent_header, **aad_headers}; later, the request passes headers={**headers, **self.user_agent_header}, re-merging the user-agent header and making precedence harder to reason about. Consider passing headers=headers (or explicitly document why user-agent should override AAD headers).
| # We access login and password directly from the connection object to avoid | ||
| # calling self.databricks_conn which would trigger sync get_connection(). | ||
| # Behavior matches sync _get_connection_attr() but without the cache-clash. |
There was a problem hiding this comment.
The comment says this avoids a “cache-clash”, but the main concern here is avoiding the sync get_connection() path inside the triggerer event loop. Consider updating the wording to reflect the actual risk (AsyncToSync in a running event loop) and/or reference that a_databricks_conn() primes the cached_property.
| # We access login and password directly from the connection object to avoid | |
| # calling self.databricks_conn which would trigger sync get_connection(). | |
| # Behavior matches sync _get_connection_attr() but without the cache-clash. | |
| # We access login and password directly from the async connection object to avoid | |
| # going through self.databricks_conn, which would invoke the sync get_connection() | |
| # (AsyncToSync) path inside the triggerer event loop. a_databricks_conn() also primes | |
| # the databricks_conn cached_property so later access does not re-enter get_connection(). |
| aad_headers = await self._a_get_aad_headers() | ||
| headers = {**self.user_agent_header, **aad_headers} | ||
|
|
There was a problem hiding this comment.
headers is built as {**self.user_agent_header, **aad_headers} but later the request call re-merges self.user_agent_header again. Consider making the header precedence explicit in one place (either keep the merge here and pass headers=headers, or build headers in the final form once) to avoid redundant dict copies and reduce ambiguity about which values win.
| # We access login and password directly from the connection object to avoid | ||
| # calling self.databricks_conn which would trigger sync get_connection(). | ||
| # Behavior matches sync _get_connection_attr() but without the cache-clash. |
There was a problem hiding this comment.
The inline comment mentions a “cache-clash”, but the actual issue being avoided is triggering the synchronous get_connection() path (and AsyncToSync failures) inside the triggerer event loop. Consider rewording this comment to reflect that motivation, and optionally reuse _get_connection_attr("login", conn) here for consistency with other async auth paths.
| # We access login and password directly from the connection object to avoid | |
| # calling self.databricks_conn which would trigger sync get_connection(). | |
| # Behavior matches sync _get_connection_attr() but without the cache-clash. | |
| # Access login and password directly on the async connection object to avoid | |
| # calling the synchronous get_connection() path (and AsyncToSync failures) | |
| # inside the triggerer event loop. This mirrors sync _get_connection_attr() | |
| # behavior without invoking the sync hook property. |
|
|
||
| from airflow.providers.common.compat.openlineage.check import require_openlineage_version | ||
| from airflow.utils import timezone | ||
| from airflow.utils import timezone # type: ignore[attr-defined] |
There was a problem hiding this comment.
from airflow.utils import timezone # type: ignore[attr-defined] relies on Airflow’s runtime deprecation redirect and silences typing. Elsewhere in the repo the cross-version pattern is to import timezone via airflow.providers.common.compat.sdk (or try: from airflow.sdk import timezone). Consider switching to the compat import to avoid the type ignore and keep a consistent Airflow 2/3 import style.
| from airflow.utils import timezone # type: ignore[attr-defined] | |
| from airflow.providers.common.compat.sdk import timezone |
Summary
When using any Databricks operator with
deferrable=Truein Airflow 3, the trigger fails immediately with:Root Cause
The
databricks_conn@cached_propertylazily fetches the connection on first access via the syncget_connection()path. In Airflow 3, when a trigger runs inside the triggerer's async event loop, that sync path goes throughSUPERVISOR_COMMS.send()which callsasync_to_sync(self.asend)(msg)— forbidden inside a running event loop.Full call chain:
Fix
Added
_a_get_databricks_conn()— an async method that uses the already-existing_async_get_connection()fromairflow.sdk.execution_time.context, which usesawait comms_decoder.asend()directly and is safe inside an event loop._a_do_api_call()now callsawait self._a_get_databricks_conn()at the top, populatingself.__dict__["databricks_conn"](satisfying the@cached_propertycache) before any sync attribute access occurs.No behaviour change for non-async (sync) code paths.
Changes
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py_a_get_databricks_conn()async method; added oneawaitcall at the top of_a_do_api_call()How to reproduce
Was generative AI tooling used to co-author this PR?