in out_writeapi.go [427:453]
func createNewStreamDynamicScaling(config **outputConfig) {
(*config).mutex.Lock()
defer (*config).mutex.Unlock()
if len(*(*config).managedStreamSlice) < maxNumStreamsPerInstance {
// Gets stream with least values in queue
mostEfficient := getLeastLoadedStream((*config).managedStreamSlice)
mostEfficientQueueLength := len(*(*(*config).managedStreamSlice)[mostEfficient].appendResults)
var newResQueue []*managedwriter.AppendResult
var newStream = streamConfig{
offsetCounter: 0,
appendResults: &newResQueue,
}
if mostEfficientQueueLength > (*config).requestCountThreshold {
*(*config).managedStreamSlice = append(*(*config).managedStreamSlice, &newStream)
newStreamIndex := len(*(*config).managedStreamSlice) - 1
err := buildStream(ms_ctx, config, newStreamIndex)
if err != nil {
log.Printf("Creating an additional managed stream with destination table: %s failed in FLBPluginInit: %s", (*config).tableRef, err)
// If failure, failed stream is removed from slice
*(*config).managedStreamSlice = (*(*config).managedStreamSlice)[:newStreamIndex]
}
}
}
}