func()

in kinesis/kinesis.go [493:515]


func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecordsRequestEntry, dataLength *int) (int, error) {
	if len(*records) == 0 {
		return fluentbit.FLB_OK, nil
	}
	outputPlugin.timer.Check()
	response, err := outputPlugin.client.PutRecords(&kinesis.PutRecordsInput{
		Records:    *records,
		StreamName: aws.String(outputPlugin.stream),
	})
	if err != nil {
		logrus.Errorf("[kinesis %d] PutRecords failed with %v\n", outputPlugin.PluginID, err)
		outputPlugin.timer.Start()
		if aerr, ok := err.(awserr.Error); ok {
			if aerr.Code() == kinesis.ErrCodeProvisionedThroughputExceededException {
				logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID)
			}
		}
		return fluentbit.FLB_RETRY, err
	}
	logrus.Debugf("[kinesis %d] Sent %d events to Kinesis\n", outputPlugin.PluginID, len(*records))

	return outputPlugin.processAPIResponse(records, dataLength, response)
}