in alibabacloud_credentials/providers.py [0:0]
def _create_credentials(self):
    """Exchange the OIDC token on disk for temporary STS credentials.

    Sends an ``AssumeRoleWithOIDC`` request to ``self.sts_endpoint`` over
    HTTPS and wraps the returned key triple in an ``OIDCRoleArnCredential``.
    The ``Expiration`` timestamp is read as UTC and converted to epoch
    seconds.

    Raises:
        CredentialException: the response status is not 200, or the decoded
            body has no ``Credentials`` entry. The message is the raw
            response body.
    """
    # Obtain the credential; implemented with the signing utility helpers for now.
    # NOTE(review): get_private_key is reused to read the token file and
    # presumably returns its contents as a string; confirm it strips
    # trailing whitespace/newlines.
    token = au.get_private_key(self.oidc_token_file_path)

    request = ph.get_new_request()
    # No request signature is computed in this block; the OIDC token is the
    # only authenticator sent.
    # NOTE(review): the token is a bearer secret carried in `query`. If the
    # transport serializes `query` into the URL, it can end up in proxy or
    # access logs; confirm how TeaCore sends it.
    request.query = {
        'Action': 'AssumeRoleWithOIDC',
        'Format': 'JSON',
        'Version': '2015-04-01',
        'DurationSeconds': str(self.duration_seconds),
        'RoleArn': self.role_arn,
        'OIDCProviderArn': self.oidc_provider_arn,
        'OIDCToken': token,
        # Every caller that leaves role_session_name unset shares this name,
        # so their sessions cannot be told apart by session name in audit
        # records.
        'RoleSessionName': self.role_session_name or 'defaultSessionName'
    }
    request.query.update(
        Timestamp=ph.get_iso_8061_date(),
        SignatureNonce=ph.get_uuid(),
    )
    # An optional session policy is forwarded only when one is configured.
    if self.policy is not None:
        request.query["Policy"] = self.policy
    request.protocol = 'https'
    request.headers['host'] = self.sts_endpoint

    # Send the request.
    response = TeaCore.do_action(request)

    # The raw response body becomes the exception message, so it shows up
    # wherever callers log this exception.
    if response.status_code != 200:
        raise CredentialException(response.body.decode('utf-8'))

    payload = json.loads(response.body.decode('utf-8'))
    if "Credentials" not in payload:
        raise CredentialException(response.body.decode('utf-8'))

    fields = payload.get("Credentials")
    # Parse the timestamp into a struct_time, then into a UTC epoch value.
    # NOTE(review): a missing or differently formatted Expiration raises
    # from strptime rather than CredentialException.
    expires_at = calendar.timegm(
        time.strptime(fields.get("Expiration"), "%Y-%m-%dT%H:%M:%SZ")
    )
    # `self` is handed to the credential object; presumably so it can ask
    # this provider for a refresh (verify against OIDCRoleArnCredential).
    return credentials.OIDCRoleArnCredential(
        fields.get("AccessKeyId"),
        fields.get("AccessKeySecret"),
        fields.get("SecurityToken"),
        expires_at,
        self,
    )