Skip to content

Add endpoint to watch dag run until finish#51920

Merged
uranusjr merged 5 commits intoapache:mainfrom
astronomer:stream-until-run-complete
Jul 9, 2025
Merged

Add endpoint to watch dag run until finish#51920
uranusjr merged 5 commits intoapache:mainfrom
astronomer:stream-until-run-complete

Conversation

@uranusjr
Copy link
Copy Markdown
Member

@uranusjr uranusjr commented Jun 19, 2025

Close #51711.

I initially wanted to just enhance the trigger endpoint to optionally stream until the run finishes, but it seems that FastAPI does not like this optionally stream idea. You can do it of course, but would loose a lot of the auto annotation reflection feature. So I opted to have a separate streaming endpoint instead.

This endpoint repeatedly emits a JSON object at the specified interval, until the dag reaches a finished state.

Tests to come.

@boring-cyborg boring-cyborg bot added area:API Airflow's REST/HTTP API area:UI Related to UI/UX. For Frontend Developers. labels Jun 19, 2025
@potiuk
Copy link
Copy Markdown
Member

potiuk commented Jun 19, 2025

FYI. Not sure if it helps but OpenAPI 3.2.0 will have support for application/json-seq OAI/OpenAPI-Specification#3730 (planned for August)

@uranusjr uranusjr added this to the Airflow 3.1.0 milestone Jun 24, 2025
@uranusjr uranusjr force-pushed the stream-until-run-complete branch 2 times, most recently from 231597b to 007a051 Compare June 25, 2025 04:53
@uranusjr
Copy link
Copy Markdown
Member Author

I added some tests for the endpoint, but couldn’t figure out how to test the looping part. Hopefully this is good enough…

Copy link
Copy Markdown
Member

@pierrejeambrun pierrejeambrun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, just a few suggestions / nits.

Indeed an additional test case for running / success state would be great.

Comment thread airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py Outdated
Comment thread airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py Outdated
@uranusjr uranusjr force-pushed the stream-until-run-complete branch from 007a051 to 152bd6d Compare July 3, 2025 09:31
Copy link
Copy Markdown
Member

@pierrejeambrun pierrejeambrun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks.

Just a few nits, but nothing blocking

Comment thread airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py Outdated
Comment thread airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py Outdated
@uranusjr uranusjr force-pushed the stream-until-run-complete branch from 152bd6d to 34badc1 Compare July 7, 2025 07:04
@uranusjr uranusjr force-pushed the stream-until-run-complete branch 2 times, most recently from 96812e6 to 422cd94 Compare July 7, 2025 19:27
Comment thread airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py Outdated
Copy link
Copy Markdown
Member

@kaxil kaxil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some simple docs would be good. Fine as a follow-up too as long as we have a GH issue to not forget.

@vikramkoka could review that too

@uranusjr uranusjr force-pushed the stream-until-run-complete branch from 422cd94 to 58d7fa5 Compare July 9, 2025 05:40
@uranusjr
Copy link
Copy Markdown
Member Author

uranusjr commented Jul 9, 2025

Tracking doc addition #53067

