src/mount_efs/__init__.py [1091:1172]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def is_instance_metadata_url(url):
    """
    Check whether the url targets the EC2 instance metadata service address 169.254.169.254 over http

    A plain prefix check would also accept look-alike hosts such as "http://169.254.169.254.example.com" or
    "http://169.254.169.254@example.com", so the text that follows the address inside the authority component
    must be empty or a ":port" suffix.

    :param url: the url string to check
    :return: True if the host of the url is exactly 169.254.169.254 and the scheme is http, otherwise False
    """
    prefix = "http://169.254.169.254"
    if not url.startswith(prefix):
        return False

    # Keep only what is left of the authority component, i.e. everything before the path, query or fragment
    authority_remainder = url[len(prefix) :]
    for delimiter in "/?#":
        authority_remainder = authority_remainder.split(delimiter, 1)[0]

    if not authority_remainder:
        return True

    # Anything other than an (optionally empty) numeric port means the real host is a different one
    port = authority_remainder[1:]
    return authority_remainder[0] == ":" and (port == "" or port.isdigit())


def url_request_helper(config, url, unsuccessful_resp, url_error_msg, headers=None):
    """
    Send a request to the given url and return the parsed response

    :param config: the config object passed to fetch_ec2_metadata_token_disabled
    :param url: the url to request
    :param unsuccessful_resp: message prefix passed to get_resp_obj, logged when the response code is not 200
    :param url_error_msg: message prefix logged at debug level when the request itself fails
    :param headers: optional dict of extra request headers, no extra header is added when omitted or None
    :return: the value returned by get_resp_obj on success, None if the request times out or fails with an
             HTTPError or URLError
    """
    try:
        req = Request(url)
        # The default is None instead of a dict literal so that no mutable object is shared between calls
        for k, v in (headers or {}).items():
            req.add_header(k, v)

        if not fetch_ec2_metadata_token_disabled(config) and is_instance_metadata_url(
            url
        ):
            # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html
            # IMDSv1 is a request/response method to access instance metadata
            # IMDSv2 is a session-oriented method to access instance metadata
            # We expect the token retrieval to fail in a bridge networking environment (e.g. container) since the
            # default hop limit for getting the token is 1. If no token is retrieved, we fall back to IMDSv1
            token = get_aws_ec2_metadata_token()
            if token:
                req.add_header("X-aws-ec2-metadata-token", token)

        request_resp = urlopen(req, timeout=1)

        return get_resp_obj(request_resp, url, unsuccessful_resp)
    except socket.timeout:
        err_msg = "Request timeout"
    except HTTPError as e:
        # For an instance with IMDSv2 enabled and token fetching disabled, an Unauthorized 401 error is returned
        if (
            e.code == 401
            and fetch_ec2_metadata_token_disabled(config)
            and is_instance_metadata_url(url)
        ):
            logging.warning(
                "Unauthorized request to instance metadata url %s, IMDSv2 is enabled on the instance, while fetching "
                "ec2 metadata token is disabled. Please set the value of config item "
                '"%s" to "false" in config file %s.'
                % (url, DISABLE_FETCH_EC2_METADATA_TOKEN_ITEM, CONFIG_FILE)
            )
        err_msg = "Unable to reach the url at %s: status=%d, reason is %s" % (
            url,
            e.code,
            e.reason,
        )
    except URLError as e:
        err_msg = "Unable to reach the url at %s, reason is %s" % (url, e.reason)

    # Only reached through one of the handlers above, each of which sets a non-empty err_msg
    logging.debug("%s %s", url_error_msg, err_msg)
    return None


def get_resp_obj(request_resp, url, unsuccessful_resp):
    """
    Parse the response of an url request

    :param request_resp: the response object, it must provide getcode(), read() and headers.get_content_charset()
    :param url: the requested url, only used in the debug log message
    :param unsuccessful_resp: message prefix logged at debug level when the response code is not 200
    :return: None if the response code is not 200. If the response result can be parsed into json object, return
             the json object parsed from the response. Otherwise return the response body in string format.
    """

    resp_code = request_resp.getcode()
    if resp_code != 200:
        logging.debug(unsuccessful_resp + " %s: ResponseCode=%d", url, resp_code)
        return None

    resp_body = request_resp.read()
    body_is_str = isinstance(resp_body, str)
    try:
        if body_is_str:
            return json.loads(resp_body)

        charset = request_resp.headers.get_content_charset() or "us-ascii"
        return json.loads(resp_body.decode(charset))
    except (ValueError, LookupError):
        # ValueError: the body is not json, or cannot be decoded with the charset (UnicodeDecodeError is a ValueError)
        # LookupError: the charset announced by the response is unknown to Python
        pass

    if body_is_str:
        return resp_body

    # Undecodable bytes are replaced instead of raising, so an odd response body cannot crash the caller
    return resp_body.decode("utf-8", "replace")
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



