in oss2/xml_utils.py [0:0]
def parse_get_bucket_replication_result(result, body):
    """Fill ``result.rule_list`` from a bucket replication XML document.

    Each direct ``Rule`` child of the document root is turned into a
    ``ReplicationRule`` and appended to ``result.rule_list`` in document
    order. ``result`` is mutated in place; nothing is returned.

    :param result: object exposing a ``rule_list`` attribute that supports
        ``append``.
    :param body: XML payload, handed unchanged to ``ElementTree.fromstring``.
    """
    # (attribute set on the rule, tag looked up under <Destination>).
    # Every one of these lookups passes None as its default value.
    destination_fields = (
        ("target_bucket_name", "Bucket"),
        ("target_bucket_location", "Location"),
        ("target_transfer_type", "TransferType"),
        ("target_cloud", "Cloud"),
        ("target_cloud_location", "CloudLocation"),
    )

    document = ElementTree.fromstring(body)
    for node in document.findall("Rule"):
        entry = ReplicationRule()

        # rule_id is only assigned when the rule carries an <ID> element;
        # otherwise whatever ReplicationRule() initialised is left alone.
        if node.find("ID") is not None:
            entry.rule_id = _find_tag(node, "ID")

        destination = node.find("Destination")
        for attr_name, tag in destination_fields:
            setattr(entry, attr_name,
                    _find_tag_with_default(destination, tag, None))

        entry.status = _find_tag(node, "Status")
        entry.sync_role_name = _find_tag_with_default(node, 'SyncRole', None)
        entry.replica_kms_keyid = _find_tag_with_default(
            node, 'EncryptionConfiguration/ReplicaKmsKeyID', None)
        entry.sse_kms_encrypted_objects_status = _find_tag_with_default(
            node, 'SourceSelectionCriteria/SseKmsEncryptedObjects/Status', None)

        # Only the exact text 'enabled' switches historical replication on.
        history_flag = _find_tag(node, "HistoricalObjectReplication")
        entry.is_enable_historical_object_replication = (history_flag == 'enabled')

        # prefix_list is only assigned when a <PrefixSet> element exists.
        prefix_set = node.find('PrefixSet')
        if prefix_set is not None:
            entry.prefix_list = _find_all_tags(prefix_set, 'Prefix')

        # <Action> holds a comma-separated list; keep the pieces as-is.
        entry.action_list = _find_tag(node, 'Action').split(',')

        result.rule_list.append(entry)