beater/pubsubbeat.go (220 lines of code) (raw):
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package beater
import (
"bytes"
"fmt"
"io"
"sync"
"time"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/jsontransform"
"github.com/elastic/beats/v7/libbeat/logp"
"context"
"runtime"
"compress/gzip"
"encoding/json"
"cloud.google.com/go/pubsub"
"gitlab.com/gitlab-org/pubsubbeat/config"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type Pubsubbeat struct {
done chan struct{}
config *config.Config
client beat.Client
pubsubClient *pubsub.Client
subscription *pubsub.Subscription
logger *logp.Logger
zippers *sync.Pool
}
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
config, err := config.GetAndValidateConfig(cfg)
if err != nil {
return nil, err
}
logger := logp.NewLogger(fmt.Sprintf("PubSub: %s/%s/%s", config.Project, config.Topic, config.Subscription.Name))
logger.Infof("config retrieved: %+v", config)
client, err := createPubsubClient(config)
if err != nil {
return nil, err
}
subscription, err := getOrCreateSubscription(client, config)
if err != nil {
return nil, err
}
connectionPoolSize := config.Subscription.ConnectionPoolSize
subscription.ReceiveSettings.NumGoroutines = connectionPoolSize
if connectionPoolSize == 1 {
logger.Warnf("Pub/Sub streaming pull has a per-subscriber throughput limit, https://cloud.google.com/pubsub/quotas")
logger.Warnf("Use `subscription.connection_pool_size` to increase the numnber of subscribers.")
}
bt := &Pubsubbeat{
done: make(chan struct{}),
config: config,
pubsubClient: client,
subscription: subscription,
logger: logger,
zippers: &sync.Pool{New: func() interface{} { return new(gzip.Reader) }},
}
return bt, nil
}
func (bt *Pubsubbeat) Run(b *beat.Beat) error {
bt.logger.Info("pubsubbeat is running! Hit CTRL-C to stop it.")
var err error
bt.client, err = b.Publisher.Connect()
if err != nil {
return err
}
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-bt.done
// The beat is stopping...
bt.logger.Info("cancelling PubSub receive context...")
cancel()
bt.logger.Info("closing PubSub client...")
bt.pubsubClient.Close()
}()
err = bt.subscription.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
// This callback is invoked concurrently by multiple goroutines
var datetime time.Time
if m.Attributes["pubsubbeat.compression"] == "gzip" {
err = bt.decompress(m)
if err != nil {
bt.logger.Warnf("failed to decompress gzip: %s", err)
m.Nack()
return
}
}
var rawRecords [][]byte
if m.Attributes["pubsubbeat.batch_ndjson"] == "true" {
rawRecords = bytes.Split(m.Data, []byte("\n"))
} else {
rawRecords = [][]byte{m.Data}
}
var batch []beat.Event
for _, rawRecord := range rawRecords {
if len(rawRecord) == 0 {
continue
}
eventMap := common.MapStr{
"type": b.Info.Name,
"message_id": m.ID,
"publish_time": m.PublishTime,
"message": string(rawRecord),
}
if len(m.Attributes) > 0 {
eventMap["attributes"] = m.Attributes
}
if bt.config.Json.Enabled {
var unmarshalErr error
if bt.config.Json.FieldsUnderRoot {
unmarshalErr = bt.decode(rawRecord, &eventMap)
if unmarshalErr == nil && bt.config.Json.FieldsUseTimestamp {
var timeErr error
timestamp := eventMap[bt.config.Json.FieldsTimestampName]
delete(eventMap, bt.config.Json.FieldsTimestampName)
datetime, timeErr = time.Parse(bt.config.Json.FieldsTimestampFormat, timestamp.(string))
if timeErr != nil {
bt.logger.Errorf("Failed to format timestamp string as time. Using time.Now(): %s", timeErr)
}
}
} else {
var jsonData common.MapStr
unmarshalErr = bt.decode(rawRecord, &jsonData)
if unmarshalErr == nil {
eventMap["json"] = jsonData
}
}
if unmarshalErr != nil {
bt.logger.Warnf("failed to decode json message: %s", unmarshalErr)
if bt.config.Json.AddErrorKey {
eventMap["error"] = common.MapStr{
"key": "json",
"message": fmt.Sprintf("failed to decode json message: %s", unmarshalErr),
}
}
}
}
if datetime.IsZero() {
datetime = time.Now()
}
batch = append(batch, beat.Event{
Timestamp: datetime,
Fields: eventMap,
})
}
bt.client.PublishAll(batch)
// TODO: Evaluate using AckHandler.
m.Ack()
})
if err != nil {
return fmt.Errorf("fail to receive message from subscription %q: %v", bt.subscription.String(), err)
}
return nil
}
func (bt *Pubsubbeat) Stop() {
bt.client.Close()
close(bt.done)
}
func (bt *Pubsubbeat) decompress(m *pubsub.Message) error {
rc := bt.zippers.Get().(*gzip.Reader)
if err := rc.Reset(bytes.NewReader(m.Data)); err != nil {
return fmt.Errorf("rc.Reset: %v", err)
}
var data bytes.Buffer
if _, err := io.Copy(&data, rc); err != nil {
return fmt.Errorf("io.Copy: %v", err)
}
if err := rc.Close(); err != nil {
return fmt.Errorf("gzip.Close: %v", err)
}
bt.zippers.Put(rc)
m.Data = data.Bytes()
return nil
}
func (bt *Pubsubbeat) decode(s []byte, i *common.MapStr) error {
if bt.config.Json.UseNumber {
decoder := json.NewDecoder(bytes.NewReader(s))
decoder.UseNumber()
err := decoder.Decode(i)
if err == nil {
jsontransform.TransformNumbers(*i)
}
return err
} else {
return json.Unmarshal(s, i)
}
}
func createPubsubClient(config *config.Config) (*pubsub.Client, error) {
ctx := context.Background()
userAgent := fmt.Sprintf(
"Elastic/Pubsubbeat (%s; %s)", runtime.GOOS, runtime.GOARCH)
options := []option.ClientOption{option.WithUserAgent(userAgent)}
if config.CredentialsFile != "" {
options = append(options, option.WithCredentialsFile(config.CredentialsFile))
}
client, err := pubsub.NewClient(ctx, config.Project, options...)
if err != nil {
return nil, fmt.Errorf("fail to create pubsub client: %v", err)
}
return client, nil
}
func getOrCreateSubscription(client *pubsub.Client, config *config.Config) (*pubsub.Subscription, error) {
if !config.Subscription.Create {
subscription := client.Subscription(config.Subscription.Name)
return subscription, nil
}
topic := client.Topic(config.Topic)
ctx := context.Background()
subscription, err := client.CreateSubscription(ctx, config.Subscription.Name, pubsub.SubscriptionConfig{
Topic: topic,
RetainAckedMessages: config.Subscription.RetainAckedMessages,
RetentionDuration: config.Subscription.RetentionDuration,
})
if st, ok := status.FromError(err); ok && st.Code() == codes.AlreadyExists {
// The subscription already exists.
subscription = client.Subscription(config.Subscription.Name)
} else if ok && st.Code() == codes.NotFound {
return nil, fmt.Errorf("topic %q does not exists", config.Topic)
} else if err != nil {
return nil, fmt.Errorf("fail to create subscription: %v", err)
}
return subscription, nil
}