src/watchdog/__init__.py [577:651]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def is_instance_metadata_url(url):
    """
    Check whether the url targets the EC2 instance metadata service address 169.254.169.254 over http

    A plain prefix check would also accept look-alike hosts such as "http://169.254.169.254.example.com" or
    "http://169.254.169.254@example.com", so the text that follows the address inside the authority component
    must be empty or a ":port" suffix.

    :param url: the url string to check
    :return: True if the host of the url is exactly 169.254.169.254 and the scheme is http, otherwise False
    """
    imds_prefix = "http://169.254.169.254"
    if not url.startswith(imds_prefix):
        return False

    # Keep only what is left of the authority component, i.e. everything before the path, query or fragment
    authority_remainder = url[len(imds_prefix) :]
    for delimiter in "/?#":
        authority_remainder = authority_remainder.split(delimiter, 1)[0]

    if not authority_remainder:
        return True

    # Anything other than an (optionally empty) numeric port means the real host is a different one
    port = authority_remainder[1:]
    return authority_remainder[0] == ":" and (port == "" or port.isdigit())


def url_request_helper(config, url, unsuccessful_resp, url_error_msg, headers=None):
    """
    Send a request to the given url and return the parsed response

    :param config: the config object passed to fetch_ec2_metadata_token_disabled
    :param url: the url to request
    :param unsuccessful_resp: message prefix passed to get_resp_obj, logged when the response code is not 200
    :param url_error_msg: message prefix logged at debug level when the request itself fails
    :param headers: optional dict of extra request headers, no extra header is added when omitted or None
    :return: the value returned by get_resp_obj on success, None if the request times out or fails with an
             HTTPError or URLError
    """
    try:
        req = Request(url)
        # The default is None instead of a dict literal so that no mutable object is shared between calls
        for k, v in (headers or {}).items():
            req.add_header(k, v)

        if not fetch_ec2_metadata_token_disabled(config) and is_instance_metadata_url(
            url
        ):
            # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html
            # IMDSv1 is a request/response method to access instance metadata
            # IMDSv2 is a session-oriented method to access instance metadata
            # We expect the token retrieval to fail in a bridge networking environment (e.g. container) since the
            # default hop limit for getting the token is 1. If no token is retrieved, we fall back to IMDSv1
            token = get_aws_ec2_metadata_token()
            if token:
                req.add_header("X-aws-ec2-metadata-token", token)

        request_resp = urlopen(req, timeout=1)

        return get_resp_obj(request_resp, url, unsuccessful_resp)
    except socket.timeout:
        err_msg = "Request timeout"
    except HTTPError as e:
        # For an instance with IMDSv2 enabled and token fetching disabled, an Unauthorized 401 error is returned
        if (
            e.code == 401
            and fetch_ec2_metadata_token_disabled(config)
            and is_instance_metadata_url(url)
        ):
            logging.warning(
                "Unauthorized request to instance metadata url %s, IMDSv2 is enabled on the instance, while fetching "
                "ec2 metadata token is disabled. Please set the value of config item "
                '"%s" to "false" in config file %s.'
                % (url, DISABLE_FETCH_EC2_METADATA_TOKEN_ITEM, CONFIG_FILE)
            )
        err_msg = "Unable to reach the url at %s: status=%d, reason is %s" % (
            url,
            e.code,
            e.reason,
        )
    except URLError as e:
        err_msg = "Unable to reach the url at %s, reason is %s" % (url, e.reason)

    # Only reached through one of the handlers above, each of which sets a non-empty err_msg
    logging.debug("%s %s", url_error_msg, err_msg)
    return None


def get_resp_obj(request_resp, url, unsuccessful_resp):
    """
    Parse the response of an url request

    :param request_resp: the response object, it must provide getcode(), read() and headers.get_content_charset()
    :param url: the requested url, only used in the debug log message
    :param unsuccessful_resp: message prefix logged at debug level when the response code is not 200
    :return: None if the response code is not 200. If the response result can be parsed into json object, return
             the json object parsed from the response. Otherwise return the response body in string format.
    """
    status_code = request_resp.getcode()
    if status_code != 200:
        logging.debug(unsuccessful_resp + " %s: ResponseCode=%d", url, status_code)
        return None

    resp_body = request_resp.read()
    body_is_str = isinstance(resp_body, str)
    try:
        if body_is_str:
            return json.loads(resp_body)

        charset = request_resp.headers.get_content_charset() or "us-ascii"
        return json.loads(resp_body.decode(charset))
    except (ValueError, LookupError):
        # ValueError: the body is not json, or cannot be decoded with the charset (UnicodeDecodeError is a ValueError)
        # LookupError: the charset announced by the response is unknown to Python
        pass

    if body_is_str:
        return resp_body

    # Undecodable bytes are replaced instead of raising, so an odd response body cannot crash the caller
    return resp_body.decode("utf-8", "replace")
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



