internal/subscriber/subscribe.go (174 lines of code) (raw):
package subscriber
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log"
"strconv"
"time"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/storage"
"github.com/GoogleCloudPlatform/gcs-metadata-server/internal/model"
"github.com/GoogleCloudPlatform/gcs-metadata-server/internal/repo"
)
type payload struct {
Bucket string
Name string
Size string
StorageClass string
Updated time.Time
Created time.Time
}
type Susbcriber interface {
Start(ctx context.Context) error
consumeMessage(ctx context.Context, msg *pubsub.Message)
handleFinalize(inMetadata *model.Metadata) error
handleArchive(inMetadata *model.Metadata) error
handleDelete(inMetadata *model.Metadata) error
}
type SubscriberService struct {
client *pubsub.Client
subscriptionId string
directoryRepo repo.DirectoryRepository
metadataRepo repo.MetadataRepository
}
func NewSubscriberService(client *pubsub.Client, subscriptionId string, directoryRepo repo.DirectoryRepository, metadataRepo repo.MetadataRepository) *SubscriberService {
return &SubscriberService{
client,
subscriptionId,
directoryRepo,
metadataRepo,
}
}
func newMetadata(p payload) (*model.Metadata, error) {
size, err := strconv.ParseInt(p.Size, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing size: %v", err.Error())
}
return &model.Metadata{
Bucket: p.Bucket,
Name: p.Name,
Size: size,
StorageClass: p.StorageClass,
Updated: p.Updated,
Created: p.Created,
}, nil
}
// Start initiates subscription process by listening to all messages at subscriptionId
func (s *SubscriberService) Start(ctx context.Context) error {
sub := s.client.SubscriptionInProject(s.subscriptionId, s.client.Project())
if err := sub.Receive(ctx, s.consumeMessage); err != nil {
return fmt.Errorf("error receiving messages %w", err)
}
return nil
}
// processMessage handles incoming metadata and performs database operations based on the eventType of incoming *pubsub.Message
//
// Messages are expected to be unordered. The handling of incoming metadata has to
// be based on its update time and gracefully Nack()'d when necessary
func processMessage(s *SubscriberService, msg *pubsub.Message) error {
// parse payload
var p payload
if err := json.Unmarshal(msg.Data, &p); err != nil {
return err
}
inMetadata, err := newMetadata(p)
if err != nil {
return err
}
_, isReplaced := msg.Attributes["overwrittenByGeneration"]
eventType := msg.Attributes["eventType"]
// Ignore replacement messages
if isReplaced {
return nil
}
switch eventType {
case storage.ObjectFinalizeEvent:
if err = s.handleFinalize(inMetadata); err != nil {
return err
}
case storage.ObjectDeleteEvent:
if err := s.handleDelete(inMetadata); err != nil {
return err
}
case storage.ObjectArchiveEvent:
if err := s.handleArchive(inMetadata); err != nil {
return err
}
default:
return fmt.Errorf("unknown event type: %s", eventType)
}
return nil
}
// consumeMessage is a callback function for pubsub.Receive() which handles
// the acknowledgment of messages based on processMessage() results
func (s *SubscriberService) consumeMessage(ctx context.Context, msg *pubsub.Message) {
if err := processMessage(s, msg); err != nil {
log.Printf("message not acknowledged: %v\n", err)
msg.Nack()
return
}
msg.Ack()
}
// handleFinalize takes incoming metadata and determines to insert or update
// based on if metadata already exists and is newer
func (s *SubscriberService) handleFinalize(inMetadata *model.Metadata) error {
existingMetadata, err := s.metadataRepo.Get(inMetadata.Bucket, inMetadata.Name)
if err != nil && err != sql.ErrNoRows {
return fmt.Errorf("error getting existing metadata: %w", err)
}
// Check if incoming metadata is necessary to handle
if existingMetadata != nil {
if existingMetadata.Updated.After(inMetadata.Updated) {
return nil
}
if existingMetadata.StorageClass != inMetadata.StorageClass {
return s.handleArchive(inMetadata)
}
}
// Insert if metadata does not exist
if existingMetadata == nil {
if err := s.metadataRepo.Insert(inMetadata); err != nil {
return fmt.Errorf("error inserting metadata: %w", err)
}
if err := s.directoryRepo.UpsertParentDirs(repo.StorageClass(inMetadata.StorageClass), inMetadata.Bucket, inMetadata.Name, inMetadata.Size, 1); err != nil {
return fmt.Errorf("error upserting parent directories: %w", err)
}
} else {
// Otherwise, update metadata
if err := s.metadataRepo.Update(inMetadata.Bucket, inMetadata.Name, inMetadata.StorageClass, inMetadata.Size, inMetadata.Updated); err != nil {
return fmt.Errorf("error updating metadata: %w", err)
}
sizeDiff := inMetadata.Size - existingMetadata.Size
if err := s.directoryRepo.UpsertParentDirs(repo.StorageClass(inMetadata.StorageClass), inMetadata.Bucket, inMetadata.Name, sizeDiff, 0); err != nil {
return fmt.Errorf("error upserting parent directories: %w", err)
}
}
return nil
}
// handleArchive takes incoming metadata and updates parent directories to new storage class
func (s *SubscriberService) handleArchive(inMetadata *model.Metadata) error {
existingMetadata, err := s.metadataRepo.Get(inMetadata.Bucket, inMetadata.Name)
if err != nil && err != sql.ErrNoRows {
return fmt.Errorf("error getting existing metadata: %w", err)
}
// Check if incoming metadata is necessary to handle
if existingMetadata != nil {
if existingMetadata.Updated.After(inMetadata.Updated) {
return nil // skip, already in most recent update
}
if existingMetadata.StorageClass == inMetadata.StorageClass {
return nil // skip, already in correct StorageClass
}
} else {
// if metadata does not exist, it is a normal insert
return s.handleFinalize(inMetadata)
}
if err := s.metadataRepo.Update(inMetadata.Bucket, inMetadata.Name, inMetadata.StorageClass,
inMetadata.Size, inMetadata.Updated); err != nil {
return fmt.Errorf("error updating metadata: %w", err)
}
if err := s.directoryRepo.UpsertArchiveParentDirs(repo.StorageClass(existingMetadata.StorageClass),
repo.StorageClass(inMetadata.StorageClass), inMetadata.Bucket, inMetadata.Name, inMetadata.Size); err != nil {
return fmt.Errorf("error upserting parent directories: %w", err)
}
return nil
}
// handleDelete tries to delete incoming metadata inMetadata.
// Returns error if metadata does not exist
func (s *SubscriberService) handleDelete(inMetadata *model.Metadata) error {
// Check if metadata exists
existingMetadata, err := s.metadataRepo.Get(inMetadata.Bucket, inMetadata.Name)
if err != nil {
return err
}
// Skip if existing metadata is newer
if existingMetadata.Updated.After(inMetadata.Updated) {
return nil
}
if err := s.metadataRepo.Delete(inMetadata.Bucket, inMetadata.Name); err != nil {
return err
}
return s.directoryRepo.UpsertParentDirs(repo.StorageClass(inMetadata.StorageClass), inMetadata.Bucket,
inMetadata.Name, -inMetadata.Size, -1)
}