func eventHandler()

in app/metrics/processor/processor.go [67:93]


func eventHandler(metricsTopic pubsub.Topic, factory metrics.Factory) pubsub.MessageHandler {
	// factory: the metrics factory to generate metrics from the received event

	return func(ctx context.Context, message *pubsub.Message) {
		log.Printf("processing event ID: %v, data: %v", message.ID, message.Data)

		processingTime := ProcessingTime()
		time.Sleep(processingTime) // Simulate processing time

		ackTime := time.Now()
		metrics, err := factory(message.Data, message.PublishTime, ackTime, processingTime)
		if err != nil {
			log.Printf("nack the event ID: %v, error: %v", message.ID, err)
			message.Nack()
			return
		}
		log.Printf("event ID: %v converted to metrics: %v", message.ID, metrics)
		id, err := metricsTopic.Publish(ctx, metrics)
		if err != nil {
			log.Println(err)
		} else {
			log.Printf("event ID: %v is processed and published to metiric topic as message ID: %v", message.ID, id)
		}
		log.Printf("ack the event ID: %v", message.ID)
		message.Ack()
	}
}