func processMessage()

in internal/subscriber/subscribe.go [81:119]


// processMessage decodes a Pub/Sub message and dispatches it to the
// SubscriberService handler that matches its "eventType" attribute.
//
// The payload is decoded and converted to metadata before any attribute is
// inspected, so a malformed payload is reported as an error even for a
// message that would otherwise be ignored. A message that carries an
// "overwrittenByGeneration" attribute is treated as a replacement and
// skipped: the function returns nil without calling any handler.
//
// It returns nil when the message was handled or ignored. It returns a
// non-nil error when decoding the payload, building the metadata, or the
// selected handler fails (the underlying error is wrapped with %w, so
// errors.Is and errors.As still match it), or when the event type is not one
// of the finalize, delete, or archive events.
func processMessage(s *SubscriberService, msg *pubsub.Message) error {
	var p payload
	if err := json.Unmarshal(msg.Data, &p); err != nil {
		return fmt.Errorf("decoding message payload: %w", err)
	}

	inMetadata, err := newMetadata(p)
	if err != nil {
		return fmt.Errorf("building metadata from payload: %w", err)
	}

	// Ignore replacement messages. Only the presence of the attribute
	// matters; its value is never read.
	if _, isReplaced := msg.Attributes["overwrittenByGeneration"]; isReplaced {
		return nil
	}

	// A missing attribute yields "", which falls through to the default case.
	eventType := msg.Attributes["eventType"]
	switch eventType {
	case storage.ObjectFinalizeEvent:
		err = s.handleFinalize(inMetadata)
	case storage.ObjectDeleteEvent:
		err = s.handleDelete(inMetadata)
	case storage.ObjectArchiveEvent:
		err = s.handleArchive(inMetadata)
	default:
		return fmt.Errorf("unknown event type: %s", eventType)
	}
	if err != nil {
		// Name the event type so the failing handler is identifiable from
		// the error alone.
		return fmt.Errorf("handling %s event: %w", eventType, err)
	}

	return nil
}