in out_writeapi.go [681:759]
// FLBPluginFlushCtx handles a flush for one output instance: it decodes the
// records in data, serializes each one to binary with the instance's message
// descriptor, and hands them to sendRequest in batches bounded by
// maxChunkSize.
//
// ctx identifies the output instance, data and length describe the encoded
// record buffer, and tag is not used by this function.
//
// It returns output.FLB_ERROR when no configuration is registered for the
// instance id, and output.FLB_OK otherwise. Serialization and append failures
// are logged and do not change the return value.
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
	id := getFLBPluginContext(ctx)

	// Each output instance has its own configuration entry keyed by id.
	cfg, found := configMap[id]
	if !found {
		log.Printf("Finding configuration for output instance with id: %d failed in FLBPluginFlushCtx", id)
		return output.FLB_ERROR
	}

	// Process outstanding responses on every stream before queueing new work.
	checkAllStreamResponses(ms_ctx, &cfg.managedStreamSlice, false, &cfg.mutex, cfg.exactlyOnce, id)
	// Give the dynamic-scaling helper a chance to add a stream if it decides one is needed.
	createNewStreamDynamicScaling(&cfg)

	decoder := output.NewDecoder(data, int(length))

	var (
		batch     [][]byte // serialized rows waiting to be sent
		batchSize int      // running size estimate of batch, in bytes
		batchRows int64    // number of rows currently held in batch
	)

	// The target stream is chosen once and reused for every batch of this flush.
	cfg.mutex.Lock()
	streamIdx := getLeastLoadedStream(cfg.managedStreamSlice)
	cfg.mutex.Unlock()

	// flush sends the pending batch and then clears the batch state, whether
	// or not the send succeeded. In exactly-once mode a successful send also
	// advances the stream's offset counter by the number of rows sent.
	flush := func() {
		if err := sendRequest(ms_ctx, batch, &cfg, streamIdx); err != nil {
			log.Printf("Appending data for output instance with id: %d failed in FLBPluginFlushCtx: %s", id, err)
		} else if cfg.exactlyOnce {
			cfg.mutex.Lock()
			(*cfg.managedStreamSlice)[streamIdx].offsetCounter += batchRows
			cfg.mutex.Unlock()
		}
		batch, batchSize, batchRows = nil, 0, 0
	}

	for {
		ret, _, record := output.GetRecord(decoder)
		if ret != 0 {
			// A non-zero status ends the iteration over the buffer.
			break
		}

		row := parseMap(record)
		encoded, err := jsonToBinary(cfg.messageDescriptor, row)
		if err != nil {
			// A row that cannot be serialized is logged and skipped.
			log.Printf("Transforming row with value:%s from JSON to binary data for output instance with id: %d failed in FLBPluginFlushCtx: %s", row, id, err)
			continue
		}

		// Send what has accumulated before this row would reach the chunk limit.
		if batchSize+len(encoded) >= cfg.maxChunkSize {
			flush()
		}

		batch = append(batch, encoded)
		// The extra 2 bytes per row account for protobuf overhead.
		batchSize += len(encoded) + 2
		batchRows++
	}

	// Send whatever is left after the last record.
	flush()

	return output.FLB_OK
}