func()

in firehose/firehose.go [299:324]


// sendCurrentBatch submits the records currently held in output.records to
// the configured delivery stream with a single PutRecordBatch call.
//
// It returns (fluentbit.FLB_OK, nil) without calling the API when no records
// are buffered. When PutRecordBatch returns an error, that error is logged and
// returned together with fluentbit.FLB_RETRY. Otherwise the return values are
// whatever processAPIResponse produces for the API response.
func (output *OutputPlugin) sendCurrentBatch() (int, error) {
	// Nothing buffered means there is nothing to send.
	if len(output.records) == 0 {
		return fluentbit.FLB_OK, nil
	}

	// NOTE(review): timer.Check presumably enforces a limit on how long sends
	// have been failing; confirm against the timer implementation.
	output.timer.Check()

	input := &firehose.PutRecordBatchInput{
		DeliveryStreamName: aws.String(output.deliveryStream),
		Records:            output.records,
	}
	response, err := output.client.PutRecordBatch(input)
	if err != nil {
		logrus.Errorf("[firehose %d] PutRecordBatch failed with %v", output.PluginID, err)
		// NOTE(review): timer.Start is called on every failed request;
		// presumably it begins tracking the failure window; verify.
		output.timer.Start()

		// A ServiceUnavailableException gets an extra hint about throughput limits.
		awsErr, isAWSErr := err.(awserr.Error)
		if isAWSErr && awsErr.Code() == firehose.ErrCodeServiceUnavailableException {
			logrus.Warnf("[firehose %d] Throughput limits for the delivery stream may have been exceeded.", output.PluginID)
		}
		return fluentbit.FLB_RETRY, err
	}

	logrus.Debugf("[firehose %d] Sent %d events to Firehose\n", output.PluginID, len(output.records))

	// The call itself succeeded; the response is handed off for further processing.
	return output.processAPIResponse(response)
}