in pkg/pubsub/pubsub.go [119:163]
func sendToReceivers(ctx context.Context, log logrus.FieldLogger, send Sender, receivers chan<- *Notification, result acker) error {
return send(ctx, func(ctx context.Context, msg *pubsub.Message) {
bucket, obj := msg.Attributes[keyBucket], msg.Attributes[keyObject]
path, err := gcs.NewPath("gs://" + bucket + "/" + obj)
if err != nil {
log.WithError(err).WithFields(logrus.Fields{
"bucket": bucket,
"object": obj,
"id": msg.ID,
}).Error("Failed to parse path")
result.Ack(msg)
return
}
when, err := time.Parse(time.RFC3339, msg.Attributes[keyTime])
if err != nil {
log.WithError(err).WithFields(logrus.Fields{
"time": msg.Attributes[keyTime],
"id": msg.ID,
}).Error("Failed to parse time")
result.Nack(msg)
return
}
gen, err := strconv.ParseInt(msg.Attributes[keyGeneration], 10, 64)
if err != nil {
log.WithError(err).WithFields(logrus.Fields{
"generation": msg.Attributes[keyGeneration],
"id": msg.ID,
}).Error("Failed to parse generation")
result.Nack(msg)
return
}
notice := Notification{
Path: *path,
Event: Event(msg.Attributes[keyEvent]),
Time: when,
Generation: gen,
}
select {
case <-ctx.Done():
result.Nack(msg)
case receivers <- ¬ice:
result.Ack(msg)
}
})
}