def ssl_context_from_node_config()

in elastic_transport/_node/_base.py [0:0]


def ssl_context_from_node_config(node_config: NodeConfig) -> ssl.SSLContext:
    """Return the ``ssl.SSLContext`` to use for the node described by *node_config*.

    When ``node_config.ssl_context`` is set, that object is reused and its
    certificate-verification settings are left as the caller configured them.
    Otherwise a context from ``ssl.create_default_context()`` is created and
    its ``verify_mode`` / ``check_hostname`` are derived from
    ``node_config.verify_certs`` and ``node_config.host``.

    In both cases the context is then mutated in place:

    * if the context exposes ``keylog_filename`` and the ``SSLKEYLOGFILE``
      environment variable is non-empty, TLS key logging is pointed at it;
    * the TLS version from ``node_config.ssl_version`` (TLSv1.2 when it is
      ``None``) is applied through ``minimum_version`` or ``options``,
      depending on ``_HAS_TLS_VERSION``.

    :param node_config: Node settings; this function reads ``ssl_context``,
        ``verify_certs``, ``host`` and ``ssl_version`` from it.
    :returns: The configured context (the caller's own object when one was
        supplied).
    :raises ValueError: If ``ssl_version`` is not a key of the version
        lookup table that applies.
    """
    context = node_config.ssl_context
    if not context:
        context = ssl.create_default_context()

        # The assignment order in each branch is deliberate: setting these
        # two attributes the other way round can make SSLContext raise
        # ValueError. A caller-supplied context never reaches this code.
        if node_config.verify_certs:
            context.verify_mode = ssl.CERT_REQUIRED
            # Hostname checking is switched off when the host is an IP address.
            host_is_ip = is_ipaddress(node_config.host)
            context.check_hostname = not host_is_ip
        else:
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE

    # Let tools such as Wireshark decrypt traffic by logging TLS session
    # keys, but only when the context supports it and a path is configured.
    keylog_path = os.environ.get("SSLKEYLOGFILE", "")
    if keylog_path and hasattr(context, "keylog_filename"):
        context.keylog_filename = keylog_path

    # Resolve the requested TLS version; with nothing requested the floor
    # is TLSv1.2, expressed in whichever constant family is in use.
    # NOTE(review): _HAS_TLS_VERSION presumably reports whether
    # 'ssl.TLSVersion' is available -- confirm at its definition.
    ssl_version = node_config.ssl_version
    if ssl_version is None:
        ssl_version = (
            ssl.TLSVersion.TLSv1_2 if _HAS_TLS_VERSION else ssl.PROTOCOL_TLSv1_2
        )

    # A value missing from the lookup table surfaces as KeyError; report it
    # as a ValueError that names the accepted constant families instead.
    try:
        if not _HAS_TLS_VERSION:
            context.options |= _SSL_PROTOCOL_VERSION_TO_OPTIONS[ssl_version]
        else:
            context.minimum_version = _SSL_PROTOCOL_VERSION_TO_TLS_VERSION[
                ssl_version
            ]
    except KeyError:
        raise ValueError(
            f"Unsupported value for 'ssl_version': {ssl_version!r}. Must be "
            "either 'ssl.PROTOCOL_TLSvX' or 'ssl.TLSVersion.TLSvX'"
        ) from None

    return context