in beater/pubsubbeat.go [53:89]
// New builds a Pubsubbeat from the raw beat configuration.
//
// It validates cfg, creates the Pub/Sub client, resolves the subscription via
// getOrCreateSubscription and applies the configured connection pool size to
// the subscription's receive settings. The first error from any of these
// steps is returned unchanged, together with a nil Beater. The b argument is
// not used here.
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
	// Named conf (not config) so the imported config package is not shadowed
	// for the rest of the function.
	conf, err := config.GetAndValidateConfig(cfg)
	if err != nil {
		return nil, err
	}

	logger := logp.NewLogger(fmt.Sprintf("PubSub: %s/%s/%s", conf.Project, conf.Topic, conf.Subscription.Name))
	logger.Infof("config retrieved: %+v", conf)

	client, err := createPubsubClient(conf)
	if err != nil {
		return nil, err
	}

	subscription, err := getOrCreateSubscription(client, conf)
	if err != nil {
		// The client is not handed to anyone on this path, so release it here
		// instead of leaking it. The subscription error stays the one that is
		// reported to the caller; a close failure is only logged.
		if closeErr := client.Close(); closeErr != nil {
			logger.Warnf("closing Pub/Sub client after subscription failure: %v", closeErr)
		}
		return nil, err
	}

	// The configured pool size is used as the number of receive goroutines.
	poolSize := conf.Subscription.ConnectionPoolSize
	subscription.ReceiveSettings.NumGoroutines = poolSize
	if poolSize == 1 {
		logger.Warnf("Pub/Sub streaming pull has a per-subscriber throughput limit, https://cloud.google.com/pubsub/quotas")
		logger.Warnf("Use `subscription.connection_pool_size` to increase the number of subscribers.")
	}

	bt := &Pubsubbeat{
		done:         make(chan struct{}),
		config:       conf,
		pubsubClient: client,
		subscription: subscription,
		logger:       logger,
		// Pool of gzip readers, created on demand by the pool's New func.
		zippers: &sync.Pool{New: func() interface{} { return new(gzip.Reader) }},
	}
	return bt, nil
}