func Start()

in app/metrics/processor/processor.go [29:62]


// Start creates a pubsub client, subscribes to the configured event
// subscription, and passes every received event to a handler built from the
// metrics topic and the given factory.
//
// It returns an error only when the pubsub client cannot be created.
// Otherwise it keeps calling sub.Receive, retrying after a fixed delay each
// time Receive returns, and returns nil once ctx is done. The client and the
// metrics topic are released via defer when the function returns.
func Start(ctx context.Context, factory metrics.Factory) error {
	client, err := pubsub.Service.NewClient(ctx, nil)
	if err != nil {
		return err
	}
	defer client.Close() // nolint: errcheck

	// The subscription from which events are received.
	sub := client.NewSubscription(config.Config.EventSubscription, config.Config.EventCodec, config.Config.SubscriberNumGoroutines, config.Config.SubscriberMaxOutstanding)

	// The topic to which metrics converted from received events are published.
	metricsTopic := client.NewTopic(config.Config.MetricsTopic, config.Config.MetricsCodec, config.Config.PublisherBatchSize, config.Config.PublisherNumGoroutines, 0)
	defer metricsTopic.Stop()

	// The handler processes each received event: it generates metrics and
	// publishes them to the metrics topic.
	handler := eventHandler(metricsTopic, factory)

	// Delay between the end of one Receive call and the next attempt.
	const retryWait = 30 * time.Second

	// Handle received events with the handler until the context is done.
	for {
		if err := sub.Receive(ctx, handler); err != nil {
			log.Printf("sub.Receive: %v", err)
		}

		// Check for cancellation first so that no retry is announced when the
		// subscriber is already being shut down.
		select {
		case <-ctx.Done():
			log.Printf("context done, subscriber stopped")
			return nil
		default:
		}

		log.Printf("waiting %v for retry", retryWait)

		// Wait on a timer rather than time.Sleep so that cancelling ctx ends
		// the back-off immediately instead of after the full delay.
		timer := time.NewTimer(retryWait)
		select {
		case <-ctx.Done():
			timer.Stop()
			log.Printf("context done, subscriber stopped")
			return nil
		case <-timer.C:
		}
	}
}