in beater/pubsubbeat.go [91:200]
// Run connects to the beat's publisher pipeline and then blocks in
// bt.subscription.Receive, turning every received Pub/Sub message into one or
// more beat events that are published through bt.client.
//
// A background goroutine waits on bt.done; once it fires, the receive context
// is cancelled and the Pub/Sub client is closed.
//
// Message handling, per message:
//   - attribute "pubsubbeat.compression" == "gzip": the message is passed to
//     bt.decompress first; on failure it is Nack'ed and skipped.
//   - attribute "pubsubbeat.batch_ndjson" == "true": the payload is split on
//     "\n" and each non-empty line becomes its own event; otherwise the whole
//     payload is a single event.
//   - when bt.config.Json.Enabled, each record is decoded with bt.decode,
//     either merged into the event root (FieldsUnderRoot) or stored under the
//     "json" key.
//
// It returns the error from Publisher.Connect, or a wrapped error when Receive
// fails; nil otherwise.
func (bt *Pubsubbeat) Run(b *beat.Beat) error {
	bt.logger.Info("pubsubbeat is running! Hit CTRL-C to stop it.")

	var err error
	bt.client, err = b.Publisher.Connect()
	if err != nil {
		return err
	}

	ctx, cancel := context.WithCancel(context.Background())
	// Make sure the context is released even when Receive returns on its own
	// (i.e. before bt.done fires). Calling cancel more than once is harmless.
	defer cancel()

	go func() {
		<-bt.done
		// The beat is stopping...
		bt.logger.Info("cancelling PubSub receive context...")
		cancel()
		bt.logger.Info("closing PubSub client...")
		bt.pubsubClient.Close()
	}()

	err = bt.subscription.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		// This callback is invoked concurrently by multiple goroutines, so it
		// must not write to variables captured from Run (such as err).
		if m.Attributes["pubsubbeat.compression"] == "gzip" {
			if err := bt.decompress(m); err != nil {
				bt.logger.Warnf("failed to decompress gzip: %s", err)
				m.Nack()
				return
			}
		}

		var rawRecords [][]byte
		if m.Attributes["pubsubbeat.batch_ndjson"] == "true" {
			rawRecords = bytes.Split(m.Data, []byte("\n"))
		} else {
			rawRecords = [][]byte{m.Data}
		}

		batch := make([]beat.Event, 0, len(rawRecords))
		for _, rawRecord := range rawRecords {
			if len(rawRecord) == 0 {
				continue
			}

			// Scoped per record so that a timestamp parsed from one record is
			// never reused for a later record in the same batch.
			var datetime time.Time

			eventMap := common.MapStr{
				"type":         b.Info.Name,
				"message_id":   m.ID,
				"publish_time": m.PublishTime,
				"message":      string(rawRecord),
			}
			if len(m.Attributes) > 0 {
				eventMap["attributes"] = m.Attributes
			}

			if bt.config.Json.Enabled {
				var unmarshalErr error
				if bt.config.Json.FieldsUnderRoot {
					unmarshalErr = bt.decode(rawRecord, &eventMap)
					if unmarshalErr == nil && bt.config.Json.FieldsUseTimestamp {
						rawTimestamp := eventMap[bt.config.Json.FieldsTimestampName]
						delete(eventMap, bt.config.Json.FieldsTimestampName)

						// The field may be absent or hold a non-string value;
						// a bare type assertion would panic in that case.
						timestamp, ok := rawTimestamp.(string)
						if !ok {
							bt.logger.Errorf("Timestamp field %q is missing or not a string. Using time.Now()", bt.config.Json.FieldsTimestampName)
						} else {
							parsed, timeErr := time.Parse(bt.config.Json.FieldsTimestampFormat, timestamp)
							if timeErr != nil {
								bt.logger.Errorf("Failed to format timestamp string as time. Using time.Now(): %s", timeErr)
							} else {
								datetime = parsed
							}
						}
					}
				} else {
					var jsonData common.MapStr
					unmarshalErr = bt.decode(rawRecord, &jsonData)
					if unmarshalErr == nil {
						eventMap["json"] = jsonData
					}
				}

				if unmarshalErr != nil {
					bt.logger.Warnf("failed to decode json message: %s", unmarshalErr)
					if bt.config.Json.AddErrorKey {
						eventMap["error"] = common.MapStr{
							"key":     "json",
							"message": fmt.Sprintf("failed to decode json message: %s", unmarshalErr),
						}
					}
				}
			}

			// No usable timestamp in the record: fall back to the current time.
			if datetime.IsZero() {
				datetime = time.Now()
			}

			batch = append(batch, beat.Event{
				Timestamp: datetime,
				Fields:    eventMap,
			})
		}

		bt.client.PublishAll(batch)

		// TODO: Evaluate using AckHandler.
		m.Ack()
	})
	if err != nil {
		return fmt.Errorf("fail to receive message from subscription %q: %v", bt.subscription.String(), err)
	}

	return nil
}