func()

in consumer/shard_worker.go [187:214]


// sleepUtilNextFetch throttles the fetch loop: it pauses the calling goroutine
// for a duration chosen from the volume of the previous pull, minus the time
// that has already passed since lastFetchSuccessTime.
//
// No pause happens when more than the configured DataFetchIntervalInMs has
// already elapsed, or when the previous pull reached MaxFetchLogGroupCount
// groups or 4 MiB of raw data. Otherwise the target gap since the last
// successful fetch is:
//
//   - 500ms when the pull had fewer than 100 groups and less than 1 MiB,
//   - 200ms when the pull had fewer than 500 groups and less than 2 MiB,
//   - 50ms in every other case.
//
// When the consumer is configured with a non-empty Query, the group count and
// raw size are taken from the "before query" counters of plm instead of
// plm.Count and plm.RawSize.
func (c *ShardConsumerWorker) sleepUtilNextFetch(lastFetchSuccessTime time.Time, plm *sls.PullLogMeta) {
	// Untyped so it compares against whatever integer type the meta fields use.
	const mib = 1024 * 1024

	elapsed := time.Since(lastFetchSuccessTime)
	fetchInterval := time.Duration(c.client.option.DataFetchIntervalInMs) * time.Millisecond
	if elapsed > fetchInterval {
		return
	}

	groups, rawBytes := plm.Count, plm.RawSize
	if c.client.option.Query != "" {
		groups, rawBytes = plm.DataCountBeforeQuery, plm.RawSizeBeforeQuery
	}

	var targetGap time.Duration
	switch {
	case groups >= c.client.option.MaxFetchLogGroupCount || rawBytes >= 4*mib:
		// The previous pull hit a limit; fetch again right away.
		return
	case groups < 100 && rawBytes < mib:
		targetGap = 500 * time.Millisecond
	case groups < 500 && rawBytes < 2*mib:
		targetGap = 200 * time.Millisecond
	default:
		targetGap = 50 * time.Millisecond
	}

	// The difference may be zero or negative; time.Sleep then returns immediately.
	time.Sleep(targetGap - elapsed)
}