in router/core/cache_warmup.go [103:199]
func (w *cacheWarmup) run(ctx context.Context) (int, error) {
ctx, cancel := context.WithTimeout(ctx, w.timeout)
defer cancel()
items, err := w.source.LoadItems(ctx, w.log)
if err != nil {
return 0, err
}
if len(items) == 0 {
w.log.Debug("No items to process")
return 0, nil
}
w.log.Info("Starting processing",
zap.Int("items", len(items)),
)
defaultClientInfo := &nodev1.ClientInfo{}
done := ctx.Done()
index := make(chan int, len(items))
defer close(index)
itemCompleted := make(chan struct{})
for i, item := range items {
if item.Client == nil {
item.Client = defaultClientInfo
}
index <- i
}
var (
rl ratelimit.Limiter
)
if w.itemsPerSecond > 0 {
rl = ratelimit.New(w.itemsPerSecond)
} else {
rl = ratelimit.NewUnlimited()
}
for i := 0; i < w.workers; i++ {
go func(i int) {
for {
select {
case <-done:
return
case idx, ok := <-index:
if !ok {
return
}
rl.Take()
item := items[idx]
res, err := w.processor.ProcessOperation(ctx, item)
if err != nil {
w.log.Warn("Failed to process operation, skipping",
zap.Error(err),
zap.String("client_name", item.Client.Name),
zap.String("client_version", item.Client.Version),
zap.String("query", item.Request.Query),
zap.String("operation_name", item.Request.OperationName),
)
}
if err == nil && w.afterOperation != nil {
w.afterOperation(res)
}
select {
case <-done:
return
case itemCompleted <- struct{}{}:
}
}
}
}(i)
}
for i := 0; i < len(items); i++ {
processed := i + 1
select {
case <-done:
return processed, ctx.Err()
case <-itemCompleted:
if processed%100 == 0 {
w.log.Info("Processing completed",
zap.Int("processed_items", processed),
)
}
}
}
return len(items), nil
}