in app/pubsub-integration/pubsub/pubsub.go [202:219]
func (sub *Subscription) Receive(ctx context.Context, handler MessageHandler) error {
// handler: the callback function to handle the received message
return sub.subscription.Receive(ctx, func(ctx context.Context, pubsubMessage *pubsub.Message) {
log.Printf("got Cloud Pub/Sub message ID: %v", pubsubMessage.ID)
data, err := avro.DecodeFromJSON(sub.codec, pubsubMessage.Data)
if err != nil {
log.Printf("failed to check schema, message: %v, ", pubsubMessage.ID)
pubsubMessage.Nack()
return
}
message := &Message{
Message: pubsubMessage,
Data: data,
}
handler(ctx, message)
})
}