consumer/worker.go

package consumerLibrary

import (
	"io"
	"os"
	"sync"
	"time"

	sls "github.com/aliyun/aliyun-log-go-sdk"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"go.uber.org/atomic"
	lumberjack "gopkg.in/natefinch/lumberjack.v2"
)

type ConsumerWorker struct {
	consumerHeatBeat   *ConsumerHeartBeat
	client             *ConsumerClient
	workerShutDownFlag *atomic.Bool
	shardConsumer      sync.Map // map[int]*ShardConsumerWorker
	processor          Processor
	waitGroup          sync.WaitGroup
	Logger             log.Logger
	ioThrottler        ioThrottler
}

// Deprecated: this old logic automatically saves checkpoints in memory and commits them at a fixed interval.
// We highly recommend using InitConsumerWorkerWithCheckpointTracker instead.
func InitConsumerWorker(option LogHubConfig, do func(int, *sls.LogGroupList) string) *ConsumerWorker {
	if option.AutoCommitDisabled {
		panic("auto commit already disabled, sdk will not save any checkpoint, " +
			"please use InitConsumerWorkerWithCheckpointTracker or set AutoCommitDisabled to false")
	}
	return InitConsumerWorkerWithProcessor(
		option,
		ProcessFunc(func(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) (string, error) {
			cursor := do(shardId, logGroupList)
			// keep the original behavior: only save the checkpoint when the returned cursor is empty
			if cursor == "" {
				checkpointTracker.SaveCheckPoint(false)
			}
			return cursor, nil
		}),
	)
}

// InitConsumerWorkerWithCheckpointTracker
// Please note that you need to save the checkpoint yourself after processing succeeds.
func InitConsumerWorkerWithCheckpointTracker(option LogHubConfig, do func(int, *sls.LogGroupList, CheckPointTracker) (string, error)) *ConsumerWorker {
	return InitConsumerWorkerWithProcessor(option, ProcessFunc(do))
}
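
// A minimal usage sketch for InitConsumerWorkerWithCheckpointTracker. It assumes
// `option` is an already-populated LogHubConfig; the handler saves the checkpoint
// only after its own processing has succeeded, as the note above requires. The
// non-forced SaveCheckPoint(false) call mirrors the usage in InitConsumerWorker.
//
//	worker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option,
//		func(shardId int, logGroupList *sls.LogGroupList, tracker consumerLibrary.CheckPointTracker) (string, error) {
//			// process logGroupList here ...
//			tracker.SaveCheckPoint(false)
//			return "", nil
//		})
//	worker.Start()
//	defer worker.StopAndWait()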

// InitConsumerWorkerWithProcessor
// You need to save the checkpoint yourself, and you can do cleanup work after the consumer shuts down.
func InitConsumerWorkerWithProcessor(option LogHubConfig, processor Processor) *ConsumerWorker {
	logger := option.Logger
	if logger == nil {
		logger = logConfig(option)
	}
	maxIoWorker := defaultMaxIoWorkers
	if option.MaxIoWorkers > 0 {
		maxIoWorker = option.MaxIoWorkers
	}
	consumerClient := initConsumerClient(option, logger)
	consumerHeatBeat := initConsumerHeatBeat(consumerClient, logger)
	consumerWorker := &ConsumerWorker{
		consumerHeatBeat:   consumerHeatBeat,
		client:             consumerClient,
		workerShutDownFlag: atomic.NewBool(false),
		//shardConsumer: make(map[int]*ShardConsumerWorker),
		processor:   processor,
		Logger:      logger,
		ioThrottler: newSimpleIoThrottler(maxIoWorker),
	}
	if err := consumerClient.createConsumerGroup(); err != nil {
		level.Error(consumerWorker.Logger).Log(
			"msg", "possibly failed to create or update consumer group, please check worker run log",
			"err", err)
	}
	return consumerWorker
}
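
// A sketch of a custom Processor for InitConsumerWorkerWithProcessor. The Processor
// interface is defined elsewhere in this package; the Process/Shutdown method set
// shown below is an assumption inferred from ProcessFunc and the "after consumer
// shutdown" note above, so check the actual Processor definition before copying it.
//
//	type myProcessor struct{}
//
//	func (p *myProcessor) Process(shardId int, logGroupList *sls.LogGroupList, tracker CheckPointTracker) (string, error) {
//		// consume logGroupList, then persist progress (non-forced save, as in InitConsumerWorker)
//		tracker.SaveCheckPoint(false)
//		return "", nil
//	}
//
//	func (p *myProcessor) Shutdown(tracker CheckPointTracker) error {
//		// flush any buffered work before the shard consumer exits
//		return nil
//	}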

func (consumerWorker *ConsumerWorker) Start() {
	consumerWorker.waitGroup.Add(1)
	go consumerWorker.run()
}

func (consumerWorker *ConsumerWorker) StopAndWait() {
	level.Info(consumerWorker.Logger).Log("msg", "*** try to exit ***")
	consumerWorker.workerShutDownFlag.Store(true)
	consumerWorker.consumerHeatBeat.shutDownHeart()
	consumerWorker.waitGroup.Wait()
	level.Info(consumerWorker.Logger).Log("msg", "consumer worker stopped", "consumer name", consumerWorker.client.option.ConsumerName)
}

func (consumerWorker *ConsumerWorker) run() {
	level.Info(consumerWorker.Logger).Log("msg", "consumer worker start", "worker name", consumerWorker.client.option.ConsumerName)
	defer consumerWorker.waitGroup.Done()
	go consumerWorker.consumerHeatBeat.heartBeatRun()
	for !consumerWorker.workerShutDownFlag.Load() {
		// ensure a shard consumer is running for every shard this worker currently holds
		heldShards := consumerWorker.consumerHeatBeat.getHeldShards()
		lastFetchTime := time.Now().UnixNano() / 1000 / 1000
		for _, shard := range heldShards {
			if consumerWorker.workerShutDownFlag.Load() {
				break
			}
			shardConsumer := consumerWorker.getShardConsumer(shard)
			shardConsumer.ensureStarted()
			if shardConsumer.shouldReportMetrics() {
				shardConsumer.reportMetrics()
			}
		}
		// shut down consumers for shards that are no longer assigned to this worker
		consumerWorker.cleanShardConsumer(heldShards)
		TimeToSleepInMillsecond(consumerWorker.client.option.DataFetchIntervalInMs, lastFetchTime, consumerWorker.workerShutDownFlag.Load())
	}
	level.Info(consumerWorker.Logger).Log("msg", "consumer worker try to cleanup consumers", "worker name", consumerWorker.client.option.ConsumerName)
	consumerWorker.shutDownAndWait()
}

func (consumerWorker *ConsumerWorker) shutDownAndWait() {
	// poll until every shard consumer has stopped and been removed from the map
	for {
		time.Sleep(500 * time.Millisecond)
		count := 0
		consumerWorker.shardConsumer.Range(
			func(key, value interface{}) bool {
				count++
				consumer := value.(*ShardConsumerWorker)
				if !consumer.isStopped() {
					consumer.shutdown()
				} else {
					consumerWorker.shardConsumer.Delete(key)
				}
				return true
			},
		)
		if count == 0 {
			break
		}
	}
}

func (consumerWorker *ConsumerWorker) getShardConsumer(shardId int) *ShardConsumerWorker {
	consumer, ok := consumerWorker.shardConsumer.Load(shardId)
	if ok {
		return consumer.(*ShardConsumerWorker)
	}
	consumerIns := newShardConsumerWorker(shardId,
		consumerWorker.client,
		consumerWorker.consumerHeatBeat,
		consumerWorker.processor,
		consumerWorker.Logger,
		consumerWorker.ioThrottler)
	consumerWorker.shardConsumer.Store(shardId, consumerIns)
	return consumerIns
}

func (consumerWorker *ConsumerWorker) cleanShardConsumer(owned_shards []int) {
	consumerWorker.shardConsumer.Range(
		func(key, value interface{}) bool {
			shard := key.(int)
			consumer := value.(*ShardConsumerWorker)
			if !Contain(shard, owned_shards) {
				level.Info(consumerWorker.Logger).Log("msg", "try to call shut down for unassigned consumer shard", "shardId", shard)
				consumer.shutdown()
				level.Info(consumerWorker.Logger).Log("msg", "Complete call shut down for unassigned consumer shard", "shardId", shard)
			}
			if consumer.isStopped() {
				isDeleteShard := consumerWorker.consumerHeatBeat.removeHeartShard(shard)
				if isDeleteShard {
					level.Info(consumerWorker.Logger).Log("msg", "Remove an assigned consumer shard", "shardId", shard)
					consumerWorker.shardConsumer.Delete(shard)
				} else {
					level.Info(consumerWorker.Logger).Log("msg", "Remove an assigned consumer shard failed", "shardId", shard)
				}
			}
			return true
		},
	)
}

// logConfig initializes the logger according to LogHubConfig. It writes to stdout by
// default, or to a rotating file (via lumberjack) when LogFileName is set.
func logConfig(option LogHubConfig) log.Logger {
	var writer io.Writer
	if option.LogFileName == "" {
		writer = log.NewSyncWriter(os.Stdout)
	} else {
		if option.LogMaxSize == 0 {
			option.LogMaxSize = 10 // megabytes per log file
		}
		if option.LogMaxBackups == 0 {
			option.LogMaxBackups = 10
		}
		writer = &lumberjack.Logger{
			Filename:   option.LogFileName,
			MaxSize:    option.LogMaxSize,
			MaxBackups: option.LogMaxBackups,
			Compress:   option.LogCompass,
		}
	}
	var logger log.Logger
	if option.IsJsonType {
		logger = log.NewJSONLogger(writer)
	} else {
		logger = log.NewLogfmtLogger(writer)
	}
	switch option.AllowLogLevel {
	case "debug":
		logger = level.NewFilter(logger, level.AllowDebug())
	case "info":
		logger = level.NewFilter(logger, level.AllowInfo())
	case "warn":
		logger = level.NewFilter(logger, level.AllowWarn())
	case "error":
		logger = level.NewFilter(logger, level.AllowError())
	default:
		logger = level.NewFilter(logger, level.AllowInfo())
	}
	logger = log.With(logger, "time", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
	return logger
}
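
// A minimal logging configuration sketch. It only sets the fields that logConfig
// reads; the file path is illustrative, and connection/consumer group fields of
// LogHubConfig are omitted here.
//
//	option := consumerLibrary.LogHubConfig{
//		// ... connection and consumer group settings ...
//		LogFileName:   "/var/log/sls-consumer.log", // empty string means log to stdout
//		LogMaxSize:    10,                          // megabytes per file before rotation
//		LogMaxBackups: 10,
//		LogCompass:    true,   // compress rotated files
//		IsJsonType:    true,   // JSON output instead of logfmt
//		AllowLogLevel: "warn", // one of debug/info/warn/error
//	}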

// ioThrottler bounds how many IO operations the shard consumers may run concurrently.
type ioThrottler interface {
	Acquire()
	Release()
}

// simpleIoThrottler is a counting semaphore backed by a buffered channel.
type simpleIoThrottler struct {
	chance chan struct{}
}

func newSimpleIoThrottler(maxIoWorkers int) *simpleIoThrottler {
	return &simpleIoThrottler{
		chance: make(chan struct{}, maxIoWorkers),
	}
}

// Acquire blocks until a slot is available.
func (t *simpleIoThrottler) Acquire() {
	t.chance <- struct{}{}
}

// Release frees a previously acquired slot.
func (t *simpleIoThrottler) Release() {
	<-t.chance
}
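
// A usage sketch for the throttler: each IO-bound call acquires a slot first and
// releases it when done, so at most maxIoWorkers calls run at once. The fetchOnce
// name below is purely illustrative.
//
//	throttler := newSimpleIoThrottler(4)
//	throttler.Acquire()
//	defer throttler.Release()
//	fetchOnce()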