in app/metrics/processor/processor.go [29:62]
func Start(ctx context.Context, factory metrics.Factory) error {
client, err := pubsub.Service.NewClient(ctx, nil)
if err != nil {
return err
}
defer client.Close() // nolint: errcheck
// The subscription to receive event
sub := client.NewSubscription(config.Config.EventSubscription, config.Config.EventCodec, config.Config.SubscriberNumGoroutines, config.Config.SubscriberMaxOutstanding)
// The topic to publish the metrics converted from received event
metricsTopic := client.NewTopic(config.Config.MetricsTopic, config.Config.MetricsCodec, config.Config.PublisherBatchSize, config.Config.PublisherNumGoroutines, 0)
defer metricsTopic.Stop()
// The handler to handles the received event, generate and publish metrics to the metrics topic
handler := eventHandler(metricsTopic, factory)
// Start to handle received event using given handler.
// It does not return until the context is done
for {
if err := sub.Receive(ctx, handler); err != nil {
log.Printf("sub.Receive: %v", err)
}
select {
case <-ctx.Done():
log.Printf("context done, subscriber stopped")
return nil
default:
waitTime := 30 * time.Second
log.Printf("waiting %v for retry", waitTime)
time.Sleep(waitTime)
}
}
}