in oss2/credentials.py [0:0]
def fetch(self, retry_times=3, timeout=10):
    """Fetch temporary credentials from ``self.auth_host``.

    Sends an HTTP GET to ``self.auth_host`` and parses the response body as
    JSON. Every kind of failure inside one attempt (transport error, non-200
    status, unparsable body, ``Code`` other than ``"Success"``, unparsable
    timestamps) is treated the same way: the attempt is retried until the
    attempts are used up, and only the last failure is reported.

    :param retry_times: total number of attempts to make. Values below 1 are
        treated as 1, so the method always either returns a credential or
        raises, and never falls through returning ``None``.
    :param timeout: forwarded to ``requests.get`` as its ``timeout``.
    :return: an ``EcsRamRoleCredential`` built from the response fields.
    :raises ClientError: when the final attempt fails; the message carries
        the text of the underlying error.
    """
    # An empty range would skip the loop entirely and implicitly return None,
    # which callers would then mistake for a usable credential.
    attempts = max(1, retry_times)
    time_format = "%Y-%m-%dT%H:%M:%SZ"

    for i in range(attempts):
        try:
            response = requests.get(self.auth_host, timeout=timeout)
            if response.status_code != 200:
                raise ClientError(
                    "Failed to fetch credentials url, http code:{0}, msg:{1}".format(response.status_code,
                                                                                     response.text))

            dic = json.loads(to_unicode(response.content))
            code = dic.get('Code')
            if code != "Success":
                raise ClientError("Get credentials from ECS metadata service error, code: {0}".format(code))

            access_key_id = dic.get('AccessKeyId')
            access_key_secret = dic.get('AccessKeySecret')
            security_token = dic.get('SecurityToken')
            expiration_date = dic.get('Expiration')
            last_updated_date = dic.get('LastUpdated')

            expiration_stamp = to_unixtime(expiration_date, time_format)

            # Use the real token lifetime when the service reports when the
            # token was issued; otherwise fall back to the default duration.
            duration = DEFAULT_ECS_SESSION_TOKEN_DURATION_SECONDS
            if last_updated_date is not None:
                last_updated_stamp = to_unixtime(last_updated_date, time_format)
                duration = expiration_stamp - last_updated_stamp

            return EcsRamRoleCredential(access_key_id, access_key_secret, security_token, expiration_stamp,
                                        duration, DEFAULT_ECS_SESSION_EXPIRED_FACTOR)
        except Exception as e:
            # Broad catch is deliberate: any failure in an attempt is retryable.
            if i == attempts - 1:
                logger.error("Exception: {0}".format(e))
                raise ClientError("Failed to get credentials from ECS metadata service. {0}".format(e))
            # Leave a trace of intermediate failures instead of dropping them.
            logger.warning("Fetch credentials attempt {0}/{1} failed, retrying: {2}".format(i + 1, attempts, e))