sample/encryption_kms.py (74 lines of code) (raw):

import argparse import base64 import json from aliyunsdkkms.request.v20160120.DecryptRequest import DecryptRequest from aliyunsdkkms.request.v20160120.EncryptRequest import EncryptRequest from alibabacloud_dkms_transfer.kms_transfer_acs_client import KmsTransferAcsClient from typing import Optional, Dict import alibabacloud_oss_v2 as oss parser = argparse.ArgumentParser(description="encryption kms sample") parser.add_argument('--region', help='The region in which the bucket is located.', required=True) parser.add_argument('--bucket', help='The name of the bucket.', required=True) parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS') parser.add_argument('--key', help='The name of the object.', required=True) parser.add_argument('--kms_id', help='The id of the your CMK ID.', required=True) class MasterKmsCipher(oss.crypto.MasterCipher): def __init__( self, mat_desc: Optional[Dict] = None, kms_client: Optional[KmsTransferAcsClient] = None, kms_id: Optional[str] = None, ): self.kms_client = kms_client self.kms_id = kms_id self._mat_desc = None if mat_desc is not None and len(mat_desc.items()) > 0: self._mat_desc = json.dumps(mat_desc) def get_wrap_algorithm(self) -> str: return 'KMS/ALICLOUD' def get_mat_desc(self) -> str: return self._mat_desc or '' def encrypt(self, data: bytes) -> bytes: base64_crypto = base64.b64encode(data) request = EncryptRequest() request.set_KeyId(self.kms_id) request.set_Plaintext(base64_crypto) response = self.kms_client.do_action_with_exception(request) return base64.b64decode(json.loads(response).get('CiphertextBlob')) def decrypt(self, data: bytes) -> bytes: base64_crypto = base64.b64encode(data) request = DecryptRequest() request.set_CiphertextBlob(base64_crypto) response = self.kms_client.do_action_with_exception(request) return base64.b64decode(json.loads(response).get('Plaintext')) def main(): args = parser.parse_args() # Loading credentials values from the environment variables credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider() # Using the SDK's default configuration cfg = oss.config.load_default() cfg.credentials_provider = credentials_provider cfg.region = args.region if args.endpoint is not None: cfg.endpoint = args.endpoint client = oss.Client(cfg) kms_client = KmsTransferAcsClient(ak=credentials_provider._credentials.access_key_id, secret=credentials_provider._credentials.access_key_secret, region_id=args.region) mc = MasterKmsCipher( mat_desc={"desc": "your master encrypt key material describe information"}, kms_client=kms_client, kms_id=args.kms_id, ) encryption_client = oss.EncryptionClient(client, mc) data = b'hello world' result = encryption_client.put_object(oss.PutObjectRequest( bucket=args.bucket, key=args.key, body=data, )) print(vars(result)) result = encryption_client.get_object(oss.GetObjectRequest( bucket=args.bucket, key=args.key, )) print(vars(result)) print(result.body.read()) if __name__ == "__main__": main()