func New()

in beater/pubsubbeat.go [53:89]


func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
	config, err := config.GetAndValidateConfig(cfg)
	if err != nil {
		return nil, err
	}

	logger := logp.NewLogger(fmt.Sprintf("PubSub: %s/%s/%s", config.Project, config.Topic, config.Subscription.Name))
	logger.Infof("config retrieved: %+v", config)

	client, err := createPubsubClient(config)
	if err != nil {
		return nil, err
	}

	subscription, err := getOrCreateSubscription(client, config)
	if err != nil {
		return nil, err
	}

	connectionPoolSize := config.Subscription.ConnectionPoolSize
	subscription.ReceiveSettings.NumGoroutines = connectionPoolSize

	if connectionPoolSize == 1 {
		logger.Warnf("Pub/Sub streaming pull has a per-subscriber throughput limit, https://cloud.google.com/pubsub/quotas")
		logger.Warnf("Use `subscription.connection_pool_size` to increase the numnber of subscribers.")
	}

	bt := &Pubsubbeat{
		done:         make(chan struct{}),
		config:       config,
		pubsubClient: client,
		subscription: subscription,
		logger:       logger,
		zippers:      &sync.Pool{New: func() interface{} { return new(gzip.Reader) }},
	}
	return bt, nil
}