in out_writeapi.go [548:672]
func FLBPluginInit(plugin unsafe.Pointer) int {
	// Set projectID, datasetID, and tableID from config file params
	projectID := output.FLBPluginConfigKey(plugin, "ProjectID")
	datasetID := output.FLBPluginConfigKey(plugin, "DatasetID")
	tableID := output.FLBPluginConfigKey(plugin, "TableID")
	// Set exactly-once bool from config file param
	exactlyOnceVal, err := getConfigField(plugin, "Exactly_Once", exactlyOnceDefault)
	if err != nil {
		log.Printf("Invalid Exactly_Once parameter in configuration file: %s", err)
		return output.FLB_ERROR
	}
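	// getConfigField is presumably a helper defined elsewhere in this file that parses the
	// raw string value for the given key and falls back to the supplied default when the key
	// is absent; that behavior is inferred from its usage here, not confirmed.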
	// Optional Num_Synchronous_Retries parameter
	// This value is only used when exactly-once is configured to true (as it describes synchronous retries)
	numRetriesVal, err := getConfigField(plugin, "Num_Synchronous_Retries", numRetriesDefault)
	if err != nil {
		log.Printf("Invalid Num_Synchronous_Retries parameter in configuration file: %s", err)
		return output.FLB_ERROR
	}
	// Optional Max_Chunk_Size parameter
	maxChunkSize_init, err := getConfigField(plugin, "Max_Chunk_Size", chunkSizeLimit)
	if err != nil {
		log.Printf("Invalid Max_Chunk_Size parameter in configuration file: %s", err)
		return output.FLB_ERROR
	}
	if maxChunkSize_init > chunkSizeLimit {
		log.Printf("Max_Chunk_Size was set to: %d, but a single call to AppendRows cannot exceed 9 MB. Defaulting to 9 MB", maxChunkSize_init)
		maxChunkSize_init = chunkSizeLimit
	}
	// Optional max queue size params
	maxQueueSize, err := getConfigField(plugin, "Max_Queue_Requests", queueRequestDefault)
	if err != nil {
		log.Printf("Invalid Max_Queue_Requests parameter in configuration file: %s", err)
		return output.FLB_ERROR
	}
	// Multiply the floats, floor the result, and convert to an integer for ease of use in Flush
	requestCountThreshold := setThreshold(maxQueueSize)
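	// setThreshold is presumably where that multiplication happens; a minimal sketch under
	// that assumption (the queueThresholdRatio constant name and the use of math.Floor are
	// hypothetical, not taken from this file):
	//
	//	func setThreshold(maxQueueSize int) int {
	//		return int(math.Floor(float64(maxQueueSize) * queueThresholdRatio))
	//	}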
	maxQueueByteSize, err := getConfigField(plugin, "Max_Queue_Bytes", queueByteDefault)
	if err != nil {
		log.Printf("Invalid Max_Queue_Bytes parameter in configuration file: %s", err)
		return output.FLB_ERROR
	}
	dateTimeStringType, err := getConfigField(plugin, "DateTime_String_Type", dateTimeDefault)
	if err != nil {
		log.Printf("Invalid DateTime_String_Type parameter in configuration file: %s", err)
		return output.FLB_ERROR
	}
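	// For reference, a fluent-bit [OUTPUT] stanza exercising the keys read above could look
	// roughly like the following; the plugin Name and every value here are illustrative
	// placeholders, not defaults taken from this plugin:
	//
	//	[OUTPUT]
	//	    Name                    writeapi
	//	    Match                   *
	//	    ProjectID               my-project
	//	    DatasetID               my_dataset
	//	    TableID                 my_table
	//	    Exactly_Once            false
	//	    Num_Synchronous_Retries 3
	//	    Max_Chunk_Size          9000000
	//	    Max_Queue_Requests      100
	//	    Max_Queue_Bytes         52428800
	//	    DateTime_String_Type    true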
	// Create new client
	client, err := getClient(ms_ctx, projectID)
	if err != nil {
		log.Printf("Creating a new managed BigQuery Storage write client scoped to: %s failed in FLBPluginInit: %s", projectID, err)
		return output.FLB_ERROR
	}
	// Build the fully qualified table reference that the stream will write to
	tableReference := fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID)
	// Call getDescriptors to get the message descriptor and descriptor proto
	md, descriptor, err := getDescriptors(ms_ctx, client, projectID, datasetID, tableID, dateTimeStringType)
	if err != nil {
		log.Printf("Getting message descriptor and descriptor proto for table: %s failed in FLBPluginInit: %s", tableReference, err)
		return output.FLB_ERROR
	}
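	// getDescriptors presumably looks up the destination table's schema through the write
	// client and converts it into a message descriptor (md) plus a DescriptorProto
	// (descriptor); the Storage Write API expects the latter as the writer schema for
	// AppendRows. This summary is inferred from the identifiers above, not from the
	// helper's implementation.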
	// Set the stream type based on the exactly-once parameter
	var currStreamType managedwriter.StreamType
	var enableRetries bool
	if exactlyOnceVal {
		currStreamType = managedwriter.CommittedStream
	} else {
		currStreamType = managedwriter.DefaultStream
		enableRetries = true
	}
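	// With exactly-once enabled the plugin uses a committed stream and manages offsets itself
	// (see offsetCounter below), retrying appends synchronously as governed by numRetriesVal;
	// otherwise it uses the default stream and turns on enableRetries, which is presumably
	// wired to the managedwriter library's own append retry support.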
	var res_temp []*managedwriter.AppendResult
	streamSlice := []*streamConfig{}
	// Create the initial stream config and append it to the slice
	initStream := streamConfig{
		offsetCounter: 0,
		appendResults: &res_temp,
	}
	streamSlice = append(streamSlice, &initStream)
	// Instantiate the output config for this plugin instance
	config := outputConfig{
		messageDescriptor: md,
		streamType: currStreamType,
		tableRef: tableReference,
		currProjectID: projectID,
		schemaDesc: descriptor,
		enableRetry: enableRetries,
		maxQueueBytes: maxQueueByteSize,
		maxQueueRequests: maxQueueSize,
		client: client,
		maxChunkSize: maxChunkSize_init,
		exactlyOnce: exactlyOnceVal,
		numRetries: numRetriesVal,
		requestCountThreshold: requestCountThreshold,
		managedStreamSlice: &streamSlice,
	}
	// Create stream using NewManagedStream
	configPointer := &config
	err = buildStream(ms_ctx, &configPointer, 0)
	if err != nil {
		log.Printf("Creating a new managed stream with destination table: %s failed in FLBPluginInit: %s", tableReference, err)
		return output.FLB_ERROR
	}
	configMap[configID] = &config
	// Set the FLB context for this output instance, which enables multi-instancing
	config.mutex.Lock()
	output.FLBPluginSetContext(plugin, configID)
	configID = configID + 1
	config.mutex.Unlock()
	return output.FLB_OK
}
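// Each [OUTPUT] section in the fluent-bit configuration triggers its own FLBPluginInit call,
// so every instance ends up with its own outputConfig in configMap, keyed by the configID
// stored in the FLB context. A flush callback can then recover its instance roughly like
// this sketch (the int type assertion on configID is an assumption; the FLBPluginFlushCtx
// signature and output.FLBPluginGetContext follow fluent-bit-go's output package conventions):
//
//	func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
//		id := output.FLBPluginGetContext(ctx).(int)
//		config := configMap[id]
//		// ... decode the incoming records and append them through config's managed stream(s)
//		return output.FLB_OK
//	}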