in internal/task_request/task_request.go [77:113]
// Start runs the task-request polling loop and blocks until ctx is done, at
// which point it returns ctx.Err().
//
// The loop wakes on every tick of a ticker created with tickerInterval. Once
// at least the current pull interval (initially defaultTaskRequestInterval)
// has elapsed since the previous attempt, it calls h.SendRequest and, on
// success, passes the response body and s to task_request_response.Process.
// A non-zero Interval in the result becomes the new pull interval. When the
// result sets StopIndexing, the context handed to earlier requests is
// cancelled and replaced by a fresh child of ctx. A failed request is only
// logged; the attempt time is still updated, so the next attempt waits a full
// pull interval.
func (h *taskRequestTimer) Start(ctx context.Context, s *server.IndexServer) error {
	tick := time.NewTicker(tickerInterval)
	defer tick.Stop()

	lastAttempt := time.Now()
	interval := defaultTaskRequestInterval
	slog.Info("starting task request timer", "interval", interval.Seconds())

	runCtx, cancelRun := context.WithCancel(ctx)
	// Deferred through a closure so that whichever cancel func is current at
	// return time gets called; cancelRun is reassigned inside the loop.
	defer func() { cancelRun() }()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err() //nolint:govet // It is a false positive
		case <-tick.C:
			// Fall through to the due-check below.
		}

		elapsed := time.Since(lastAttempt)
		if elapsed < interval {
			// Not due yet; wait for the next tick.
			continue
		}

		payload, err := h.SendRequest(runCtx) // blocking
		if err == nil {
			outcome := task_request_response.Process(runCtx, payload, s)
			if outcome.Interval != 0 {
				interval = outcome.Interval
			}
			if outcome.StopIndexing {
				// Stop all currently running indexing, then build a new
				// context for all future indexing.
				cancelRun()
				runCtx, cancelRun = context.WithCancel(ctx) //nolint:govet // It is a false positive
			}
		} else {
			slog.Error("error while sending task request", "err", err)
		}

		// Recorded after the (blocking) request so the interval is measured
		// from the end of one attempt to the start of the next.
		lastAttempt = time.Now()
		slog.Info("ticker TaskRequest information", "delta", elapsed.Seconds(), "lastSentRequestAt", lastAttempt, "interval", interval.Seconds())
	}
}