out_writeapi.go (593 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package main import ( "C" "context" "encoding/json" "errors" "fmt" "log" "math" "strconv" "sync" "time" "unsafe" "cloud.google.com/go/bigquery/storage/apiv1/storagepb" "cloud.google.com/go/bigquery/storage/managedwriter" "cloud.google.com/go/bigquery/storage/managedwriter/adapt" "github.com/fluent/fluent-bit-go/output" "github.com/googleapis/gax-go/v2" "github.com/googleapis/gax-go/v2/apierror" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/types/descriptorpb" "google.golang.org/protobuf/types/dynamicpb" ) // Struct for each stream - can be multiple per output type streamConfig struct { managedstream MWManagedStream appendResults *[]*managedwriter.AppendResult offsetCounter int64 } // Struct for each instance - one per output type outputConfig struct { messageDescriptor protoreflect.MessageDescriptor streamType managedwriter.StreamType currProjectID string tableRef string schemaDesc *descriptorpb.DescriptorProto enableRetry bool maxQueueBytes int maxQueueRequests int managedStreamSlice *[]*streamConfig client ManagedWriterClient maxChunkSize int mutex sync.Mutex exactlyOnce bool requestCountThreshold int numRetries int } var ( ms_ctx = context.Background() configMap = make(map[int]*outputConfig) configID = 0 ) const ( chunkSizeLimit = 9 * 1024 * 1024 queueRequestDefault = 1000 queueByteDefault = 100 * 1024 * 1024 exactlyOnceDefault = false queueRequestScalingPercent = 0.8 numRetriesDefault = 4 maxNumStreamsPerInstance = 10 minQueueRequests = 10 dateTimeDefault = true ) // This function mangles the top-level and complex (struct) BigQuery schema to convert NUMERIC, BIGNUMERIC, DATETIME, TIME, and JSON fields to STRING. func mangleInputSchema(input *storagepb.TableSchema, dataTimeString bool) *storagepb.TableSchema { if input == nil { return nil } // Create a clone of the table schema newMsg := proto.Clone(input).(*storagepb.TableSchema) newMsg.Fields = make([]*storagepb.TableFieldSchema, len(input.GetFields())) for k, f := range input.GetFields() { // Create a clone of the field newF := proto.Clone(f).(*storagepb.TableFieldSchema) switch newF.GetType() { // Overwrite the field to be string case storagepb.TableFieldSchema_NUMERIC, storagepb.TableFieldSchema_BIGNUMERIC, storagepb.TableFieldSchema_TIME, storagepb.TableFieldSchema_JSON: newF.Type = storagepb.TableFieldSchema_STRING } // If datatimestring is true, then set the field type to string if newF.GetType() == storagepb.TableFieldSchema_DATETIME && dataTimeString { newF.Type = storagepb.TableFieldSchema_STRING } // If the field is a struct type it will have a non-zero number of fields if len(newF.GetFields()) > 0 { // Call mangeInputSchema on the fields in the struct newF.Fields = mangleInputSchema(&storagepb.TableSchema{Fields: newF.Fields}, dataTimeString).Fields } newMsg.Fields[k] = newF } return newMsg } // This function handles getting data on the schema of the table data is being written to. // The getDescriptors function returns the message descriptor (which describes the schema of the corresponding table) as well as a descriptor proto func getDescriptors(curr_ctx context.Context, mw_client ManagedWriterClient, project string, dataset string, table string, dataTimeString bool) (protoreflect.MessageDescriptor, *descriptorpb.DescriptorProto, error) { // Create streamID specific to the project, dataset, and table curr_stream := fmt.Sprintf("projects/%s/datasets/%s/tables/%s/streams/_default", project, dataset, table) // Create the getwritestreamrequest to have View_FULL so that the schema can be obtained req := storagepb.GetWriteStreamRequest{ Name: curr_stream, View: storagepb.WriteStreamView_FULL, } // Call getwritestream to get data on the table table_data, err := mw_client.GetWriteStream(curr_ctx, &req) if err != nil { return nil, nil, err } // Get the schema from table data init_table_schema := table_data.GetTableSchema() table_schema := mangleInputSchema(init_table_schema, dataTimeString) // Storage schema -> proto descriptor descriptor, err := adapt.StorageSchemaToProto2Descriptor(table_schema, "root") if err != nil { return nil, nil, err } // Proto descriptor -> message descriptor messageDescriptor, ok := descriptor.(protoreflect.MessageDescriptor) if !ok { return nil, nil, errors.New("Message descriptor could not be created from table's proto descriptor") } // Message descriptor -> descriptor proto dp, err := adapt.NormalizeDescriptor(messageDescriptor) if err != nil { return nil, nil, err } return messageDescriptor, dp, nil } // This function handles the data transformation from JSON to binary for a single json row. // The outputs of this function are the corresponding binary data as well as any error that occur. func jsonToBinary(message_descriptor protoreflect.MessageDescriptor, jsonRow map[string]interface{}) ([]byte, error) { // JSON map -> JSON byte row, err := json.Marshal(jsonRow) if err != nil { return nil, err } // Create empty message message := dynamicpb.NewMessage(message_descriptor) // First, json byte -> proto message err = protojson.Unmarshal(row, message) if err != nil { return nil, err } // Then, proto message -> bytes. b, err := proto.Marshal(message) if err != nil { return nil, err } return b, nil } // From https://github.com/majst01/fluent-bit-go-redis-output.git // Function is used to transform fluent-bit record to a JSON map func parseMap(mapInterface map[interface{}]interface{}) map[string]interface{} { m := make(map[string]interface{}) for k, v := range mapInterface { switch t := v.(type) { case []byte: // Prevent encoding to base64 m[k.(string)] = string(t) case map[interface{}]interface{}: m[k.(string)] = parseMap(t) default: m[k.(string)] = v } } return m } var isReady = func(result *managedwriter.AppendResult) bool { select { case <-result.Ready(): return true default: return false } } var pluginGetResult = func(result *managedwriter.AppendResult, ctx context.Context) (int64, error) { return result.GetResult(ctx) } // This function is used for asynchronous WriteAPI response checking // It takes in the relevant queue of responses as well as boolean that indicates whether we should block the AppendRows function // And wait for the next ready response from WriteAPI // This function returns an int which is the length of the queue after being checked or -1 if an error occured func checkResponses(curr_ctx context.Context, streamSlice *[]*streamConfig, waitForResponse bool, exactlyOnceConf bool, id int, streamIndex int) int { currQueuePointer := (*streamSlice)[streamIndex].appendResults for len(*currQueuePointer) > 0 { if exactlyOnceConf { log.Printf("Asynchronous response queue has non-zero size when exactly-once is configured") break } queueHead := (*currQueuePointer)[0] if waitForResponse || isReady(queueHead) { _, err := pluginGetResult(queueHead, curr_ctx) *currQueuePointer = (*currQueuePointer)[1:] if err != nil { log.Printf("Encountered error:%s while verifying the server response to a data append for output instance with id: %d", err, id) } } else { break } } return len(*currQueuePointer) } // This function checks the responses for all streams in the slice for each instance func checkAllStreamResponses(ctx context.Context, streamSlice **[]*streamConfig, waitForResponse bool, currMutex *sync.Mutex, exactlyOnceConf bool, id int) { (*currMutex).Lock() defer (*currMutex).Unlock() sliceLen := len(**streamSlice) for i := 0; i < sliceLen; i++ { checkResponses(ctx, *streamSlice, waitForResponse, exactlyOnceConf, id, i) } } // This function gets the value of various configuration fields and returns an error if the field could not be parsed func getConfigField[T int | bool](plugin unsafe.Pointer, key string, defaultval T) (T, error) { currstr := output.FLBPluginConfigKey(plugin, key) finval := defaultval if currstr != "" { switch any(defaultval).(type) { case int: intval, err := strconv.Atoi(currstr) if err != nil { return defaultval, err } else { finval = any(intval).(T) } case bool: boolval, err := strconv.ParseBool(currstr) if err != nil { return defaultval, err } else { finval = any(boolval).(T) } } } return finval, nil } // This function creates a new managed stream based on the config struct fields func buildStream(ctx context.Context, config **outputConfig, streamIndex int) error { currManagedStream, err := getWriter((*config).client, ctx, (*config).currProjectID, managedwriter.WithType((*config).streamType), managedwriter.WithDestinationTable((*config).tableRef), // Use the descriptor proto when creating the new managed stream managedwriter.WithSchemaDescriptor((*config).schemaDesc), managedwriter.EnableWriteRetries((*config).enableRetry), managedwriter.WithMaxInflightBytes((*config).maxQueueBytes), managedwriter.WithMaxInflightRequests((*config).maxQueueRequests), managedwriter.WithDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE), managedwriter.WithTraceID("FluentBit"), ) streamSlice := *(*config).managedStreamSlice if err == nil { (streamSlice)[streamIndex].managedstream = currManagedStream } return err } // This function returns whether or not the response indicates an invalid stream func rebuildPredicate(err error) bool { if apiErr, ok := apierror.FromError(err); ok { storageErr := &storagepb.StorageError{} if e := apiErr.Details().ExtractProtoMessage(storageErr); e != nil { return storageErr.GetCode() == storagepb.StorageError_INVALID_STREAM_TYPE } } return false } // This function sends and checks the responses for data through a committed stream with exactly once functionality func sendRequestExactlyOnce(ctx context.Context, data [][]byte, config **outputConfig, streamIndex int) error { (*config).mutex.Lock() defer (*config).mutex.Unlock() currStream := (*(*config).managedStreamSlice)[streamIndex] appendResult, err := currStream.managedstream.AppendRows(ctx, data, managedwriter.WithOffset(currStream.offsetCounter)) if err != nil { return err } // Synchronously check the response immediately after appending data with exactly once semantics _, err = pluginGetResult(appendResult, ctx) if err != nil { return err } return nil } // This function enables synchronous retries and rebuilding a valid stream based on the server response func sendRequestRetries(ctx context.Context, data [][]byte, config **outputConfig, streamIndex int) error { retryer := newStatelessRetryer((*config).numRetries) attempt := 0 currStream := (*(*config).managedStreamSlice)[streamIndex] for { err := sendRequestExactlyOnce(ctx, data, config, streamIndex) if err == nil { break } // Unsuccesful data append if rebuildPredicate(err) { currStream.managedstream.Finalize(ctx) currStream.managedstream.Close() // Rebuild stream err := buildStream(ctx, config, streamIndex) if err != nil { return err } // Retry sending data without incrementing number of attempts or waiting between attempts } else { backoffPeriod, shouldRetry := retryer.Retry(err, attempt) if !shouldRetry { return err } // Retry sending data after incrementing attempt count and waiting for designated amount of time attempt++ time.Sleep(backoffPeriod) } } return nil } // This function sends data and appends the responses to a queue to be checked asynchronously through a default stream with at least once functionality func sendRequestDefault(ctx context.Context, data [][]byte, config **outputConfig, streamIndex int) error { (*config).mutex.Lock() defer (*config).mutex.Unlock() currStream := (*(*config).managedStreamSlice)[streamIndex] appendResult, err := currStream.managedstream.AppendRows(ctx, data) if err != nil { return err } *currStream.appendResults = append(*currStream.appendResults, appendResult) return nil } // This function cases on the exactly/at-least once functionality and sends the data accordingly func sendRequest(ctx context.Context, data [][]byte, config **outputConfig, streamIndex int) error { if len(data) > 0 { if (*config).exactlyOnce { return sendRequestRetries(ctx, data, config, streamIndex) } else { return sendRequestDefault(ctx, data, config, streamIndex) } } return nil } // This is a test-only method that provides the instance count for configMap func getInstanceCount() int { return len(configMap) } // Finds the stream index when dynamically scaling func getLeastLoadedStream(streamSlice *[]*streamConfig) int { min := len(*(*streamSlice)[0].appendResults) minStreamIndex := 0 for streamIndex, stream := range *streamSlice { if len(*stream.appendResults) < min { min = len(*stream.appendResults) minStreamIndex = streamIndex } } return minStreamIndex } // This is a test-only method which takes in a config id and returns the current offset value of the struct corresponding to the id func getOffset(id int) int64 { config := configMap[id] streamSlice := *config.managedStreamSlice return streamSlice[0].offsetCounter } // Method to determine threshold var setThreshold = func(maxQueueSize int) int { requestCountThreshold := int(math.Floor(queueRequestScalingPercent * float64(maxQueueSize))) if requestCountThreshold < minQueueRequests { requestCountThreshold = minQueueRequests } return requestCountThreshold } // This function check whether there is room for scaling and the scales the number of stream dynamically depending on if it // Detects back pressure from the queue func createNewStreamDynamicScaling(config **outputConfig) { (*config).mutex.Lock() defer (*config).mutex.Unlock() if len(*(*config).managedStreamSlice) < maxNumStreamsPerInstance { // Gets stream with least values in queue mostEfficient := getLeastLoadedStream((*config).managedStreamSlice) mostEfficientQueueLength := len(*(*(*config).managedStreamSlice)[mostEfficient].appendResults) var newResQueue []*managedwriter.AppendResult var newStream = streamConfig{ offsetCounter: 0, appendResults: &newResQueue, } if mostEfficientQueueLength > (*config).requestCountThreshold { *(*config).managedStreamSlice = append(*(*config).managedStreamSlice, &newStream) newStreamIndex := len(*(*config).managedStreamSlice) - 1 err := buildStream(ms_ctx, config, newStreamIndex) if err != nil { log.Printf("Creating an additional managed stream with destination table: %s failed in FLBPluginInit: %s", (*config).tableRef, err) // If failure, failed stream is removed from slice *(*config).managedStreamSlice = (*(*config).managedStreamSlice)[:newStreamIndex] } } } } // This interface acts as a wrapper for the *managedwriter.Client type which the realManagedWriterClient struct implements // With its actual methods. type ManagedWriterClient interface { NewManagedStream(ctx context.Context, opts ...managedwriter.WriterOption) (*managedwriter.ManagedStream, error) GetWriteStream(ctx context.Context, req *storagepb.GetWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) Close() error BatchCommitWriteStreams(ctx context.Context, req *storagepb.BatchCommitWriteStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCommitWriteStreamsResponse, error) CreateWriteStream(ctx context.Context, req *storagepb.CreateWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) } type realManagedWriterClient struct { currClient *managedwriter.Client } func (r *realManagedWriterClient) NewManagedStream(ctx context.Context, opts ...managedwriter.WriterOption) (*managedwriter.ManagedStream, error) { return r.currClient.NewManagedStream(ctx, opts...) } func (r *realManagedWriterClient) GetWriteStream(ctx context.Context, req *storagepb.GetWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) { return r.currClient.GetWriteStream(ctx, req, opts...) } func (r *realManagedWriterClient) Close() error { return r.currClient.Close() } func (r *realManagedWriterClient) BatchCommitWriteStreams(ctx context.Context, req *storagepb.BatchCommitWriteStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCommitWriteStreamsResponse, error) { return r.currClient.BatchCommitWriteStreams(ctx, req, opts...) } func (r *realManagedWriterClient) CreateWriteStream(ctx context.Context, req *storagepb.CreateWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) { return r.currClient.CreateWriteStream(ctx, req, opts...) } // This function acts as a wrapper for the managedwriter.NewClient function in order to inject a mock interface, one can // Override the getClient method to return a different struct type (that still implements the managedwriterclient interface) var getClient = func(ctx context.Context, projectID string) (ManagedWriterClient, error) { client, err := managedwriter.NewClient(ctx, projectID) if err != nil { return nil, err } return &realManagedWriterClient{currClient: client}, nil } type MWManagedStream interface { AppendRows(ctx context.Context, data [][]byte, opts ...managedwriter.AppendOption) (*managedwriter.AppendResult, error) Close() error Finalize(ctx context.Context, opts ...gax.CallOption) (int64, error) FlushRows(ctx context.Context, offset int64, opts ...gax.CallOption) (int64, error) StreamName() string } // To inject a mock interface, we can override getWriter and getContext var getWriter = func(client ManagedWriterClient, ctx context.Context, projectID string, opts ...managedwriter.WriterOption) (MWManagedStream, error) { return client.NewManagedStream(ctx, opts...) } // This function acts as a wrapper for the GetContext function so that we may override it to // Mock it whenever needed var getFLBPluginContext = func(ctx unsafe.Pointer) int { return output.FLBPluginGetContext(ctx).(int) } // Finalizes and Closes all streams in slice for a given instance func finalizeCloseAllStreams(config **outputConfig, id int) bool { (*config).mutex.Lock() defer (*config).mutex.Unlock() errFlag := false streamSlice := (*config).managedStreamSlice for i := 0; i < len(*(*config).managedStreamSlice); i++ { if (*streamSlice)[i].managedstream != nil { if (*config).exactlyOnce { if _, err := (*streamSlice)[i].managedstream.Finalize(ms_ctx); err != nil { log.Printf("Finalizing managed stream for output instance with id %d and stream index %d failed in FLBPluginExit: %s", id, i, err) errFlag = true } } if err := (*streamSlice)[i].managedstream.Close(); err != nil { log.Printf("Closing managed stream for output instance with id %d and stream index %d failed in FLBPluginExitCtx: %s", id, i, err) errFlag = true } } } return errFlag } //export FLBPluginRegister func FLBPluginRegister(def unsafe.Pointer) int { return output.FLBPluginRegister(def, "writeapi", "Sends data to BigQuery through WriteAPI") } //export FLBPluginInit func FLBPluginInit(plugin unsafe.Pointer) int { // Set projectID, datasetID, and tableID from config file params projectID := output.FLBPluginConfigKey(plugin, "ProjectID") datasetID := output.FLBPluginConfigKey(plugin, "DatasetID") tableID := output.FLBPluginConfigKey(plugin, "TableID") // Set exactly-once bool from config file param exactlyOnceVal, err := getConfigField(plugin, "Exactly_Once", exactlyOnceDefault) if err != nil { log.Printf("Invalid Exactly_Once parameter in configuration file: %s", err) return output.FLB_ERROR } // Optional num synchronous retries parameter // This value is only used when the exactly-once field is configured to true (as it describes synchronous retries) numRetriesVal, err := getConfigField(plugin, "Num_Synchronous_Retries", numRetriesDefault) if err != nil { log.Printf("Invalid Num_Synchronous_Retries parameter in configuration file: %s", err) return output.FLB_ERROR } // Optional maxchunksize param maxChunkSize_init, err := getConfigField(plugin, "Max_Chunk_Size", chunkSizeLimit) if err != nil { log.Printf("Invalid Max_Chunk_Size parameter in configuration file: %s", err) return output.FLB_ERROR } if maxChunkSize_init > chunkSizeLimit { log.Printf("Max_Chunk_Size was set to: %d, but a single call to AppendRows cannot exceed 9 MB. Defaulting to 9 MB", maxChunkSize_init) maxChunkSize_init = chunkSizeLimit } // Optional max queue size params maxQueueSize, err := getConfigField(plugin, "Max_Queue_Requests", queueRequestDefault) if err != nil { log.Printf("Invalid Max_Queue_Requests parameter in configuration file: %s", err) return output.FLB_ERROR } // Multiply floats, floor it, then convert it to integer for ease of use in Flush requestCountThreshold := setThreshold(maxQueueSize) maxQueueByteSize, err := getConfigField(plugin, "Max_Queue_Bytes", queueByteDefault) if err != nil { log.Printf("Invalid Max_Queue_Bytes parameter in configuration file: %s", err) return output.FLB_ERROR } dateTimeStringType, err := getConfigField(plugin, "DateTime_String_Type", dateTimeDefault) if err != nil { log.Printf("Invalid DateTime_Input_Type parameter in configuration file: %s", err) return output.FLB_ERROR } // Create new client client, err := getClient(ms_ctx, projectID) if err != nil { log.Printf("Creating a new managed BigQuery Storage write client scoped to: %s failed in FLBPluginInit: %s", projectID, err) return output.FLB_ERROR } // Create stream name tableReference := fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID) // Call getDescriptors to get the message descriptor, and descriptor proto md, descriptor, err := getDescriptors(ms_ctx, client, projectID, datasetID, tableID, dateTimeStringType) if err != nil { log.Printf("Getting message descriptor and descriptor proto for table: %s failed in FLBPluginInit: %s", tableReference, err) return output.FLB_ERROR } // Set the stream type based on exactly once parameter var currStreamType managedwriter.StreamType var enableRetries bool if exactlyOnceVal { currStreamType = managedwriter.CommittedStream } else { currStreamType = managedwriter.DefaultStream enableRetries = true } var res_temp []*managedwriter.AppendResult streamSlice := []*streamConfig{} // Creates struct for stream and appends to slice initStream := streamConfig{ offsetCounter: 0, appendResults: &res_temp, } streamSlice = append(streamSlice, &initStream) // Instantiates output instance config := outputConfig{ messageDescriptor: md, streamType: currStreamType, tableRef: tableReference, currProjectID: projectID, schemaDesc: descriptor, enableRetry: enableRetries, maxQueueBytes: maxQueueByteSize, maxQueueRequests: maxQueueSize, client: client, maxChunkSize: maxChunkSize_init, exactlyOnce: exactlyOnceVal, numRetries: numRetriesVal, requestCountThreshold: requestCountThreshold, managedStreamSlice: &streamSlice, } // Create stream using NewManagedStream configPointer := &config err = buildStream(ms_ctx, &configPointer, 0) if err != nil { log.Printf("Creating a new managed stream with destination table: %s failed in FLBPluginInit: %s", tableReference, err) return output.FLB_ERROR } configMap[configID] = &config // Creating FLB context for each output, enables multiinstancing config.mutex.Lock() output.FLBPluginSetContext(plugin, configID) configID = configID + 1 config.mutex.Unlock() return output.FLB_OK } //export FLBPluginFlush func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { log.Print("[multiinstance] Flush called for unknown instance") return output.FLB_OK } //export FLBPluginFlushCtx func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { id := getFLBPluginContext(ctx) // Locate stream in map // Look up through reference config, ok := configMap[id] if !ok { log.Printf("Finding configuration for output instance with id: %d failed in FLBPluginFlushCtx", id) return output.FLB_ERROR } // Calls checkResponses for all streams in slice checkAllStreamResponses(ms_ctx, &config.managedStreamSlice, false, &config.mutex, config.exactlyOnce, id) // Checks for need to dynamically scale createNewStreamDynamicScaling(&config) // Create Fluent Bit decoder dec := output.NewDecoder(data, int(length)) var binaryData [][]byte var currsize int // Keeps track of the number of rows previously sent var rowCounter int64 // Find stream with least number of awaiting queue responses config.mutex.Lock() leastLoadedStreamIndex := getLeastLoadedStream(config.managedStreamSlice) config.mutex.Unlock() // Iterate Records for { // Extract Record ret, _, record := output.GetRecord(dec) if ret != 0 { break } rowJSONMap := parseMap(record) // Serialize data // Transform each row of data into binary using the jsonToBinary function and the message descriptor from the getDescriptors function buf, err := jsonToBinary(config.messageDescriptor, rowJSONMap) if err != nil { log.Printf("Transforming row with value:%s from JSON to binary data for output instance with id: %d failed in FLBPluginFlushCtx: %s", rowJSONMap, id, err) } else { // Successful data transformation if (currsize + len(buf)) >= config.maxChunkSize { // Appending Rows err := sendRequest(ms_ctx, binaryData, &config, leastLoadedStreamIndex) if err != nil { log.Printf("Appending data for output instance with id: %d failed in FLBPluginFlushCtx: %s", id, err) } else if config.exactlyOnce { config.mutex.Lock() (*config.managedStreamSlice)[leastLoadedStreamIndex].offsetCounter += rowCounter config.mutex.Unlock() } rowCounter = 0 binaryData = nil currsize = 0 } binaryData = append(binaryData, buf) // Include the protobuf overhead to the currsize variable currsize += (len(buf) + 2) rowCounter++ } } // Appending Rows err := sendRequest(ms_ctx, binaryData, &config, leastLoadedStreamIndex) if err != nil { log.Printf("Appending data for output instance with id: %d failed in FLBPluginFlushCtx: %s", id, err) } else if config.exactlyOnce { config.mutex.Lock() (*config.managedStreamSlice)[leastLoadedStreamIndex].offsetCounter += rowCounter config.mutex.Unlock() } return output.FLB_OK } //export FLBPluginExit func FLBPluginExit() int { log.Print("[multiinstance] Exit called for unknown instance") return output.FLB_OK } //export FLBPluginExitCtx func FLBPluginExitCtx(ctx unsafe.Pointer) int { // Get context id := output.FLBPluginGetContext(ctx).(int) // Locate stream in map config, ok := configMap[id] if !ok { log.Printf("Finding configuration for output instance with id: %d failed in FLBPluginExitCtx", id) return output.FLB_ERROR } // Calls checkResponses, finalizes, and closes each stream // If there is an error it preserves it and then continues to close and finalize all other streams checkAllStreamResponses(ms_ctx, &config.managedStreamSlice, false, &config.mutex, config.exactlyOnce, id) errFlag := finalizeCloseAllStreams(&config, id) if config.client != nil { if err := config.client.Close(); err != nil { log.Printf("Closing managed writer client for output instance with id: %d failed in FLBPluginExitCtx: %s", id, err) errFlag = true } } if errFlag { return output.FLB_ERROR } return output.FLB_OK } //export FLBPluginUnregister func FLBPluginUnregister(def unsafe.Pointer) { log.Print("[multiinstance] Unregister called") output.FLBPluginUnregister(def) } func main() { }