Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions connect/client/fluent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
#
import contextvars
import threading
from functools import cache
from json.decoder import JSONDecodeError
from typing import Union

import httpx
import requests
from httpx._config import Proxy
from httpx._utils import get_environment_proxies
from requests.adapters import HTTPAdapter

from connect.client.constants import CONNECT_ENDPOINT_URL, CONNECT_SPECS_URL
Expand Down Expand Up @@ -237,6 +240,20 @@ def _get_namespace_class(self):
_SSL_CONTEXT = httpx.create_ssl_context()


@cache
def _get_async_mounts():
"""
This code based on how httpx.Client mounts proxies from environment.
This is cached to allow reusing the created transport objects.
"""
return {
key: None
if url is None
else httpx.AsyncHTTPTransport(verify=_SSL_CONTEXT, proxy=Proxy(url=url))
for key, url in get_environment_proxies().items()
}


class AsyncConnectClient(_ConnectClientBase, AsyncClientMixin):
"""
Create a new instance of the AsyncConnectClient.
Expand Down Expand Up @@ -274,12 +291,14 @@ def __init__(self, *args, **kwargs):
def session(self):
value = self._session.get()
if not value:
value = httpx.AsyncClient(
transport=_ASYNC_TRANSPORTS.setdefault(
self.endpoint,
httpx.AsyncHTTPTransport(verify=_SSL_CONTEXT),
),
)
transport = _ASYNC_TRANSPORTS.get(self.endpoint)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also fixed how transports are reused, because a new transport object was created every time even if the existing one for the endpoint was used.

if not transport:
transport = _ASYNC_TRANSPORTS[self.endpoint] = httpx.AsyncHTTPTransport(
verify=_SSL_CONTEXT,
)
# When passing a transport to httpx a Client/AsyncClient, proxies defined in environment
# (like HTTP_PROXY) are ignored, so let's pass them using mounts parameter.
value = httpx.AsyncClient(transport=transport, mounts=_get_async_mounts())
self._session.set(value)
return value

Expand Down