func FLBPluginInit()

in out_writeapi.go [548:672]


func FLBPluginInit(plugin unsafe.Pointer) int {
	// Set projectID, datasetID, and tableID from config file params
	projectID := output.FLBPluginConfigKey(plugin, "ProjectID")
	datasetID := output.FLBPluginConfigKey(plugin, "DatasetID")
	tableID := output.FLBPluginConfigKey(plugin, "TableID")

	// Set exactly-once bool from config file param
	exactlyOnceVal, err := getConfigField(plugin, "Exactly_Once", exactlyOnceDefault)
	if err != nil {
		log.Printf("Invalid Exactly_Once parameter in configuration file: %s", err)
		return output.FLB_ERROR
	}

	// Optional num synchronous retries parameter
	// This value is only used when the exactly-once field is configured to true (as it describes synchronous retries)
	numRetriesVal, err := getConfigField(plugin, "Num_Synchronous_Retries", numRetriesDefault)
	if err != nil {
		log.Printf("Invalid Num_Synchronous_Retries parameter in configuration file: %s", err)
		return output.FLB_ERROR
	}

	// Optional maxchunksize param
	maxChunkSize_init, err := getConfigField(plugin, "Max_Chunk_Size", chunkSizeLimit)
	if err != nil {
		log.Printf("Invalid Max_Chunk_Size parameter in configuration file: %s", err)
		return output.FLB_ERROR
	}
	if maxChunkSize_init > chunkSizeLimit {
		log.Printf("Max_Chunk_Size was set to: %d, but a single call to AppendRows cannot exceed 9 MB. Defaulting to 9 MB", maxChunkSize_init)
		maxChunkSize_init = chunkSizeLimit
	}

	// Optional max queue size params
	maxQueueSize, err := getConfigField(plugin, "Max_Queue_Requests", queueRequestDefault)
	if err != nil {
		log.Printf("Invalid Max_Queue_Requests parameter in configuration file: %s", err)
		return output.FLB_ERROR
	}
	// Multiply floats, floor it, then convert it to integer for ease of use in Flush
	requestCountThreshold := setThreshold(maxQueueSize)
	maxQueueByteSize, err := getConfigField(plugin, "Max_Queue_Bytes", queueByteDefault)
	if err != nil {
		log.Printf("Invalid Max_Queue_Bytes parameter in configuration file: %s", err)
		return output.FLB_ERROR
	}

	dateTimeStringType, err := getConfigField(plugin, "DateTime_String_Type", dateTimeDefault)
	if err != nil {
		log.Printf("Invalid DateTime_Input_Type parameter in configuration file: %s", err)
		return output.FLB_ERROR
	}

	// Create new client
	client, err := getClient(ms_ctx, projectID)
	if err != nil {
		log.Printf("Creating a new managed BigQuery Storage write client scoped to: %s failed in FLBPluginInit: %s", projectID, err)
		return output.FLB_ERROR
	}

	// Create stream name
	tableReference := fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID)

	// Call getDescriptors to get the message descriptor, and descriptor proto
	md, descriptor, err := getDescriptors(ms_ctx, client, projectID, datasetID, tableID, dateTimeStringType)
	if err != nil {
		log.Printf("Getting message descriptor and descriptor proto for table: %s failed in FLBPluginInit: %s", tableReference, err)
		return output.FLB_ERROR
	}

	// Set the stream type based on exactly once parameter
	var currStreamType managedwriter.StreamType
	var enableRetries bool
	if exactlyOnceVal {
		currStreamType = managedwriter.CommittedStream
	} else {
		currStreamType = managedwriter.DefaultStream
		enableRetries = true
	}

	var res_temp []*managedwriter.AppendResult
	streamSlice := []*streamConfig{}

	// Creates struct for stream and appends to slice
	initStream := streamConfig{
		offsetCounter: 0,
		appendResults: &res_temp,
	}
	streamSlice = append(streamSlice, &initStream)

	// Instantiates output instance
	config := outputConfig{
		messageDescriptor:     md,
		streamType:            currStreamType,
		tableRef:              tableReference,
		currProjectID:         projectID,
		schemaDesc:            descriptor,
		enableRetry:           enableRetries,
		maxQueueBytes:         maxQueueByteSize,
		maxQueueRequests:      maxQueueSize,
		client:                client,
		maxChunkSize:          maxChunkSize_init,
		exactlyOnce:           exactlyOnceVal,
		numRetries:            numRetriesVal,
		requestCountThreshold: requestCountThreshold,
		managedStreamSlice:    &streamSlice,
	}

	// Create stream using NewManagedStream
	configPointer := &config
	err = buildStream(ms_ctx, &configPointer, 0)
	if err != nil {
		log.Printf("Creating a new managed stream with destination table: %s failed in FLBPluginInit: %s", tableReference, err)
		return output.FLB_ERROR
	}

	configMap[configID] = &config

	// Creating FLB context for each output, enables multiinstancing
	config.mutex.Lock()
	output.FLBPluginSetContext(plugin, configID)
	configID = configID + 1
	config.mutex.Unlock()

	return output.FLB_OK
}