feat: implement patching of task group instances in API#62812
feat: implement patching of task group instances in API#62812OscarLigthart wants to merge 6 commits intoapache:mainfrom
Conversation
c03d03f to
60004da
Compare
There was a problem hiding this comment.
Thanks for the PR.
Can you add query guards to tests (assert_queries_count) . I'm afraid this will generate N+1 queries problem -> Number of db request will scale linearly with the number of TIs in the group. And we should rework the code not do do that.
Also I find the code overly complicated with numerous duplicated call made at different abstraction level because of function nesting, it makes the whole thing hard to understand and probably sub optimal.
Thanks for the detailed review! I think the points made a lot of sense. Apologies for this oversight on my side. I tried following the implementation of the other PR more, removing the BulkTaskInstanceService usage and solving the N+1 query problem. I still opted to keep them in separate endpoints for clarity. Let me know if you disagree! Also happy to cast this aside and wait for the other PR to be picked up again, if that is more in line with your preference. One question regarding the EDIT - I'm now asserting whether the query count is equal between different size task groups. Should solve the above 👍 |
767a96c to
85e4f72
Compare
|
@OscarLigthart This PR has been converted to draft because it does not yet meet our Pull Request quality criteria. Issues found:
What to do next:
Converting a PR to draft is not a rejection — it is an invitation to bring the PR up to the project's standards so that maintainer review time is spent productively. If you have questions, feel free to ask on the Airflow Slack. Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you. |
85e4f72 to
0bfc970
Compare
cabb86e to
88add8c
Compare
|
This pull request has been converted to draft due to quality issues more than a week ago and there has been no response from the author since then. We are closing it for now to keep the review queue manageable. @OscarLigthart, you are welcome to reopen this PR after addressing the review comments. Thank you for your contribution! |
There was a problem hiding this comment.
A few nits for test only. (remove warmups call, and assert the response payload too)
PR looks good and I'd love to get that in.
Re-opening as I believe @OscarLigthart was waiting for a final review. (we're getting there) We just need a rebase and address the few nits.
66408e9 to
cce00eb
Compare
3643306 to
bcf31b6
Compare
|
Thanks for reopening and reviewing @pierrejeambrun ! I've addressed most of your comments. I got stuck again on the N+1 query problem after removing the mock. I've consulted with Claude and left a trace of the resulting conversation (can't believe I'm already calling this "conversations" with my agent, what a time to be alive). Let me know how you would like to proceed! |
bcf31b6 to
9692db9
Compare
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Implements API support for patching all task instances within a task group (including dry-run) to enable marking entire task groups as success/failed in Airflow 3.
Changes:
- Added
PATCH+PATCH /dry_runendpoints for task group instance state updates and wired them into OpenAPI + UI client generation. - Introduced a batched
SerializedDAG.set_multiple_task_instances_state()to reduce N+1 queries when updating many tasks. - Added unit tests for task-group patching behavior and query-count scaling.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| airflow-ctl/src/airflowctl/api/datamodels/generated.py | Updates request body description for patch semantics. |
| airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py | Updates docstring for PatchTaskInstanceBody. |
| airflow-core/src/airflow/serialization/definitions/dag.py | Adds batched set_multiple_task_instances_state() implementation. |
| airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py | Adds helpers for validating/collecting task-group TIs and patching state in bulk. |
| airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py | Adds new task-group patch + dry-run endpoints and response serialization logic. |
| airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml | Registers the new endpoints + schema description updates. |
| airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts | Adds generated request/response types for the new endpoints. |
| airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts | Adds generated client methods for task-group patch endpoints. |
| airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts | Updates schema description for PatchTaskInstanceBody. |
| airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts | Adds generated React Query mutations for task-group patch endpoints. |
| airflow-core/src/airflow/ui/openapi-gen/queries/common.ts | Adds mutation result aliases for newly generated methods. |
| airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py | Adds tests for task-group patching + dry-run + query count scaling. |
|
@OscarLigthart I replied to your main concern there #62812 (comment) just in case you missed it |
Hey @pierrejeambrun, sorry for the radio silence on my side, had a few crazy days. I will find some time over the weekend to look through your suggestions and apply the necessary changes 👍 |
9692db9 to
f399206
Compare
|
Hi @pierrejeambrun, I had to make some tweaks to make the As an alternative I have the previous to last commit, which has the original implementation with your initial feedback. This solution would cause a steeper query count growth with scale, but doesn't add the logic in the DAG serialization just yet. Let me know how you would like to proceed! |
pierrejeambrun
left a comment
There was a problem hiding this comment.
Good progress on adopting the 2.x pattern with the batched set_state and lock_rows. A few things below, mostly around tests and cleanup.
| except ValidationError as e: | ||
| raise RequestValidationError(errors=e.errors()) | ||
|
|
||
| return dag, tis, body.model_dump(include=fields_to_update, by_alias=True) |
There was a problem hiding this comment.
This function is basically a copy of _patch_ti_validate_request with task_id/map_index swapped for task_group_id and a different fetch helper. Can we factor out the common part (fields_to_update + validation at the bottom)?
There was a problem hiding this comment.
I took it out of both to prevent duplicate logic, but I'm not entirely sure if that's what you meant. Let me know 👍
| - Task Instance | ||
| summary: Patch Task Group Instances | ||
| description: Update the state of all task instances in a task group. | ||
| operationId: patch_task_group_instances |
There was a problem hiding this comment.
Can you double-check the generated spec for both endpoints? Especially that the 409 response is registered for the main patch endpoint (since we raise it when all TIs are already in the target state).
There was a problem hiding this comment.
Should be good, the 409 is only registered for the non dry-run, as there are no changes committed in the dry run.
9273369 to
1b8ba01
Compare
1cfe416 to
326b655
Compare
|
@OscarLigthart This PR has been converted to draft because it does not yet meet our Pull Request quality criteria. Issues found:
What to do next:
Converting a PR to draft is not a rejection — it is an invitation to bring the PR up to the project's standards so that maintainer review time is spent productively. There is no rush — take your time and work at your own pace. We appreciate your contribution and are happy to wait for updates. If you have questions, feel free to ask on the Airflow Slack. Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you. |
|
Putting back to ready for review as I'm waiting for maintainer comments in unresolved threads. |
Context
In Airflow 3 the ability to mark a full task group as failed or success is currently missing. In this PR, I try to implement the logic into the API, that can then be called from the frontend to ensure the functionality returns.
I deliberately split the feature in two separate PRs, so the reviewing process can be more targeted to the individually touched components. Should this PR get merged, I will continue to build the remaining requirements into the frontend.
There is an already open PR here: #60161
However, it looks to be stale, and I would love to get this feature into the Airflow 3.2 release.
Implementation
I make use of the BulkTaskInstanceService to perform the state updates with this endpoint. Using it, the implementation should be pretty straightforward :).
Issues
related: #56103
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Opus following the guidelines
{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.