producer/producer.go

package producer

import (
	"errors"
	"sync"
	"sync/atomic"
	"time"

	sls "github.com/aliyun/aliyun-log-go-sdk"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
)

const (
	TimeoutExecption      = "TimeoutExecption"
	IllegalStateException = "IllegalStateException"
)

type Producer struct {
	producerConfig        *ProducerConfig
	logAccumulator        *LogAccumulator
	mover                 *Mover
	threadPool            *IoThreadPool
	moverWaitGroup        *sync.WaitGroup
	ioWorkerWaitGroup     *sync.WaitGroup
	ioThreadPoolWaitGroup *sync.WaitGroup
	buckets               int
	logger                log.Logger
	producerLogGroupSize  int64
	monitor               *ProducerMonitor
}

func NewProducer(producerConfig *ProducerConfig) (*Producer, error) {
	logger := getProducerLogger(producerConfig)
	finalProducerConfig := validateProducerConfig(producerConfig, logger)
	client, err := createClient(finalProducerConfig, false, logger)
	if err != nil {
		return nil, err
	}
	return createProducerInternal(client, finalProducerConfig, logger), nil
}

// Deprecated: use NewProducer instead.
func InitProducer(producerConfig *ProducerConfig) *Producer {
	logger := getProducerLogger(producerConfig)
	finalProducerConfig := validateProducerConfig(producerConfig, logger)
	client, _ := createClient(finalProducerConfig, true, logger)
	return createProducerInternal(client, finalProducerConfig, logger)
}

func createProducerInternal(client sls.ClientInterface, finalProducerConfig *ProducerConfig, logger log.Logger) *Producer {
	configureClient(client, finalProducerConfig)
	retryQueue := initRetryQueue()
	errorStatusMap := func() map[int]*string {
		errorCodeMap := map[int]*string{}
		for _, v := range finalProducerConfig.NoRetryStatusCodeList {
			errorCodeMap[int(v)] = nil
		}
		return errorCodeMap
	}()
	producer := &Producer{
		producerConfig: finalProducerConfig,
		buckets:        finalProducerConfig.Buckets,
	}
	ioWorker := initIoWorker(client, retryQueue, logger, finalProducerConfig.MaxIoWorkerCount, errorStatusMap, producer)
	threadPool := initIoThreadPool(ioWorker, logger)
	logAccumulator := initLogAccumulator(finalProducerConfig, ioWorker, logger, threadPool, producer)
	mover := initMover(logAccumulator, retryQueue, ioWorker, logger, threadPool)
	producer.logAccumulator = logAccumulator
	producer.mover = mover
	producer.threadPool = threadPool
	producer.moverWaitGroup = &sync.WaitGroup{}
	producer.ioWorkerWaitGroup = &sync.WaitGroup{}
	producer.ioThreadPoolWaitGroup = &sync.WaitGroup{}
	producer.logger = logger
	producer.monitor = newProducerMonitor()
	return producer
}

func configureClient(client sls.ClientInterface, producerConfig *ProducerConfig) {
	if producerConfig.Region != "" {
		client.SetRegion(producerConfig.Region)
	}
	if producerConfig.AuthVersion != "" {
		client.SetAuthVersion(producerConfig.AuthVersion)
	}
	if producerConfig.HTTPClient != nil {
		client.SetHTTPClient(producerConfig.HTTPClient)
	}
	if producerConfig.UserAgent != "" {
		client.SetUserAgent(producerConfig.UserAgent)
	}
}

func createClient(producerConfig *ProducerConfig, allowStsFallback bool, logger log.Logger) (sls.ClientInterface, error) {
	// use CredentialsProvider
	if producerConfig.CredentialsProvider != nil {
		return sls.CreateNormalInterfaceV2(producerConfig.Endpoint, producerConfig.CredentialsProvider), nil
	}
	// use UpdateStsTokenFunc
	if producerConfig.UpdateStsToken != nil && producerConfig.StsTokenShutDown != nil {
		client, err := sls.CreateTokenAutoUpdateClient(producerConfig.Endpoint, producerConfig.UpdateStsToken, producerConfig.StsTokenShutDown)
		if err == nil || !allowStsFallback {
			return client, err
		}
		// for backward compatibility
		level.Warn(logger).Log("msg", "Failed to create ststoken client, use default client without ststoken.", "error", err)
	}
	// fallback to default static long-lived AK
	staticProvider := sls.NewStaticCredentialsProvider(producerConfig.AccessKeyID, producerConfig.AccessKeySecret, "")
	return sls.CreateNormalInterfaceV2(producerConfig.Endpoint, staticProvider), nil
}
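// Hedged usage sketch, not part of the original file: it shows the intended
// construction path via NewProducer. GetProducerConfig is assumed to be the
// package helper that returns a config pre-filled with defaults; the endpoint
// and key values below are placeholders.
func exampleNewProducer() (*Producer, error) {
	cfg := GetProducerConfig()
	cfg.Endpoint = "cn-hangzhou.log.aliyuncs.com" // placeholder endpoint
	// createClient above prefers an explicit CredentialsProvider; the static
	// AccessKeyID/AccessKeySecret fields are only the final fallback.
	cfg.CredentialsProvider = sls.NewStaticCredentialsProvider("ak-id", "ak-secret", "")
	return NewProducer(cfg)
}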
func validateProducerConfig(producerConfig *ProducerConfig, logger log.Logger) *ProducerConfig {
	if producerConfig.MaxReservedAttempts <= 0 {
		level.Warn(logger).Log("msg", "The MaxReservedAttempts parameter must be greater than zero; resetting it to the default value of 11")
		producerConfig.MaxReservedAttempts = 11
	}
	if producerConfig.MaxBatchCount > 40960 || producerConfig.MaxBatchCount <= 0 {
		level.Warn(logger).Log("msg", "The MaxBatchCount parameter must be between 1 and 40960; resetting it to the maximum of 40960")
		producerConfig.MaxBatchCount = 40960
	}
	if producerConfig.MaxBatchSize > 1024*1024*5 || producerConfig.MaxBatchSize <= 0 {
		level.Warn(logger).Log("msg", "The MaxBatchSize parameter must be between 1 byte and 5 MB; resetting the maximum size of a single logGroup to 5 MB")
		producerConfig.MaxBatchSize = 1024 * 1024 * 5
	}
	if producerConfig.MaxIoWorkerCount <= 0 {
		level.Warn(logger).Log("msg", "The MaxIoWorkerCount parameter must be greater than zero; resetting it to the default value of 50")
		producerConfig.MaxIoWorkerCount = 50
	}
	if producerConfig.BaseRetryBackoffMs <= 0 {
		level.Warn(logger).Log("msg", "The BaseRetryBackoffMs parameter must be greater than zero; resetting it to the default value of 100 milliseconds")
		producerConfig.BaseRetryBackoffMs = 100
	}
	if producerConfig.TotalSizeLnBytes <= 0 {
		level.Warn(logger).Log("msg", "The TotalSizeLnBytes parameter must be greater than zero; resetting it to the default value of 100 MB")
		producerConfig.TotalSizeLnBytes = 100 * 1024 * 1024
	}
	if producerConfig.LingerMs < 100 {
		level.Warn(logger).Log("msg", "The LingerMs parameter cannot be less than 100 milliseconds; resetting it to the default value of 2000 milliseconds")
		producerConfig.LingerMs = 2000
	}
	return producerConfig
}

func (producer *Producer) HashSendLogWithCallBack(project, logstore, shardHash, topic, source string, log *sls.Log, callback CallBack) error {
	err := producer.waitTime()
	if err != nil {
		return err
	}
	if producer.producerConfig.AdjustShargHash {
		shardHash, err = AdjustHash(shardHash, producer.buckets)
		if err != nil {
			return err
		}
	}
	return producer.logAccumulator.addLogToProducerBatch(project, logstore, shardHash, topic, source, log, callback)
}

func (producer *Producer) HashSendLogListWithCallBack(project, logstore, shardHash, topic, source string, logList []*sls.Log, callback CallBack) (err error) {
	err = producer.waitTime()
	if err != nil {
		return err
	}
	if producer.producerConfig.AdjustShargHash {
		shardHash, err = AdjustHash(shardHash, producer.buckets)
		if err != nil {
			return err
		}
	}
	return producer.logAccumulator.addLogToProducerBatch(project, logstore, shardHash, topic, source, logList, callback)
}

func (producer *Producer) SendLog(project, logstore, topic, source string, log *sls.Log) error {
	err := producer.waitTime()
	if err != nil {
		return err
	}
	return producer.logAccumulator.addLogToProducerBatch(project, logstore, "", topic, source, log, nil)
}

func (producer *Producer) SendLogList(project, logstore, topic, source string, logList []*sls.Log) (err error) {
	err = producer.waitTime()
	if err != nil {
		return err
	}
	return producer.logAccumulator.addLogToProducerBatch(project, logstore, "", topic, source, logList, nil)
}
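// Hedged usage sketch, not part of the original file: sending one log through
// the accumulator. GenerateLog is assumed to be the package helper that builds
// an *sls.Log from a timestamp and a key/value map; the project, logstore,
// topic, and source values are placeholders.
func exampleSendLog(p *Producer) error {
	logInst := GenerateLog(uint32(time.Now().Unix()), map[string]string{"level": "info", "msg": "hello"})
	// The plain Send* variants pass an empty shard hash, so the batch is not
	// pinned to a particular shard.
	return p.SendLog("my-project", "my-logstore", "demo-topic", "127.0.0.1", logInst)
}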
func (producer *Producer) HashSendLog(project, logstore, shardHash, topic, source string, log *sls.Log) error {
	err := producer.waitTime()
	if err != nil {
		return err
	}
	if producer.producerConfig.AdjustShargHash {
		shardHash, err = AdjustHash(shardHash, producer.buckets)
		if err != nil {
			return err
		}
	}
	return producer.logAccumulator.addLogToProducerBatch(project, logstore, shardHash, topic, source, log, nil)
}

func (producer *Producer) HashSendLogList(project, logstore, shardHash, topic, source string, logList []*sls.Log) (err error) {
	err = producer.waitTime()
	if err != nil {
		return err
	}
	if producer.producerConfig.AdjustShargHash {
		shardHash, err = AdjustHash(shardHash, producer.buckets)
		if err != nil {
			return err
		}
	}
	return producer.logAccumulator.addLogToProducerBatch(project, logstore, shardHash, topic, source, logList, nil)
}

func (producer *Producer) SendLogWithCallBack(project, logstore, topic, source string, log *sls.Log, callback CallBack) error {
	err := producer.waitTime()
	if err != nil {
		return err
	}
	return producer.logAccumulator.addLogToProducerBatch(project, logstore, "", topic, source, log, callback)
}

func (producer *Producer) SendLogListWithCallBack(project, logstore, topic, source string, logList []*sls.Log, callback CallBack) (err error) {
	err = producer.waitTime()
	if err != nil {
		return err
	}
	return producer.logAccumulator.addLogToProducerBatch(project, logstore, "", topic, source, logList, callback)
}

// todo: refactor this
func (producer *Producer) waitTime() error {
	if atomic.LoadInt64(&producer.producerLogGroupSize) <= producer.producerConfig.TotalSizeLnBytes {
		return nil
	}
	// no wait
	if producer.producerConfig.MaxBlockSec == 0 {
		if atomic.LoadInt64(&producer.producerLogGroupSize) > producer.producerConfig.TotalSizeLnBytes {
			level.Error(producer.logger).Log("msg", "Producer memory is full and MaxBlockSec is 0, rejecting without blocking")
			return errors.New(TimeoutExecption)
		}
		return nil
	}
	defer producer.monitor.recordWaitMemory(time.Now())
	// infinite wait
	if producer.producerConfig.MaxBlockSec < 0 {
		for atomic.LoadInt64(&producer.producerLogGroupSize) > producer.producerConfig.TotalSizeLnBytes {
			time.Sleep(waitTimeUnit)
		}
		return nil
	}
	// todo: refine this, limited wait
	for i := 0; i < producer.producerConfig.MaxBlockSec*waitUnitPerSec; i++ {
		if atomic.LoadInt64(&producer.producerLogGroupSize) > producer.producerConfig.TotalSizeLnBytes {
			time.Sleep(waitTimeUnit)
		} else {
			return nil
		}
	}
	producer.monitor.incWaitMemoryFail()
	level.Error(producer.logger).Log("msg", "Producer memory is still full after the maximum blocking time")
	return errors.New(TimeoutExecption)
}

const waitTimeUnit = time.Millisecond * 10
const waitUnitPerSec = int(time.Second / waitTimeUnit)

func (producer *Producer) Start() {
	producer.moverWaitGroup.Add(1)
	level.Info(producer.logger).Log("msg", "producer mover start")
	go producer.mover.run(producer.moverWaitGroup, producer.producerConfig)
	producer.ioThreadPoolWaitGroup.Add(1)
	go producer.threadPool.start(producer.ioWorkerWaitGroup, producer.ioThreadPoolWaitGroup)
	if !producer.producerConfig.DisableRuntimeMetrics {
		go producer.monitor.reportThread(time.Minute, producer.logger)
	}
}
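// Hedged configuration sketch, not part of the original file: how waitTime
// above interprets MaxBlockSec once in-flight data exceeds TotalSizeLnBytes.
// The field values are illustrative only.
func exampleBackpressureConfig(cfg *ProducerConfig) {
	cfg.TotalSizeLnBytes = 64 * 1024 * 1024 // memory budget before send calls block
	cfg.MaxBlockSec = 5                     // > 0: block up to 5s, then fail with TimeoutExecption
	// cfg.MaxBlockSec = 0                  // == 0: fail immediately when over budget
	// cfg.MaxBlockSec = -1                 // < 0: block until memory is released
}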
// Close shuts the producer down with a bounded wait. timeoutMs is the maximum
// time to wait, in milliseconds; use SafeClose for an unbounded, safe shutdown.
func (producer *Producer) Close(timeoutMs int64) error {
	startCloseTime := time.Now()
	producer.sendCloseProdcerSignal()
	producer.moverWaitGroup.Wait()
	producer.threadPool.ShutDown()
	for !producer.threadPool.Stopped() {
		if time.Since(startCloseTime) > time.Duration(timeoutMs)*time.Millisecond {
			level.Warn(producer.logger).Log("msg", "Producer close timed out, some of the cached data may not have been sent")
			return errors.New(TimeoutExecption)
		}
		time.Sleep(100 * time.Millisecond)
	}
	level.Info(producer.logger).Log("msg", "All goroutines of producer have been shut down")
	return nil
}

func (producer *Producer) SafeClose() {
	producer.sendCloseProdcerSignal()
	producer.moverWaitGroup.Wait()
	level.Info(producer.logger).Log("msg", "Mover close finish")
	producer.threadPool.ShutDown()
	producer.ioThreadPoolWaitGroup.Wait()
	level.Info(producer.logger).Log("msg", "IoThreadPool close finish")
	producer.ioWorkerWaitGroup.Wait()
	level.Info(producer.logger).Log("msg", "Producer close finish")
}

func (producer *Producer) sendCloseProdcerSignal() {
	level.Info(producer.logger).Log("msg", "producer start closing")
	producer.closeStstokenChannel()
	producer.mover.moverShutDownFlag.Store(true)
	producer.logAccumulator.shutDownFlag.Store(true)
	producer.mover.ioWorker.retryQueueShutDownFlag.Store(true)
}

func (producer *Producer) closeStstokenChannel() {
	if producer.producerConfig.StsTokenShutDown != nil {
		close(producer.producerConfig.StsTokenShutDown)
		level.Info(producer.logger).Log("msg", "producer closed ststoken")
	}
}
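// Hedged shutdown sketch, not part of the original file: Close bounds the wait
// and may drop cached data on timeout, while SafeClose blocks until the mover,
// thread pool, and io workers have all drained.
func exampleShutdown(p *Producer, bounded bool) {
	if bounded {
		// Wait at most 60s; on timeout some cached data may be lost.
		_ = p.Close(60000)
		return
	}
	// Block until every batch has been handed to the io workers and sent.
	p.SafeClose()
}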