solution/ack/solutions/IAM/OIDC/python/oidc_v1.py (71 lines of code) (raw):

#!/usr/bin/env python
# coding=utf-8
"""Exchange an OIDC token for STS credentials, then list RAM users.

When run as a script this module:

1. reads an OIDC token from a file,
2. calls ``AssumeRoleWithOIDC`` to obtain temporary STS credentials
   (retrying once on failure), and
3. uses those credentials to call the RAM ``ListUsers`` action and prints
   the decoded response.
"""
import json
import logging
import sys
import time

from aliyunsdkcore.auth.credentials import StsTokenCredential, AccessKeyCredential
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
from aliyunsdksts.request.v20150401.AssumeRoleWithOIDCRequest import AssumeRoleWithOIDCRequest


class RAM(object):
    """RAM API client that authenticates with temporary STS credentials."""

    def __init__(self, sts_access_key, sts_access_secret, sts_token, region_id):
        """Build an ``AcsClient`` from an STS credential triple.

        Args:
            sts_access_key: Access key id of the STS credentials.
            sts_access_secret: Access key secret of the STS credentials.
            sts_token: Security token of the STS credentials.
            region_id: Region id passed to ``AcsClient``.
        """
        self.sts_access_key = sts_access_key
        self.sts_access_secret = sts_access_secret
        self.sts_token = sts_token
        self.region_id = region_id
        # Authenticate with STS credentials.
        self.credentials = StsTokenCredential(
            self.sts_access_key, self.sts_access_secret, self.sts_token
        )
        self.clt = AcsClient(region_id=self.region_id, credential=self.credentials)

    def ListUsers(self):
        """Query the list of all RAM users.

        Returns:
            The response body decoded from JSON.
        """
        request = CommonRequest()
        request.set_accept_format('json')
        request.set_domain('ram.aliyuncs.com')
        request.set_method('POST')
        request.set_protocol_type('https')  # https | http
        request.set_version('2015-05-01')
        request.set_action_name('ListUsers')

        # NOTE(review): OIDC.get_sts_credentials uses do_action_with_exception
        # while this method uses do_action; confirm whether the difference is
        # intentional before unifying them.
        response = self.clt.do_action(request)
        logging.debug(str(response, encoding='utf-8'))
        return json.loads(response)


class OIDC(object):
    """STS client used to exchange an OIDC token for temporary credentials."""

    def __init__(self, access_key, access_secret, region_id):
        """Build an ``AcsClient`` from an access key pair.

        Args:
            access_key: Access key id used to construct the client.
            access_secret: Access key secret used to construct the client.
            region_id: Region id passed to ``AcsClient``.
        """
        self.access_key = access_key
        self.access_secret = access_secret
        self.region_id = region_id
        # Authenticate the client used for the OIDC exchange.
        self.credentials = AccessKeyCredential(self.access_key, self.access_secret)
        self.clt = AcsClient(region_id=self.region_id, credential=self.credentials)

    def get_sts_credentials(self, oidc_provider_arn, role_arn, oidc_token, role_session_name):
        """Call ``AssumeRoleWithOIDC`` and return the decoded response.

        Args:
            oidc_provider_arn: ARN of the OIDC identity provider.
            role_arn: ARN of the role to assume.
            oidc_token: OIDC token presented to STS.
            role_session_name: Name given to the role session.

        Returns:
            The response body decoded from JSON. The script entry point reads
            ``["Credentials"]`` from it.

        Raises:
            Whatever ``AcsClient.do_action_with_exception`` raises when the
            request fails.
        """
        request = AssumeRoleWithOIDCRequest()
        request.set_accept_format('json')
        request.set_OIDCProviderArn(oidc_provider_arn)
        request.set_RoleArn(role_arn)
        request.set_OIDCToken(oidc_token)
        request.set_RoleSessionName(role_session_name)
        response = self.clt.do_action_with_exception(request)
        return json.loads(response)


def read_oidc_token(file_path):
    """Return the full text content of the OIDC token file at *file_path*."""
    with open(file_path, 'r') as f:
        return f.read()


def main():
    """Obtain STS credentials via OIDC, list RAM users and print the result.

    Exits with status 1 if the STS credentials cannot be used to list users.
    If the retried ``AssumeRoleWithOIDC`` call also fails, its exception
    propagates.
    """
    # First obtain STS credentials via OIDC.
    sts_assume_role_ak = "xx"
    sts_assume_role_sk = "xxx"
    region_id = "cn-shanghai"
    oidc_provider_arn = "acs:ram::1146716667364xxx:oidc-provider/ack-rrsa-c1f8defef6e4b41dc81a6a20235eb8631"
    role_arn = "acs:ram::1146716667364xxx:role/ack-app-sts-role"
    role_session_name = "yaofangapp"
    oidc_token_file = "/var/run/secrets/tokens/oidc-token"
    # time.sleep() takes seconds; the previous literal 1000 stalled the single
    # retry for roughly 17 minutes.
    retry_delay_seconds = 1

    oidc_token = read_oidc_token(oidc_token_file)
    oidc = OIDC(sts_assume_role_ak, sts_assume_role_sk, region_id)
    try:
        sts_tuple = oidc.get_sts_credentials(
            oidc_provider_arn, role_arn, oidc_token, role_session_name
        )
    except Exception:
        # Keep the original message, but record the underlying exception too.
        logging.exception("OIDC Token失效,导致查询STS失效,程序上面做次重试")
        time.sleep(retry_delay_seconds)
        # Re-read the token: retrying with the same in-memory token cannot
        # succeed if that token is what was rejected.
        oidc_token = read_oidc_token(oidc_token_file)
        sts_tuple = oidc.get_sts_credentials(
            oidc_provider_arn, role_arn, oidc_token, role_session_name
        )

    try:
        credentials = sts_tuple["Credentials"]
        ram = RAM(
            credentials["AccessKeyId"],
            credentials["AccessKeySecret"],
            credentials["SecurityToken"],
            region_id,
        )
        r = ram.ListUsers()
        print(r)
    except Exception:
        # Top-level boundary: report the failure and exit non-zero instead of
        # silently exiting with status 0.
        logging.exception("Failed to list RAM users with the STS credentials")
        sys.exit(1)


if __name__ == '__main__':
    main()