func sendToReceivers()

in pkg/pubsub/pubsub.go [119:163]


func sendToReceivers(ctx context.Context, log logrus.FieldLogger, send Sender, receivers chan<- *Notification, result acker) error {
	return send(ctx, func(ctx context.Context, msg *pubsub.Message) {
		bucket, obj := msg.Attributes[keyBucket], msg.Attributes[keyObject]
		path, err := gcs.NewPath("gs://" + bucket + "/" + obj)
		if err != nil {
			log.WithError(err).WithFields(logrus.Fields{
				"bucket": bucket,
				"object": obj,
				"id":     msg.ID,
			}).Error("Failed to parse path")
			result.Ack(msg)
			return
		}
		when, err := time.Parse(time.RFC3339, msg.Attributes[keyTime])
		if err != nil {
			log.WithError(err).WithFields(logrus.Fields{
				"time": msg.Attributes[keyTime],
				"id":   msg.ID,
			}).Error("Failed to parse time")
			result.Nack(msg)
			return
		}
		gen, err := strconv.ParseInt(msg.Attributes[keyGeneration], 10, 64)
		if err != nil {
			log.WithError(err).WithFields(logrus.Fields{
				"generation": msg.Attributes[keyGeneration],
				"id":         msg.ID,
			}).Error("Failed to parse generation")
			result.Nack(msg)
			return
		}
		notice := Notification{
			Path:       *path,
			Event:      Event(msg.Attributes[keyEvent]),
			Time:       when,
			Generation: gen,
		}
		select {
		case <-ctx.Done():
			result.Nack(msg)
		case receivers <- &notice:
			result.Ack(msg)
		}
	})
}