diff --git a/src/amcrest/http.py b/src/amcrest/http.py index 4bc7be5..7d5662a 100644 --- a/src/amcrest/http.py +++ b/src/amcrest/http.py @@ -9,9 +9,12 @@ # # vim:sw=4:ts=4:et import asyncio +import contextlib +from functools import lru_cache import logging import re import socket +import ssl import threading from contextlib import asynccontextmanager from typing import AsyncIterator, Optional, Tuple, Union @@ -46,6 +49,7 @@ except AttributeError: pass + TimeoutT = Union[Optional[float], Tuple[Optional[float], Optional[float]]] HttpxTimeoutT = Union[ Optional[float], @@ -53,6 +57,34 @@ ] +@lru_cache(maxsize=None) +def create_default_ssl_context() -> ssl.SSLContext: + """Return an SSL context that verifies the server certificate.""" + return ssl.create_default_context() + + +@lru_cache(maxsize=None) +def create_no_verify_ssl_context() -> ssl.SSLContext: + """Return an SSL context that does not verify the server certificate. + This is a copy of aiohttp's create_default_context() function, with the + ssl verify turned off and old SSL versions enabled. + + https://github.com/aio-libs/aiohttp/blob/33953f110e97eecc707e1402daa8d543f38a189b/aiohttp/connector.py#L911 + """ + sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + sslcontext.check_hostname = False + sslcontext.verify_mode = ssl.CERT_NONE + # Allow all ciphers rather than only Python 3.10 default + sslcontext.set_ciphers("DEFAULT") + with contextlib.suppress(AttributeError): + # This only works for OpenSSL >= 1.0.0 + sslcontext.options |= ssl.OP_NO_COMPRESSION + sslcontext.set_default_verify_paths() + # ssl.OP_LEGACY_SERVER_CONNECT is only available in Python 3.12a4+ + sslcontext.options |= getattr(ssl, "OP_LEGACY_SERVER_CONNECT", 0x4) + return sslcontext + + class SOHTTPAdapter(HTTPAdapter): """HTTPAdapter with support for socket options.""" @@ -331,10 +363,15 @@ async def _async_command( else: httpx_timeout = timeout + if self._verify: + ssl_context = create_default_ssl_context() + else: + ssl_context = create_no_verify_ssl_context() + async with httpx.AsyncClient( follow_redirects=True, auth=self._async_token, - verify=self._verify, + verify=ssl_context, timeout=httpx_timeout, ) as client: for loop in range(1, 2 + retries):