in app/metrics/processor/processor.go [67:93]
// eventHandler returns a pubsub.MessageHandler that converts each received
// event into metrics and publishes the result to metricsTopic.
//
// metricsTopic is the topic the generated metrics are published to.
// factory is the metrics factory used to generate metrics from the received
// event. It is called with the event data, the event's publish time, the time
// at which the simulated processing finished, and the simulated processing
// duration.
//
// The event is acked only once its metrics have been published. If either the
// factory or the publish step fails, the event is nacked instead, so that an
// event whose metrics never reached metricsTopic is not reported as handled.
func eventHandler(metricsTopic pubsub.Topic, factory metrics.Factory) pubsub.MessageHandler {
	return func(ctx context.Context, message *pubsub.Message) {
		log.Printf("processing event ID: %v, data: %v", message.ID, message.Data)

		processingTime := ProcessingTime()
		time.Sleep(processingTime) // Simulate processing time
		ackTime := time.Now()

		// Named "converted" rather than "metrics" so the local does not shadow
		// the metrics package inside this closure.
		converted, err := factory(message.Data, message.PublishTime, ackTime, processingTime)
		if err != nil {
			log.Printf("nack the event ID: %v, error: %v", message.ID, err)
			message.Nack()
			return
		}
		log.Printf("event ID: %v converted to metrics: %v", message.ID, converted)

		id, err := metricsTopic.Publish(ctx, converted)
		if err != nil {
			// Acking here would discard the metrics for this event; nack so the
			// failure is handled the same way as a factory failure.
			log.Printf("nack the event ID: %v, error: %v", message.ID, err)
			message.Nack()
			return
		}
		log.Printf("event ID: %v is processed and published to metiric topic as message ID: %v", message.ID, id)

		log.Printf("ack the event ID: %v", message.ID)
		message.Ack()
	}
}