in pkg/task/service_async.go [112:173]
func (lat *AsyncTaskService) daemon(ctx context.Context) {
util.SafeCloseChan(lat.ready)
ticker := time.NewTicker(removeExecutorInterval)
max := 0
timer := time.NewTimer(executeInterval)
defer timer.Stop()
for {
select {
case <-ctx.Done():
log.Debug("daemon thread exited for AsyncTaskService stopped")
return
case <-timer.C:
lat.lock.RLock()
l := len(lat.executors)
slice := make([]*executorWithTTL, 0, l)
for _, s := range lat.executors {
slice = append(slice, s)
}
lat.lock.RUnlock()
for _, s := range slice {
s.Execute() // non-blocked
}
timer.Reset(executeInterval)
case <-ticker.C:
timeutil.ResetTimer(timer, executeInterval)
lat.lock.RLock()
l := len(lat.executors)
if l > max {
max = l
}
removes := make([]string, 0, l)
for key, se := range lat.executors {
if atomic.AddInt64(&se.TTL, -1) == 0 {
removes = append(removes, key)
}
}
lat.lock.RUnlock()
if len(removes) == 0 {
continue
}
lat.lock.Lock()
for _, key := range removes {
lat.removeExecutor(key)
}
l = len(lat.executors)
if max > initExecutorCount && max > l*compactTimes {
lat.renew()
max = l
}
lat.lock.Unlock()
log.Debug(fmt.Sprintf("daemon thread completed, %d executor(s) removed", len(removes)))
}
}
}