producer/io_thread_pool.go (50 lines of code) (raw):

package producer import ( "sync" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "go.uber.org/atomic" ) type IoThreadPool struct { threadPoolShutDownFlag *atomic.Bool taskCh chan *ProducerBatch ioworker *IoWorker logger log.Logger stopped *atomic.Bool } func initIoThreadPool(ioworker *IoWorker, logger log.Logger) *IoThreadPool { return &IoThreadPool{ threadPoolShutDownFlag: atomic.NewBool(false), taskCh: make(chan *ProducerBatch, 100000), ioworker: ioworker, logger: logger, stopped: atomic.NewBool(false), } } func (threadPool *IoThreadPool) addTask(batch *ProducerBatch) { threadPool.taskCh <- batch } func (threadPool *IoThreadPool) start(ioWorkerWaitGroup *sync.WaitGroup, ioThreadPoolwait *sync.WaitGroup) { defer ioThreadPoolwait.Done() for task := range threadPool.taskCh { if task == nil { level.Info(threadPool.logger).Log("msg", "All cache tasks in the thread pool have been successfully sent") threadPool.stopped.Store(true) return } threadPool.ioworker.startSendTask(ioWorkerWaitGroup) go func(producerBatch *ProducerBatch) { defer threadPool.ioworker.closeSendTask(ioWorkerWaitGroup) threadPool.ioworker.sendToServer(producerBatch) }(task) } } func (threadPool *IoThreadPool) ShutDown() { old := threadPool.threadPoolShutDownFlag.Swap(true) if !old { close(threadPool.taskCh) } } func (threadPool *IoThreadPool) Stopped() bool { return threadPool.stopped.Load() }