func createNewStreamDynamicScaling()

in out_writeapi.go [427:453]


// createNewStreamDynamicScaling adds one more managed stream to the output
// configuration when the existing streams look saturated.
//
// While holding the config mutex it:
//  1. returns immediately if the number of streams has already reached
//     maxNumStreamsPerInstance;
//  2. asks getLeastLoadedStream for a stream index and reads the length of
//     that stream's appendResults queue;
//  3. returns if that length does not exceed requestCountThreshold;
//  4. otherwise appends a fresh streamConfig and hands its index to
//     buildStream.
//
// If buildStream fails, the error is logged and the slice is restored to its
// previous length. No error is reported to the caller.
//
// NOTE(review): the slice is indexed with whatever getLeastLoadedStream
// returns; this presumably relies on the slice being non-empty — confirm
// against the callers.
func createNewStreamDynamicScaling(config **outputConfig) {
	cfg := *config
	cfg.mutex.Lock()
	defer cfg.mutex.Unlock()

	streams := cfg.managedStreamSlice
	if len(*streams) >= maxNumStreamsPerInstance {
		return
	}

	// Only scale up when even the stream picked by getLeastLoadedStream has
	// more pending results than the configured threshold.
	leastLoaded := getLeastLoadedStream(streams)
	if len(*(*streams)[leastLoaded].appendResults) <= cfg.requestCountThreshold {
		return
	}

	// Allocate the new stream state only once we know it will be used.
	var pendingResults []*managedwriter.AppendResult
	added := &streamConfig{
		offsetCounter: 0,
		appendResults: &pendingResults,
	}

	*streams = append(*streams, added)
	newStreamIndex := len(*streams) - 1

	if err := buildStream(ms_ctx, config, newStreamIndex); err != nil {
		log.Printf("Creating an additional managed stream with destination table: %s failed during dynamic scaling: %s", cfg.tableRef, err)
		// Roll back: clear the slot first so the failed stream is not kept
		// alive by the backing array, then shrink the slice again.
		(*streams)[newStreamIndex] = nil
		*streams = (*streams)[:newStreamIndex]
	}
}