in internal/subscriber/subscribe.go [81:119]
func processMessage(s *SubscriberService, msg *pubsub.Message) error {
// parse payload
var p payload
if err := json.Unmarshal(msg.Data, &p); err != nil {
return err
}
inMetadata, err := newMetadata(p)
if err != nil {
return err
}
_, isReplaced := msg.Attributes["overwrittenByGeneration"]
eventType := msg.Attributes["eventType"]
// Ignore replacement messages
if isReplaced {
return nil
}
switch eventType {
case storage.ObjectFinalizeEvent:
if err = s.handleFinalize(inMetadata); err != nil {
return err
}
case storage.ObjectDeleteEvent:
if err := s.handleDelete(inMetadata); err != nil {
return err
}
case storage.ObjectArchiveEvent:
if err := s.handleArchive(inMetadata); err != nil {
return err
}
default:
return fmt.Errorf("unknown event type: %s", eventType)
}
return nil
}