func()

in kinesis/kinesis.go [306:345]


// Flush groups the entries in *records into batches and hands each batch to
// sendCurrentBatch.
//
// A batch is dispatched before the next entry is added when it already holds
// maximumRecordsPerPut entries, or when adding that entry would raise the
// accumulated size (data bytes plus partition-key bytes) above
// maximumPutRecordBatchSize. Whatever is still batched after the loop is
// dispatched in one final call.
//
// The returned int is the status code reported by sendCurrentBatch. On return,
// *records is overwritten with whatever is left in the batch buffer plus, on
// the early-exit path, every entry that had not been batched yet.
// NOTE(review): this relies on sendCurrentBatch leaving its unsent entries in
// the buffer it is given; that helper is not visible here, confirm.
func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEntry) int {
	// Entries are staged in a separate buffer so the caller's slice stays
	// intact while batches are being assembled.
	batch := make([]*kinesis.PutRecordsRequestEntry, 0, maximumRecordsPerPut)
	batchBytes := 0

	// dispatch pushes the staged batch through sendCurrentBatch, logs any
	// error it reports, and yields its status code.
	dispatch := func() int {
		code, sendErr := outputPlugin.sendCurrentBatch(&batch, &batchBytes)
		if sendErr != nil {
			logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, sendErr)
		}
		return code
	}

	for idx, entry := range *records {
		entrySize := len(entry.Data) + len(aws.StringValue(entry.PartitionKey))

		countLimitHit := len(batch) == maximumRecordsPerPut
		sizeLimitHit := batchBytes+entrySize > maximumPutRecordBatchSize

		if countLimitHit || sizeLimitHit {
			if code := dispatch(); code != fluentbit.FLB_OK {
				// Hand back what remains in the batch buffer followed by
				// the entries that were never staged (current one included).
				remaining := (*records)[idx:]
				*records = append(batch, remaining...)
				return code
			}
		}

		batch = append(batch, entry)
		batchBytes += entrySize
	}

	// Dispatch whatever is still staged after the last entry.
	finalCode := dispatch()
	if finalCode == output.FLB_OK {
		logrus.Debugf("[kinesis %d] Flushed %d logs\n", outputPlugin.PluginID, len(*records))
	}

	// Report back whatever is left in the batch buffer after the final send.
	*records = batch
	return finalCode
}