def copy_tags()

in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/DataCatalogController.py [0:0]


    def copy_tags(self, source_project, source_dataset, source_table, target_project, target_dataset, target_table, include_policy_tags=False):
        """Copy the Data Catalog tags of one BigQuery table onto another.

        Looks up the catalog entries of the source and target tables, rebuilds
        every tag listed on the source entry (the source tag's column, if any,
        is carried over) and either updates the tag that
        ``check_if_tag_exists`` reports on the target entry or creates a new
        one. Finally the policy tags are copied through ``copy_policy_tags``.

        Side effect: ``self.template_id``, ``self.template_project``,
        ``self.template_region`` and ``self.template_path`` are overwritten
        for every source tag that is processed.

        Args:
            source_project: Project of the table to copy tags from.
            source_dataset: Dataset of the table to copy tags from.
            source_table: Name of the table to copy tags from.
            target_project: Project of the table to copy tags to.
            target_dataset: Dataset of the table to copy tags to.
            target_table: Name of the table to copy tags to.
            include_policy_tags: Not consulted by this method; policy tags
                are always copied. NOTE(review): confirm whether this flag
                was meant to gate the ``copy_policy_tags`` call.

        Returns:
            bool: False when either entry is not a BigQuery table, when a tag
            create/update call raised, or when ``copy_policy_tags`` returned a
            falsy result; True otherwise.
        """
        bq_resource = '//bigquery.googleapis.com/projects/{0}/datasets/{1}/tables/{2}'

        # lookup the source entry
        request = datacatalog.LookupEntryRequest()
        request.linked_resource = bq_resource.format(source_project, source_dataset, source_table)
        source_entry = self.client.lookup_entry(request)

        if source_entry.bigquery_table_spec.table_source_type != types.TableSourceType.BIGQUERY_TABLE:
            msg = 'Error {} is not a BQ table'.format(source_table)
            log_info(msg, None)
            print(json.dumps(msg))
            return False

        # lookup the target entry
        request = datacatalog.LookupEntryRequest()
        request.linked_resource = bq_resource.format(target_project, target_dataset, target_table)
        target_entry = self.client.lookup_entry(request)

        if target_entry.bigquery_table_spec.table_source_type != types.TableSourceType.BIGQUERY_TABLE:
            msg = 'Error {} is not a BQ table'.format(target_table)
            log_info(msg, None)
            # the original printed an undefined name here, raising NameError
            print(json.dumps(msg))
            return False

        # (attribute on the tag field, field_type label handed to populate_tag_field)
        value_kinds = (
            ('bool_value', 'bool'),
            ('double_value', 'double'),
            ('string_value', 'string'),
            ('enum_value', 'enum'),
            ('timestamp_value', 'timestamp'),
            ('richtext_value', 'richtext'),
        )

        success = True

        # look to see if the source table is tagged
        tag_list = self.client.list_tags(parent=source_entry.name, timeout=120)

        for source_tag in tag_list:
            print('source_tag.template:', source_tag.template)
            print('source_tag.column:', source_tag.column)

            # get tag template fields; get_template() takes no arguments, so it
            # presumably reads the self.template_* attributes set here
            template_parts = source_tag.template.split('/')
            self.template_id = template_parts[5]
            self.template_project = template_parts[1]
            self.template_region = template_parts[3]
            self.template_path = source_tag.template
            template_fields = self.get_template()

            # start a new target tag
            target_tag = datacatalog.Tag()
            target_tag.template = source_tag.template

            if source_tag.column:
                target_tag.column = source_tag.column

            for template_field in template_fields:

                field_id = template_field['field_id']
                if field_id not in source_tag.fields:
                    continue

                tagged_field = source_tag.fields[field_id]
                print('field_id:', field_id)

                # Decide the value kind by presence, not truthiness, and start
                # from a clean slate for every field: a stored False / 0.0 / ''
                # must be copied as-is and must never inherit the type and
                # value detected for the previous field.
                # NOTE(review): relies on `name in message` reporting presence
                # for these oneof members -- confirm against the installed
                # client library.
                field_type = None
                field_value = None
                for attr_name, type_label in value_kinds:
                    if attr_name in tagged_field:
                        field_type = type_label
                        field_value = getattr(tagged_field, attr_name)
                        if type_label == 'enum':
                            field_value = field_value.display_name
                        break

                if field_type is None:
                    # nothing to copy for this field
                    msg = 'No value set on field {} of tag {}'.format(field_id, source_tag.template)
                    log_info(msg, None)
                    continue

                # NOTE(review): error_exists is ignored, as in the original --
                # confirm whether it should mark the copy as failed.
                target_tag, error_exists = self.populate_tag_field(target_tag, field_id, field_type, [field_value], None)

            # create the target tag
            tag_exists, tag_id = self.check_if_tag_exists(parent=target_entry.name, column=source_tag.column)

            if tag_exists:
                target_tag.name = tag_id

                try:
                    print('tag update request: ', target_tag)
                    self.client.update_tag(tag=target_tag)
                except Exception as e:
                    success = False
                    msg = 'Error occurred during tag update: {}'.format(target_tag)
                    log_error(msg, e)

            else:
                try:
                    print('tag create request: ', target_tag)
                    self.client.create_tag(parent=target_entry.name, tag=target_tag)
                except Exception as e:
                    success = False
                    msg = 'Error occurred during tag create: {}'.format(target_tag)
                    log_error(msg, e)

        # copy policy tags; combine with the tag-copy outcome instead of
        # overwriting it, so an earlier create/update failure is still reported
        if not self.copy_policy_tags(source_project, source_dataset, source_table, target_project, target_dataset, target_table):
            success = False

        return success