Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1618,6 +1618,15 @@ workers:
type: float
example: ~
default: "90.0"
execution_api_timeout:
description: |
The timeout (in seconds) for HTTP requests from workers to the Execution API server.
This controls how long a worker will wait for a response from the API server before
timing out. Increase this value if you experience timeout errors under high load.
version_added: 3.1.1
type: float
example: ~
default: "5.0"
Comment thread
kaxil marked this conversation as resolved.
socket_cleanup_timeout:
description: |
Number of seconds to wait after a task process exits before forcibly closing any
Expand Down
5 changes: 5 additions & 0 deletions task-sdk/src/airflow/sdk/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,7 @@ def noop_handler(request: httpx.Request) -> httpx.Response:
API_RETRY_WAIT_MIN = conf.getfloat("workers", "execution_api_retry_wait_min")
API_RETRY_WAIT_MAX = conf.getfloat("workers", "execution_api_retry_wait_max")
API_SSL_CERT_PATH = conf.get("api", "ssl_cert")
API_TIMEOUT = conf.getfloat("workers", "execution_api_timeout")


class Client(httpx.Client):
Expand All @@ -848,6 +849,10 @@ def __init__(self, *, base_url: str | None, dry_run: bool = False, token: str, *
if API_SSL_CERT_PATH:
ctx.load_verify_locations(API_SSL_CERT_PATH)
kwargs["verify"] = ctx

# Set timeout if not explicitly provided
kwargs.setdefault("timeout", API_TIMEOUT)

pyver = f"{'.'.join(map(str, sys.version_info[:3]))}"
super().__init__(
auth=auth,
Expand Down
19 changes: 18 additions & 1 deletion task-sdk/tests/task_sdk/api/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from uuid6 import uuid7

from airflow.sdk import timezone
from airflow.sdk.api.client import RemoteValidationError, ServerResponseError
from airflow.sdk.api.client import Client, RemoteValidationError, ServerResponseError
from airflow.sdk.api.datamodels._generated import (
AssetEventsResponse,
AssetResponse,
Expand Down Expand Up @@ -99,6 +99,23 @@ def handle_request(request: httpx.Request) -> httpx.Response:

assert isinstance(err.value, FileNotFoundError)

@mock.patch("airflow.sdk.api.client.API_TIMEOUT", 60.0)
def test_timeout_configuration(self):
def handle_request(request: httpx.Request) -> httpx.Response:
return httpx.Response(status_code=200)

client = make_client(httpx.MockTransport(handle_request))
assert client.timeout == httpx.Timeout(60.0)

def test_timeout_can_be_overridden(self):
def handle_request(request: httpx.Request) -> httpx.Response:
return httpx.Response(status_code=200)

client = Client(
base_url="test://server", token="", transport=httpx.MockTransport(handle_request), timeout=120.0
)
assert client.timeout == httpx.Timeout(120.0)

def test_error_parsing(self):
responses = [
httpx.Response(422, json={"detail": [{"loc": ["#0"], "msg": "err", "type": "required"}]})
Expand Down
Loading