consumer/shard_worker.go (212 lines of code) (raw):
package consumerLibrary
import (
"fmt"
"runtime"
"sync"
"time"
"go.uber.org/atomic"
sls "github.com/aliyun/aliyun-log-go-sdk"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
)
// Pauses used by the shard worker's polling and retry loops.
//
// TODO: refine these values; fetchFailedSleepTime in particular should become
// an increasing back-off ([1, 2, 4, 8, ...] intervals) instead of a constant.
const (
	// noProgressSleepTime is the pause after a fetch whose next cursor equals
	// the requested cursor, i.e. the shard had no new data.
	noProgressSleepTime = 500 * time.Millisecond
	// fetchFailedSleepTime is the pause after a failed pull request.
	fetchFailedSleepTime = 100 * time.Millisecond

	// processFailedSleepTime is the pause before re-running a failed Process.
	processFailedSleepTime = 50 * time.Millisecond

	// shutdownFailedSleepTime is the pause between processor Shutdown retries.
	shutdownFailedSleepTime = 100 * time.Millisecond
	// flushCheckPointFailedSleepTime is the pause between retries of the final
	// checkpoint flush performed while shutting down.
	flushCheckPointFailedSleepTime = 100 * time.Millisecond
)
// ShardConsumerWorker consumes a single shard. A goroutine started by
// ensureStarted fetches log groups, hands them to the user Processor and
// auto-commits checkpoints. On exit it runs the processor's Shutdown and a
// final checkpoint flush.
type ShardConsumerWorker struct {
	// client is the owning consumer client. It is used for pull requests and
	// for the consumer options (fetch interval, query, auto-commit settings).
	client *ConsumerClient
	// consumerCheckPointTracker records the current and next cursor and
	// flushes checkpoints. It is also handed to the user processor.
	consumerCheckPointTracker *DefaultCheckPointTracker
	// processor is the user-supplied callback. It is invoked for fetched
	// batches and when the worker shuts down.
	processor Processor
	// shardId is the id of the shard this worker consumes.
	shardId int
	// monitor records fetch and process outcomes for periodic metric reports.
	monitor *ShardMonitor
	// logger is tagged with the shard id (see newShardConsumerWorker).
	logger log.Logger
	// lastCheckpointSaveTime is when saveCheckPointIfNeeded last attempted an
	// auto-commit. In this file it is only touched from the run loop goroutine.
	lastCheckpointSaveTime time.Time
	// shutDownFlag is set by shutdown() and polled by the run loop.
	shutDownFlag *atomic.Bool
	// stopped becomes true once doShutDown has completed.
	stopped *atomic.Bool
	// startOnceFlag makes ensureStarted launch the run loop at most once.
	startOnceFlag sync.Once
	// ioThrottler brackets each pull request with Acquire/Release.
	// NOTE(review): presumably shared across shard workers to bound concurrent
	// requests — confirm.
	ioThrottler ioThrottler
}
// newShardConsumerWorker builds an idle worker for shardId. No goroutine is
// started here; call ensureStarted to launch the run loop.
//
// The worker's logger is derived from logger with a "shard" key. The raw
// logger is what the checkpoint tracker receives.
func newShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consumerHeartBeat *ConsumerHeartBeat, processor Processor, logger log.Logger, ioThrottler ioThrottler) *ShardConsumerWorker {
	tracker := initConsumerCheckpointTracker(shardId, consumerClient, consumerHeartBeat, logger)
	return &ShardConsumerWorker{
		processor:                 processor,
		consumerCheckPointTracker: tracker,
		client:                    consumerClient,
		shardId:                   shardId,
		logger:                    log.With(logger, "shard", shardId),
		shutDownFlag:              atomic.NewBool(false),
		stopped:                   atomic.NewBool(false),
		lastCheckpointSaveTime:    time.Now(),
		monitor:                   newShardMonitor(shardId, time.Minute),
		ioThrottler:               ioThrottler,
	}
}
// ensureStarted launches the run loop in its own goroutine on the first call.
// Every later call is a no-op.
func (c *ShardConsumerWorker) ensureStarted() {
	startLoop := func() { go c.runLoop() }
	c.startOnceFlag.Do(startLoop)
}
// runLoop is the worker goroutine launched by ensureStarted. It first resolves
// the initial cursor. It then repeats fetch -> process -> throttle until
// shutdown is requested. It always finishes by running doShutDown, even when
// the loop panics.
func (c *ShardConsumerWorker) runLoop() {
	level.Info(c.logger).Log("msg", "runLoop started")
	// Deferred calls run last-in-first-out. On exit recoverIfPanic therefore
	// runs first and doShutDown second.
	//
	// recoverIfPanic must be the deferred call itself. recover() only stops a
	// panic when a deferred function calls it directly. Calling it from inside
	// a deferred closure would make recover() return nil, and the panic would
	// crash the process.
	defer c.doShutDown()
	defer c.recoverIfPanic("runLoop panic")

	cursor := c.getInitCursor()
	level.Info(c.logger).Log("msg", "runLoop got init cursor", "cursor", cursor)
	for !c.shutDownFlag.Load() {
		// Taken before the fetch, so the throttle below measures the whole
		// fetch+process round.
		lastFetchTime := time.Now()
		shouldCallProcess, logGroupList, plm := c.fetchLogs(cursor)
		if !shouldCallProcess {
			// fetchLogs has already paused after a failure or an empty fetch.
			continue
		}
		cursor = c.callProcess(logGroupList, plm)
		if c.shutDownFlag.Load() {
			break
		}
		c.sleepUtilNextFetch(lastFetchTime, plm)
	}
}
// getInitCursor returns the cursor the worker should start consuming from. It
// calls consumerInitializeTask and retries after a short pause for as long as
// that fails. Each failure is logged so that a persistent failure is visible
// instead of silently spinning.
//
// It returns "" if shutdown is requested before a cursor is obtained.
func (c *ShardConsumerWorker) getInitCursor() string {
	const initCursorRetryInterval = 100 * time.Millisecond
	for !c.shutDownFlag.Load() {
		initCursor, err := c.consumerInitializeTask()
		if err == nil {
			return initCursor
		}
		level.Warn(c.logger).Log("msg", "failed to get init cursor, will retry", "err", err)
		time.Sleep(initCursorRetryInterval)
	}
	return ""
}
// fetchLogs pulls one batch from the shard, starting at cursor.
//
// It returns shouldCallProcess=false (with nil data) in two cases:
//   - the pull request failed (it pauses for fetchFailedSleepTime);
//   - the returned next cursor equals cursor, i.e. there is no new data (it
//     tries an auto-commit and pauses for noProgressSleepTime).
//
// Otherwise it records cursor and the next cursor in the checkpoint tracker
// and returns the batch for processing.
//
// The ioThrottler slot is held only around the pull request itself. It is
// released before any pause or checkpoint commit, so a shard that is idle or
// failing does not keep the throttler occupied while it sleeps. The deferred
// release still frees the slot if the request panics.
func (c *ShardConsumerWorker) fetchLogs(cursor string) (shouldCallProcess bool, logGroupList *sls.LogGroupList, plm *sls.PullLogMeta) {
	c.ioThrottler.Acquire()
	throttled := true
	releaseThrottler := func() {
		if throttled {
			throttled = false
			c.ioThrottler.Release()
		}
	}
	defer releaseThrottler()

	start := time.Now()
	logGroupList, plm, err := c.client.pullLogs(c.shardId, cursor)
	c.monitor.RecordFetchRequest(plm, err, start)
	releaseThrottler()

	if err != nil {
		time.Sleep(fetchFailedSleepTime)
		return false, nil, nil
	}
	c.consumerCheckPointTracker.setCurrentCursor(cursor)
	c.consumerCheckPointTracker.setNextCursor(plm.NextCursor)
	if cursor == plm.NextCursor { // already reached the end of the shard
		c.saveCheckPointIfNeeded()
		time.Sleep(noProgressSleepTime)
		return false, nil, nil
	}
	return true, logGroupList, plm
}
// callProcess hands a fetched batch to the user processor and returns the
// cursor to fetch from next. Every attempt is recorded in the monitor and
// followed by an auto-commit check. Outcomes:
//   - a non-empty rollback checkpoint from the processor wins and is returned,
//     even if the processor also returned an error;
//   - on success, the batch's next cursor is returned;
//   - on error, it retries after processFailedSleepTime. The exception is a
//     pending shutdown, in which case it gives up and returns the next cursor.
func (c *ShardConsumerWorker) callProcess(logGroupList *sls.LogGroupList, plm *sls.PullLogMeta) (nextCursor string) {
	for {
		began := time.Now()
		rollback, procErr := c.processInternal(logGroupList)
		c.monitor.RecordProcess(procErr, began)
		c.saveCheckPointIfNeeded()

		if procErr != nil {
			level.Error(c.logger).Log("msg", "process func returns an error", "err", procErr)
		}

		switch {
		case rollback != "":
			level.Warn(c.logger).Log("msg", "Rollback checkpoint by user",
				"rollBackCheckpoint", rollback)
			return rollback
		case procErr == nil:
			return plm.NextCursor
		case c.shutDownFlag.Load():
			level.Warn(c.logger).Log("msg", "shutting down and last process failed, just quit")
			return plm.NextCursor
		}

		time.Sleep(processFailedSleepTime)
	}
}
// processInternal runs the user's Process on one batch and turns a panic
// inside it into an ordinary error. On panic it logs the panic value with the
// goroutine stack (up to 64 KiB), and returns an empty rollback checkpoint
// plus an error wrapping the panic value.
func (c *ShardConsumerWorker) processInternal(logGroup *sls.LogGroupList) (rollBackCheckpoint string, err error) {
	defer func() {
		// recover() must be called directly by this deferred function. Calling
		// it through a helper such as recoverIfPanic would make it return nil,
		// and the panic would escape.
		r := recover()
		if r == nil {
			return
		}
		stackBuf := make([]byte, 1<<16)
		n := runtime.Stack(stackBuf, false)
		level.Error(c.logger).Log("msg", "get panic in shard consumer worker",
			"reason", "panic in your process function",
			"error", r, "stack", stackBuf[:n])
		err = fmt.Errorf("panic when process: %v", r)
	}()
	return c.processor.Process(c.shardId, logGroup, c.consumerCheckPointTracker)
}
// doShutDown is the run loop's final step. It proceeds in three steps:
//   - calls the user processor's Shutdown;
//   - flushes the checkpoint tracker;
//   - marks the worker as stopped (see isStopped).
//
// Each of the first two steps is retried after a short pause for as long as
// it keeps failing. Because those retries are unbounded, doShutDown only
// returns once both have succeeded.
func (c *ShardConsumerWorker) doShutDown() {
	level.Info(c.logger).Log("msg", "begin to shutdown, invoking processor.shutdown")

	// todo: should we catch panic here?
	for err := c.processor.Shutdown(c.consumerCheckPointTracker); err != nil; err = c.processor.Shutdown(c.consumerCheckPointTracker) {
		level.Error(c.logger).Log("msg", "processor.shutdown finished with error", "err", err)
		time.Sleep(shutdownFailedSleepTime)
	}
	level.Info(c.logger).Log("msg", "call processor.shutdown succeed, begin to flush checkpoint")

	for err := c.consumerCheckPointTracker.flushCheckPoint(); err != nil; err = c.consumerCheckPointTracker.flushCheckPoint() {
		level.Error(c.logger).Log("msg", "failed to flush checkpoint when shutting down", "err", err)
		time.Sleep(flushCheckPointFailedSleepTime)
	}
	level.Info(c.logger).Log("msg", "shutting down completed, bye")

	c.stopped.Store(true)
}
// sleepUtilNextFetch paces the run loop between two fetches. The pause depends
// on how much time has passed since lastFetchSuccessTime and on how much the
// last fetch returned. Note that the run loop passes the time taken just
// before the fetch, not the time it succeeded.
//
// Rules, in order:
//   - if more than DataFetchIntervalInMs has already passed, it returns at once;
//   - if the batch reached MaxFetchLogGroupCount groups or 4 MiB, it returns
//     at once;
//   - with < 100 groups and < 1 MiB, it sleeps until ~500ms have passed;
//   - with < 500 groups and < 2 MiB, it sleeps until ~200ms have passed;
//   - otherwise it sleeps until ~50ms have passed.
//
// When a Query is configured, the pre-query size and count are used.
//
// TODO: refine the sleep times and make them more reasonable.
func (c *ShardConsumerWorker) sleepUtilNextFetch(lastFetchSuccessTime time.Time, plm *sls.PullLogMeta) {
	elapsed := time.Since(lastFetchSuccessTime)
	if elapsed > time.Duration(c.client.option.DataFetchIntervalInMs)*time.Millisecond {
		return
	}

	rawSize, groupCount := plm.RawSize, plm.Count
	if c.client.option.Query != "" {
		rawSize, groupCount = plm.RawSizeBeforeQuery, plm.DataCountBeforeQuery
	}

	var target time.Duration
	switch {
	case groupCount >= c.client.option.MaxFetchLogGroupCount || rawSize >= 4*1024*1024:
		return
	case groupCount < 100 && rawSize < 1024*1024:
		target = 500 * time.Millisecond
	case groupCount < 500 && rawSize < 2*1024*1024:
		target = 200 * time.Millisecond
	default:
		target = 50 * time.Millisecond
	}
	// time.Sleep returns immediately for zero or negative durations, so an
	// already-elapsed target needs no special case.
	time.Sleep(target - elapsed)
}
// saveCheckPointIfNeeded auto-commits the checkpoint. It does so at most once
// per AutoCommitIntervalInMS, and never when AutoCommitDisabled is set.
//
// The commit is best effort. A failed flush is logged, and the next attempt
// waits for a full interval. This avoids retrying a failing flush on every
// loop iteration.
func (c *ShardConsumerWorker) saveCheckPointIfNeeded() {
	if c.client.option.AutoCommitDisabled {
		return
	}
	interval := time.Millisecond * time.Duration(c.client.option.AutoCommitIntervalInMS)
	if time.Since(c.lastCheckpointSaveTime) <= interval {
		return
	}
	if err := c.consumerCheckPointTracker.flushCheckPoint(); err != nil {
		level.Warn(c.logger).Log("msg", "failed to auto commit checkpoint", "err", err)
	}
	c.lastCheckpointSaveTime = time.Now()
}
// shutdown asks the worker to stop. It only raises the shutdown flag and does
// not wait. The run loop (if started) notices the flag on its next check and
// then runs doShutDown. Use isStopped to learn when that has finished.
func (c *ShardConsumerWorker) shutdown() {
	infoLog := level.Info(c.logger)
	infoLog.Log("msg", "shutting down by others")
	c.shutDownFlag.Store(true)
}
// isStopped reports whether doShutDown has run to completion. At that point
// the processor's Shutdown and the final checkpoint flush have both succeeded.
func (c *ShardConsumerWorker) isStopped() bool {
	done := c.stopped.Load()
	return done
}
// recoverIfPanic stops an in-flight panic. It logs the panic value together
// with reason and the current goroutine's stack (up to 64 KiB), and returns
// the recovered value. It returns nil when nothing was recovered.
//
// recover only takes effect when a deferred function calls it directly, so
// this method must itself be the deferred call:
//
//	defer c.recoverIfPanic("reason")              // recovers
//	defer func() { c.recoverIfPanic("reason") }() // does NOT recover
func (c *ShardConsumerWorker) recoverIfPanic(reason string) any {
	r := recover()
	if r == nil {
		return nil
	}
	stack := make([]byte, 1<<16)
	stack = stack[:runtime.Stack(stack, false)]
	level.Error(c.logger).Log("msg", "get panic in shard consumer worker",
		"reason", reason,
		"error", r, "stack", stack)
	return r
}
// shouldReportMetrics reports whether runtime metrics should be emitted now.
// It is always false when DisableRuntimeMetrics is set, and in that case the
// monitor is not consulted. Otherwise the shard monitor's shouldReport
// decides.
func (c *ShardConsumerWorker) shouldReportMetrics() bool {
	if c.client.option.DisableRuntimeMetrics {
		return false
	}
	return c.monitor.shouldReport()
}
// reportMetrics delegates to the shard monitor's reportByLogger. It passes the
// worker's logger, which is tagged with the shard id.
func (c *ShardConsumerWorker) reportMetrics() {
	shardLogger := c.logger
	c.monitor.reportByLogger(shardLogger)
}