func FLBPluginFlushCtx()

in out_writeapi.go [681:759]


// FLBPluginFlushCtx converts the records of one Fluent Bit chunk into binary
// rows and hands them to sendRequest in size-bounded batches.
//
// ctx identifies the output instance: it is resolved to an id through
// getFLBPluginContext, and that id selects the entry in configMap. data and
// length describe the encoded chunk given to output.NewDecoder. tag is not
// used.
//
// It returns output.FLB_ERROR when no configuration is registered for the
// instance and output.FLB_OK in every other case. Rows that fail conversion
// and batches that fail to send are logged and dropped; neither changes the
// return value.
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
	id := getFLBPluginContext(ctx)
	config, ok := configMap[id]
	if !ok {
		log.Printf("Finding configuration for output instance with id: %d failed in FLBPluginFlushCtx", id)
		return output.FLB_ERROR
	}

	// Check the responses of every stream in the slice, then let the scaling
	// helper decide whether another stream has to be created.
	checkAllStreamResponses(ms_ctx, &config.managedStreamSlice, false, &config.mutex, config.exactlyOnce, id)
	createNewStreamDynamicScaling(&config)

	dec := output.NewDecoder(data, int(length))

	var (
		batch     [][]byte // serialized rows waiting to be sent
		batchSize int      // running size estimate of batch, in bytes
		batchRows int64    // number of rows currently held in batch
	)

	// The stream slice is shared state, so the choice of target stream is made
	// under the config mutex. All batches of this flush go to that stream.
	config.mutex.Lock()
	streamIndex := getLeastLoadedStream(config.managedStreamSlice)
	config.mutex.Unlock()

	// flush sends the pending batch (if any) and resets the accumulators.
	// An empty batch is skipped so that no request without rows is issued,
	// e.g. when the very first row already reaches the chunk limit or when the
	// chunk contained no convertible row at all.
	flush := func() {
		if len(batch) == 0 {
			return
		}

		// NOTE(review): a failed send is only logged, so Fluent Bit still
		// receives FLB_OK for the chunk — confirm that dropping is intended.
		if err := sendRequest(ms_ctx, batch, &config, streamIndex); err != nil {
			log.Printf("Appending data for output instance with id: %d failed in FLBPluginFlushCtx: %s", id, err)
		} else if config.exactlyOnce {
			// With exactlyOnce set, advance the stream's offsetCounter by the
			// number of rows that were just sent.
			config.mutex.Lock()
			(*config.managedStreamSlice)[streamIndex].offsetCounter += batchRows
			config.mutex.Unlock()
		}

		batch = nil
		batchSize = 0
		batchRows = 0
	}

	for {
		// A non-zero return code ends the iteration over the chunk.
		ret, _, record := output.GetRecord(dec)
		if ret != 0 {
			break
		}

		row := parseMap(record)

		// Serialize the row with the instance's message descriptor.
		buf, err := jsonToBinary(config.messageDescriptor, row)
		if err != nil {
			// %v rather than %s: the row is a map whose values are not
			// necessarily strings.
			log.Printf("Transforming row with value:%v from JSON to binary data for output instance with id: %d failed in FLBPluginFlushCtx: %s", row, id, err)
			continue
		}

		// Send what has been collected before this row would push the batch
		// to the configured chunk limit.
		if batchSize+len(buf) >= config.maxChunkSize {
			flush()
		}

		batch = append(batch, buf)
		// The extra 2 bytes per row account for protobuf overhead in the size
		// estimate.
		batchSize += len(buf) + 2
		batchRows++
	}

	// Send whatever is left after the last record.
	flush()

	return output.FLB_OK
}