func()

in pkg/task/service_async.go [112:173]


// daemon is the background loop of AsyncTaskService. It runs until ctx is
// done and multiplexes two periodic jobs:
//
//   - every executeInterval (timer): take a snapshot of the registered
//     executors under the read lock and call Execute on each of them;
//   - every removeExecutorInterval (ticker): decrement the TTL of every
//     executor, remove those whose TTL reached exactly zero, and call
//     lat.renew when the observed peak executor count is larger than both
//     initExecutorCount and compactTimes times the current count.
//
// lat.ready is closed (via util.SafeCloseChan) as soon as the loop starts.
func (lat *AsyncTaskService) daemon(ctx context.Context) {
	util.SafeCloseChan(lat.ready)

	ticker := time.NewTicker(removeExecutorInterval)
	// Release the ticker when the daemon exits; the original code stopped
	// only the timer and left the ticker armed after return.
	defer ticker.Stop()

	timer := time.NewTimer(executeInterval)
	defer timer.Stop()

	// peak is the largest executor count seen since the last renew. It is
	// deliberately not named "max" so the Go 1.21+ builtin is not shadowed.
	peak := 0

	for {
		select {
		case <-ctx.Done():
			log.Debug("daemon thread exited for AsyncTaskService stopped")
			return
		case <-timer.C:
			// Copy the executors out under the read lock so Execute is
			// called without holding lat.lock.
			lat.lock.RLock()
			l := len(lat.executors)
			slice := make([]*executorWithTTL, 0, l)
			for _, s := range lat.executors {
				slice = append(slice, s)
			}
			lat.lock.RUnlock()

			for _, s := range slice {
				s.Execute() // non-blocked (per the original author's note)
			}

			// timer.C was just received from, so a plain Reset is safe here.
			timer.Reset(executeInterval)
		case <-ticker.C:
			// Re-arm the execute timer for a full executeInterval from now.
			// NOTE(review): timeutil.ResetTimer presumably stops/drains the
			// timer before resetting it — confirm against its implementation.
			timeutil.ResetTimer(timer, executeInterval)

			lat.lock.RLock()
			l := len(lat.executors)
			if l > peak {
				peak = l
			}

			// TTL is decremented atomically because only the read lock is
			// held here; keys whose TTL hits exactly zero are collected for
			// removal under the write lock below.
			removes := make([]string, 0, l)
			for key, se := range lat.executors {
				if atomic.AddInt64(&se.TTL, -1) == 0 {
					removes = append(removes, key)
				}
			}
			lat.lock.RUnlock()

			// Nothing expired: skip the write lock (and the compaction check).
			if len(removes) == 0 {
				continue
			}

			lat.lock.Lock()
			for _, key := range removes {
				lat.removeExecutor(key)
			}

			// Call renew once the peak population is large in absolute terms
			// and much larger than what is left after the removals.
			l = len(lat.executors)
			if peak > initExecutorCount && peak > l*compactTimes {
				lat.renew()
				peak = l
			}
			lat.lock.Unlock()

			log.Debug(fmt.Sprintf("daemon thread completed, %d executor(s) removed", len(removes)))
		}
	}
}