func()

in kinesis/kinesis.go [237:285]


func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsRequestEntry, record map[interface{}]interface{}, timeStamp *time.Time) int {
	if outputPlugin.timeKey != "" {
		buf := new(bytes.Buffer)
		err := outputPlugin.fmtStrftime.Format(buf, *timeStamp)
		if err != nil {
			logrus.Errorf("[kinesis %d] Could not create timestamp %v\n", outputPlugin.PluginID, err)
			return fluentbit.FLB_ERROR
		}
		record[outputPlugin.timeKey] = buf.String()
	}

	partitionKey, hasPartitionKey := outputPlugin.getPartitionKey(record)
	var partitionKeyLen = len(partitionKey)
	if !hasPartitionKey {
		partitionKeyLen = outputPlugin.stringGen.Size
	}
	data, err := outputPlugin.processRecord(record, partitionKeyLen)
	if err != nil {
		logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err)
		// discard this single bad record instead and let the batch continue
		return fluentbit.FLB_OK
	}

	if !outputPlugin.isAggregate {
		if !hasPartitionKey {
			partitionKey = outputPlugin.stringGen.RandomString()
		}
		logrus.Debugf("[kinesis %d] Got value: %s for a given partition key.\n", outputPlugin.PluginID, partitionKey)
		*records = append(*records, &kinesis.PutRecordsRequestEntry{
			Data:         data,
			PartitionKey: aws.String(partitionKey),
		})
	} else {
		// Use the KPL aggregator to buffer records isAggregate is true
		aggRecord, err := outputPlugin.aggregator.AddRecord(partitionKey, hasPartitionKey, data)
		if err != nil {
			logrus.Errorf("[kinesis %d] Failed to aggregate record %v\n", outputPlugin.PluginID, err)
			// discard this single bad record instead and let the batch continue
			return fluentbit.FLB_OK
		}

		// If aggRecord isn't nil, then a full kinesis record has been aggregated
		if aggRecord != nil {
			*records = append(*records, aggRecord)
		}
	}

	return fluentbit.FLB_OK
}