// Excerpt: func (output *OutputPlugin) AddRecord, from firehose/firehose.go [183:221]

// AddRecord formats a single Fluent Bit record and buffers it for the next
// PutRecordBatch call. If buffering this record would exceed the Firehose
// per-call record-count or byte-size limit, the pending batch is flushed
// first. Returns a fluentbit status code; a record that fails processing is
// dropped (logged, FLB_OK returned) so the rest of the stream continues.
func (output *OutputPlugin) AddRecord(record map[interface{}]interface{}, timeStamp *time.Time) int {
	if output.timeKey != "" {
		// Inject the formatted timestamp under the configured key before
		// the record is serialized.
		buf := new(bytes.Buffer)
		err := output.fmtStrftime.Format(buf, *timeStamp)
		if err != nil {
			logrus.Errorf("[firehose %d] Could not create timestamp %v\n", output.PluginID, err)
			return fluentbit.FLB_ERROR
		}
		record[output.timeKey] = buf.String()
	}

	data, err := output.processRecord(record)
	if err != nil {
		logrus.Errorf("[firehose %d] %v\n", output.PluginID, err)
		// discard this single bad record instead and let the batch continue
		return fluentbit.FLB_OK
	}

	newDataSize := len(data)

	// Flush the pending batch first if this record would push it past either
	// the maximum record count or the maximum payload size for one
	// PutRecordBatch call.
	if len(output.records) == maximumRecordsPerPut || (output.dataLength+newDataSize) > maximumPutRecordBatchSize {
		retCode, err := output.sendCurrentBatch()
		if err != nil {
			logrus.Errorf("[firehose %d] %v\n", output.PluginID, err)
		}
		if retCode != fluentbit.FLB_OK {
			return retCode
		}
	}

	// With simple aggregation enabled, concatenate this record onto the last
	// buffered one when the combined size still fits in a single Firehose
	// record; otherwise start a new record.
	last := len(output.records) - 1
	if output.simpleAggregation && last >= 0 && len(output.records[last].Data)+newDataSize <= maximumRecordSize {
		output.records[last].Data = append(output.records[last].Data, data...)
	} else {
		output.records = append(output.records, &firehose.Record{
			Data: data,
		})
	}
	output.dataLength += newDataSize
	return fluentbit.FLB_OK
}