func()

in beater/pubsubbeat.go [91:200]


// Run connects to the publisher pipeline and then blocks in
// bt.subscription.Receive, converting every received Pub/Sub message into one
// or more beat events and publishing them through bt.client.
//
// Message handling:
//   - If the message attribute "pubsubbeat.compression" is "gzip", the message
//     is passed to bt.decompress first; on failure the message is nacked.
//   - If the attribute "pubsubbeat.batch_ndjson" is "true", the payload is
//     split on newlines and every non-empty line becomes its own event;
//     otherwise the whole payload is a single event.
//   - When JSON decoding is enabled, each record is decoded either into the
//     event root or under the "json" key, depending on the configuration.
//
// A background goroutine waits on bt.done, then cancels the receive context
// and closes the Pub/Sub client.
//
// Run returns the error from connecting to the publisher, a wrapped error if
// Receive fails, or nil once Receive returns without error.
func (bt *Pubsubbeat) Run(b *beat.Beat) error {
	bt.logger.Info("pubsubbeat is running! Hit CTRL-C to stop it.")

	var err error
	bt.client, err = b.Publisher.Connect()
	if err != nil {
		return err
	}

	ctx, cancel := context.WithCancel(context.Background())
	// Release the context even when Receive returns on its own. Calling
	// cancel more than once is harmless, so the shutdown goroutine below may
	// still invoke it.
	defer cancel()

	go func() {
		<-bt.done
		// The beat is stopping...
		bt.logger.Info("cancelling PubSub receive context...")
		cancel()
		bt.logger.Info("closing PubSub client...")
		bt.pubsubClient.Close()
	}()

	err = bt.subscription.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		// This callback is invoked concurrently by multiple goroutines, so it
		// must not write to variables captured from Run (such as err).
		if m.Attributes["pubsubbeat.compression"] == "gzip" {
			if decompressErr := bt.decompress(m); decompressErr != nil {
				bt.logger.Warnf("failed to decompress gzip: %s", decompressErr)
				m.Nack()
				return
			}
		}

		var rawRecords [][]byte
		if m.Attributes["pubsubbeat.batch_ndjson"] == "true" {
			rawRecords = bytes.Split(m.Data, []byte("\n"))
		} else {
			rawRecords = [][]byte{m.Data}
		}

		batch := make([]beat.Event, 0, len(rawRecords))

		for _, rawRecord := range rawRecords {
			if len(rawRecord) == 0 {
				continue
			}

			// The timestamp is scoped to a single record so that a record
			// without a usable timestamp never inherits the previous one's.
			var datetime time.Time

			eventMap := common.MapStr{
				"type":         b.Info.Name,
				"message_id":   m.ID,
				"publish_time": m.PublishTime,
				"message":      string(rawRecord),
			}

			if len(m.Attributes) > 0 {
				eventMap["attributes"] = m.Attributes
			}

			if bt.config.Json.Enabled {
				var unmarshalErr error
				if bt.config.Json.FieldsUnderRoot {
					unmarshalErr = bt.decode(rawRecord, &eventMap)
					if unmarshalErr == nil && bt.config.Json.FieldsUseTimestamp {
						rawTimestamp := eventMap[bt.config.Json.FieldsTimestampName]
						delete(eventMap, bt.config.Json.FieldsTimestampName)

						// The field may be absent or hold a non-string JSON
						// value; an unchecked assertion would panic here.
						if timestamp, ok := rawTimestamp.(string); ok {
							var timeErr error
							datetime, timeErr = time.Parse(bt.config.Json.FieldsTimestampFormat, timestamp)
							if timeErr != nil {
								bt.logger.Errorf("Failed to format timestamp string as time. Using time.Now(): %s", timeErr)
							}
						} else {
							bt.logger.Errorf("Failed to format timestamp string as time. Using time.Now(): field %q is missing or not a string (got %T)", bt.config.Json.FieldsTimestampName, rawTimestamp)
						}
					}
				} else {
					var jsonData common.MapStr
					unmarshalErr = bt.decode(rawRecord, &jsonData)
					if unmarshalErr == nil {
						eventMap["json"] = jsonData
					}
				}

				if unmarshalErr != nil {
					bt.logger.Warnf("failed to decode json message: %s", unmarshalErr)
					if bt.config.Json.AddErrorKey {
						eventMap["error"] = common.MapStr{
							"key":     "json",
							"message": fmt.Sprintf("failed to decode json message: %s", unmarshalErr),
						}
					}
				}
			}

			// No timestamp was extracted (or parsing failed and left the zero
			// value): fall back to the current time.
			if datetime.IsZero() {
				datetime = time.Now()
			}
			batch = append(batch, beat.Event{
				Timestamp: datetime,
				Fields:    eventMap,
			})
		}

		bt.client.PublishAll(batch)

		// TODO: Evaluate using AckHandler.
		m.Ack()
	})

	if err != nil {
		return fmt.Errorf("fail to receive message from subscription %q: %v", bt.subscription.String(), err)
	}

	return nil
}