in internal/subscriber/subscribe.go [134:171]
func (s *SubscriberService) handleFinalize(inMetadata *model.Metadata) error {
existingMetadata, err := s.metadataRepo.Get(inMetadata.Bucket, inMetadata.Name)
if err != nil && err != sql.ErrNoRows {
return fmt.Errorf("error getting existing metadata: %w", err)
}
// Check if incoming metadata is necessary to handle
if existingMetadata != nil {
if existingMetadata.Updated.After(inMetadata.Updated) {
return nil
}
if existingMetadata.StorageClass != inMetadata.StorageClass {
return s.handleArchive(inMetadata)
}
}
// Insert if metadata does not exist
if existingMetadata == nil {
if err := s.metadataRepo.Insert(inMetadata); err != nil {
return fmt.Errorf("error inserting metadata: %w", err)
}
if err := s.directoryRepo.UpsertParentDirs(repo.StorageClass(inMetadata.StorageClass), inMetadata.Bucket, inMetadata.Name, inMetadata.Size, 1); err != nil {
return fmt.Errorf("error upserting parent directories: %w", err)
}
} else {
// Otherwise, update metadata
if err := s.metadataRepo.Update(inMetadata.Bucket, inMetadata.Name, inMetadata.StorageClass, inMetadata.Size, inMetadata.Updated); err != nil {
return fmt.Errorf("error updating metadata: %w", err)
}
sizeDiff := inMetadata.Size - existingMetadata.Size
if err := s.directoryRepo.UpsertParentDirs(repo.StorageClass(inMetadata.StorageClass), inMetadata.Bucket, inMetadata.Name, sizeDiff, 0); err != nil {
return fmt.Errorf("error upserting parent directories: %w", err)
}
}
return nil
}