@uranusjr uranusjr merged commit 5fbbf64 into apache:main Jul 9, 2025
102 checks passed
@uranusjr uranusjr deleted the stream-until-run-complete branch July 9, 2025 06:34
HsiuChuanHsu pushed a commit to HsiuChuanHsu/airflow that referenced this pull request Jul 10, 2025
stephen-bracken pushed a commit to stephen-bracken/airflow that referenced this pull request Jul 15, 2025
kaxil added a commit to apache/airflow-client-python that referenced this pull request Oct 22, 2025
(from https://github.com/apache/airflow/tree/python-client/3.1.0rc1)

## New Features:

- Add `map_index` filter to TaskInstance API queries ([#55614](apache/airflow#55614))
- Add `has_import_errors` filter to Core API GET /dags endpoint ([#54563](apache/airflow#54563))
- Add `dag_version` filter to get_dag_runs endpoint ([#54882](apache/airflow#54882))
- Implement pattern search for event log endpoint ([#55114](apache/airflow#55114))
- Add asset-based filtering support to DAG API endpoint ([#54263](apache/airflow#54263))
- Add Greater Than and Less Than range filters to DagRuns and Task Instance list ([#54302](apache/airflow#54302))
- Add `try_number` as filter to task instances ([#54695](apache/airflow#54695))
- Add filters to Browse XComs endpoint ([#54049](apache/airflow#54049))
- Add Filtering by DAG Bundle Name and Version to API routes ([#54004](apache/airflow#54004))
- Add search filter for DAG runs by triggering user name ([#53652](apache/airflow#53652))
- Enable multi sorting (AIP-84) ([#53408](apache/airflow#53408))
- Add `run_on_latest_version` support for backfill and clear operations ([#52177](apache/airflow#52177))
- Add `run_id_pattern` search for Dag Run API ([#52437](apache/airflow#52437))
- Add tracking of triggering user to Dag runs ([#51738](apache/airflow#51738))
- Expose DAG parsing duration in the API ([#54752](apache/airflow#54752))

## New API Endpoints:

- Add Human-in-the-Loop (HITL) endpoints for approval workflows ([#52868](apache/airflow#52868), [#53373](apache/airflow#53373), [#53376](apache/airflow#53376), [#53885](apache/airflow#53885), [#53923](apache/airflow#53923), [#54308](apache/airflow#54308), [#54310](apache/airflow#54310), [#54723](apache/airflow#54723), [#54773](apache/airflow#54773), [#55019](apache/airflow#55019), [#55463](apache/airflow#55463), [#55525](apache/airflow#55525), [#55535](apache/airflow#55535), [#55603](apache/airflow#55603), [#55776](apache/airflow#55776))
- Add endpoint to watch dag run until finish ([#51920](apache/airflow#51920))
- Add TI bulk actions endpoint ([#50443](apache/airflow#50443))
- Add Keycloak Refresh Token Endpoint ([#51657](apache/airflow#51657))

## Deprecations:

- Mark `DagDetailsResponse.concurrency` as deprecated ([#55150](apache/airflow#55150))

## Bug Fixes:

- Fix dag import error modal pagination ([#55719](apache/airflow#55719))
kaxil added a commit to apache/airflow-client-python that referenced this pull request Oct 23, 2025
(from https://github.com/apache/airflow/tree/python-client/3.1.0rc1)

## New Features:

- Add `map_index` filter to TaskInstance API queries ([#55614](apache/airflow#55614))
- Add `has_import_errors` filter to Core API GET /dags endpoint ([#54563](apache/airflow#54563))
- Add `dag_version` filter to get_dag_runs endpoint ([#54882](apache/airflow#54882))
- Implement pattern search for event log endpoint ([#55114](apache/airflow#55114))
- Add asset-based filtering support to DAG API endpoint ([#54263](apache/airflow#54263))
- Add Greater Than and Less Than range filters to DagRuns and Task Instance list ([#54302](apache/airflow#54302))
- Add `try_number` as filter to task instances ([#54695](apache/airflow#54695))
- Add filters to Browse XComs endpoint ([#54049](apache/airflow#54049))
- Add Filtering by DAG Bundle Name and Version to API routes ([#54004](apache/airflow#54004))
- Add search filter for DAG runs by triggering user name ([#53652](apache/airflow#53652))
- Enable multi sorting (AIP-84) ([#53408](apache/airflow#53408))
- Add `run_on_latest_version` support for backfill and clear operations ([#52177](apache/airflow#52177))
- Add `run_id_pattern` search for Dag Run API ([#52437](apache/airflow#52437))
- Add tracking of triggering user to Dag runs ([#51738](apache/airflow#51738))
- Expose DAG parsing duration in the API ([#54752](apache/airflow#54752))

## New API Endpoints:

- Add Human-in-the-Loop (HITL) endpoints for approval workflows ([#52868](apache/airflow#52868), [#53373](apache/airflow#53373), [#53376](apache/airflow#53376), [#53885](apache/airflow#53885), [#53923](apache/airflow#53923), [#54308](apache/airflow#54308), [#54310](apache/airflow#54310), [#54723](apache/airflow#54723), [#54773](apache/airflow#54773), [#55019](apache/airflow#55019), [#55463](apache/airflow#55463), [#55525](apache/airflow#55525), [#55535](apache/airflow#55535), [#55603](apache/airflow#55603), [#55776](apache/airflow#55776))
- Add endpoint to watch dag run until finish ([#51920](apache/airflow#51920))
- Add TI bulk actions endpoint ([#50443](apache/airflow#50443))
- Add Keycloak Refresh Token Endpoint ([#51657](apache/airflow#51657))

## Deprecations:

- Mark `DagDetailsResponse.concurrency` as deprecated ([#55150](apache/airflow#55150))

## Bug Fixes:

- Fix dag import error modal pagination ([#55719](apache/airflow#55719))
kaxil added a commit to astronomer/airflow that referenced this pull request Apr 15, 2026
When _configure_async_session() was extracted from configure_orm() in
PR apache#51920, the corresponding cleanup in dispose_orm() was not updated.
This left async_engine connections abandoned on process exit, gunicorn
worker restarts, and atexit -- gradually exhausting PostgreSQL's
max_connections.

Dispose async_engine.sync_engine (the synchronous path, matching
the existing clean_in_fork pattern) and clear both async_engine and
AsyncSession references.
kaxil added a commit to astronomer/airflow that referenced this pull request Apr 15, 2026
When _configure_async_session() was extracted from configure_orm() in
PR apache#51920, the corresponding cleanup in dispose_orm() was not updated.
This left async_engine connections abandoned on process exit, gunicorn
worker restarts, and atexit -- gradually exhausting PostgreSQL's
max_connections.

Dispose async_engine.sync_engine (the synchronous path, matching
the existing clean_in_fork pattern) and clear both async_engine and
AsyncSession references.
kaxil added a commit to astronomer/airflow that referenced this pull request Apr 15, 2026
When _configure_async_session() was extracted from configure_orm() in
PR apache#51920, the corresponding cleanup in dispose_orm() was not updated.
This left async_engine connections abandoned on process exit, gunicorn
worker restarts, and atexit -- gradually exhausting PostgreSQL's
max_connections.

Dispose async_engine.sync_engine (the synchronous path, matching
the existing clean_in_fork pattern) and clear both async_engine and
AsyncSession references.
vatsrahul1001 pushed a commit that referenced this pull request Apr 15, 2026
When _configure_async_session() was extracted from configure_orm() in
PR #51920, the corresponding cleanup in dispose_orm() was not updated.
This left async_engine connections abandoned on process exit, gunicorn
worker restarts, and atexit -- gradually exhausting PostgreSQL's
max_connections.

Dispose async_engine.sync_engine (the synchronous path, matching
the existing clean_in_fork pattern) and clear both async_engine and
AsyncSession references.
vatsrahul1001 pushed a commit that referenced this pull request Apr 15, 2026
When _configure_async_session() was extracted from configure_orm() in
PR #51920, the corresponding cleanup in dispose_orm() was not updated.
This left async_engine connections abandoned on process exit, gunicorn
worker restarts, and atexit -- gradually exhausting PostgreSQL's
max_connections.

Dispose async_engine.sync_engine (the synchronous path, matching
the existing clean_in_fork pattern) and clear both async_engine and
AsyncSession references.

(cherry picked from commit 69a88bf)
vatsrahul1001 added a commit that referenced this pull request Apr 15, 2026
…5284)

When _configure_async_session() was extracted from configure_orm() in
PR #51920, the corresponding cleanup in dispose_orm() was not updated.
This left async_engine connections abandoned on process exit, gunicorn
worker restarts, and atexit -- gradually exhausting PostgreSQL's
max_connections.

Dispose async_engine.sync_engine (the synchronous path, matching
the existing clean_in_fork pattern) and clear both async_engine and
AsyncSession references.

(cherry picked from commit 69a88bf)

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
vatsrahul1001 added a commit that referenced this pull request Apr 15, 2026
…5284)

When _configure_async_session() was extracted from configure_orm() in
PR #51920, the corresponding cleanup in dispose_orm() was not updated.
This left async_engine connections abandoned on process exit, gunicorn
worker restarts, and atexit -- gradually exhausting PostgreSQL's
max_connections.

Dispose async_engine.sync_engine (the synchronous path, matching
the existing clean_in_fork pattern) and clear both async_engine and
AsyncSession references.

(cherry picked from commit 69a88bf)

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
vatsrahul1001 added a commit that referenced this pull request Apr 15, 2026
…5284)

When _configure_async_session() was extracted from configure_orm() in
PR #51920, the corresponding cleanup in dispose_orm() was not updated.
This left async_engine connections abandoned on process exit, gunicorn
worker restarts, and atexit -- gradually exhausting PostgreSQL's
max_connections.

Dispose async_engine.sync_engine (the synchronous path, matching
the existing clean_in_fork pattern) and clear both async_engine and
AsyncSession references.

(cherry picked from commit 69a88bf)

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
vatsrahul1001 added a commit that referenced this pull request Apr 15, 2026
…5284)

When _configure_async_session() was extracted from configure_orm() in
PR #51920, the corresponding cleanup in dispose_orm() was not updated.
This left async_engine connections abandoned on process exit, gunicorn
worker restarts, and atexit -- gradually exhausting PostgreSQL's
max_connections.

Dispose async_engine.sync_engine (the synchronous path, matching
the existing clean_in_fork pattern) and clear both async_engine and
AsyncSession references.

(cherry picked from commit 69a88bf)

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
vatsrahul1001 added a commit that referenced this pull request Apr 15, 2026
…5284)

When _configure_async_session() was extracted from configure_orm() in
PR #51920, the corresponding cleanup in dispose_orm() was not updated.
This left async_engine connections abandoned on process exit, gunicorn
worker restarts, and atexit -- gradually exhausting PostgreSQL's
max_connections.

Dispose async_engine.sync_engine (the synchronous path, matching
the existing clean_in_fork pattern) and clear both async_engine and
AsyncSession references.

(cherry picked from commit 69a88bf)

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
karenbraganz pushed a commit to karenbraganz/airflow that referenced this pull request Apr 16, 2026
When _configure_async_session() was extracted from configure_orm() in
PR apache#51920, the corresponding cleanup in dispose_orm() was not updated.
This left async_engine connections abandoned on process exit, gunicorn
worker restarts, and atexit -- gradually exhausting PostgreSQL's
max_connections.

Dispose async_engine.sync_engine (the synchronous path, matching
the existing clean_in_fork pattern) and clear both async_engine and
AsyncSession references.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:API Airflow's REST/HTTP API area:UI Related to UI/UX. For Frontend Developers.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Enable Inference Execution / Synchronous DAG Execution

6 participants