in sample/invoke_operation_bucket_tags_sample2.py [0:0]
def main():
args = parser.parse_args()
# Loading credentials values from the environment variables
credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
# Using the SDK's default configuration
cfg = oss.config.load_default()
cfg.credentials_provider = credentials_provider
cfg.region = args.region
if args.endpoint is not None:
cfg.endpoint = args.endpoint
client = oss.Client(cfg)
# put bucket tags
request = PutBucketTagsRequest(
bucket=args.bucket,
tagging=Tagging(
tag_set=TagSet(
tags=[Tag(
key='test_key',
value='test_value',
), Tag(
key='test_key2',
value='test_value2',
)],
),
),
)
op_input = serde.serialize_input(
request=request,
op_input=OperationInput(
op_name='PutBucketTags',
method='PUT',
headers=CaseInsensitiveDict({
'Content-Type': 'application/xml',
}),
parameters={
'tagging': '',
},
bucket=args.bucket,
op_metadata={'sub-resource': ['tagging']},
),
custom_serializer=[
serde_utils.add_content_md5
]
)
op_output = client.invoke_operation(op_input)
result = serde.deserialize_output(
result=PutBucketTagsResult(),
op_output=op_output,
custom_deserializer=[
serde.deserialize_output_xmlbody
],
)
print(f'status code: {result.status_code},'
f' request id: {result.request_id},'
)
# get bucket tags
request = GetBucketTagsRequest(
bucket=args.bucket,
)
op_input = serde.serialize_input(
request=request,
op_input=OperationInput(
op_name='GetBucketTags',
method='GET',
headers=CaseInsensitiveDict({
'Content-Type': 'application/xml',
}),
parameters={
'tagging': '',
},
bucket=args.bucket,
op_metadata={'sub-resource': ['tagging']},
),
custom_serializer=[
serde_utils.add_content_md5
]
)
op_output = client.invoke_operation(op_input)
result = serde.deserialize_output(
result=GetBucketTagsResult(),
op_output=op_output,
custom_deserializer=[
serde.deserialize_output_xmlbody
],
)
print(f'status code: {result.status_code},'
f' request id: {result.request_id},'
f' key: {result.tagging.tag_set.tags[0].key},'
f' value: {result.tagging.tag_set.tags[0].value},'
f' key: {result.tagging.tag_set.tags[1].key},'
f' value: {result.tagging.tag_set.tags[1].value},'
)
# delete bucket tags
request = DeleteBucketTagsRequest(
bucket=args.bucket,
)
op_input = serde.serialize_input(
request=request,
op_input=OperationInput(
op_name='DeleteBucketTags',
method='DELETE',
headers=CaseInsensitiveDict({
'Content-Type': 'application/xml',
}),
parameters={
'tagging': '',
},
bucket=args.bucket,
op_metadata={'sub-resource': ['tagging']},
),
custom_serializer=[
serde_utils.add_content_md5
]
)
op_output = client.invoke_operation(op_input)
result = serde.deserialize_output(
result=DeleteBucketTagsResult(),
op_output=op_output,
custom_deserializer=[
serde.deserialize_output_xmlbody
],
)
print(f'status code: {result.status_code},'
f' request id: {result.request_id},'
)