in consumer/shard_worker.go [187:214]
// sleepUtilNextFetch blocks the calling goroutine for a short, adaptive pause
// before the next fetch. The pause shrinks as the volume reported by plm grows:
//
//   - fewer than 100 groups and under 1 MiB:  500ms
//   - fewer than 500 groups and under 2 MiB:  200ms
//   - anything larger (but below the caps):    50ms
//
// The time already elapsed since lastFetchSuccessTime is subtracted from the
// chosen pause. No sleep happens at all when that elapsed time already exceeds
// the configured DataFetchIntervalInMs, or when the last fetch reached
// MaxFetchLogGroupCount groups or 4 MiB of raw data.
//
// When a Query is configured, the pre-query counters of plm are used instead
// of the post-query ones.
func (c *ShardConsumerWorker) sleepUtilNextFetch(lastFetchSuccessTime time.Time, plm *sls.PullLogMeta) {
	const mib = 1024 * 1024

	elapsed := time.Since(lastFetchSuccessTime)
	fetchInterval := time.Duration(c.client.option.DataFetchIntervalInMs) * time.Millisecond
	if elapsed > fetchInterval {
		return
	}

	rawSize, groupCount := plm.RawSize, plm.Count
	if c.client.option.Query != "" {
		rawSize, groupCount = plm.RawSizeBeforeQuery, plm.DataCountBeforeQuery
	}

	var pause time.Duration
	switch {
	case groupCount >= c.client.option.MaxFetchLogGroupCount || rawSize >= 4*mib:
		// The last fetch hit a cap, so go straight to the next one.
		return
	case groupCount < 100 && rawSize < mib:
		pause = 500 * time.Millisecond
	case groupCount < 500 && rawSize < 2*mib:
		pause = 200 * time.Millisecond
	default:
		pause = 50 * time.Millisecond
	}

	// A zero or negative remainder is fine: time.Sleep returns immediately.
	time.Sleep(pause - elapsed)
}