func()

in router/core/cache_warmup.go [103:199]


// run loads the warmup items from w.source and processes them concurrently
// with w.workers goroutines. The whole run, including loading the items, is
// bounded by w.timeout. When w.itemsPerSecond is greater than zero, workers
// are throttled through ratelimit.New(w.itemsPerSecond); otherwise an
// unlimited limiter is used.
//
// It returns the number of items whose processing has finished. An item counts
// as finished whether ProcessOperation succeeded or failed; failures are
// logged and skipped. If the context is cancelled or times out before all
// items are finished, run returns the number finished so far together with
// ctx.Err(). An error from LoadItems is returned unchanged with a count of 0.
func (w *cacheWarmup) run(ctx context.Context) (int, error) {
	ctx, cancel := context.WithTimeout(ctx, w.timeout)
	defer cancel()

	items, err := w.source.LoadItems(ctx, w.log)
	if err != nil {
		return 0, err
	}

	if len(items) == 0 {
		w.log.Debug("No items to process")
		return 0, nil
	}

	w.log.Info("Starting processing",
		zap.Int("items", len(items)),
	)

	defaultClientInfo := &nodev1.ClientInfo{}

	done := ctx.Done()
	// Buffered to len(items) so that the whole work queue can be filled up
	// front without blocking, before any worker is started.
	index := make(chan int, len(items))
	defer close(index)
	itemCompleted := make(chan struct{})

	for i, item := range items {
		// Fall back to an empty ClientInfo so that item.Client can be
		// dereferenced when logging a failure below.
		// NOTE(review): this relies on items holding pointers, so that the
		// assignment is visible through items[idx] - confirm.
		if item.Client == nil {
			item.Client = defaultClientInfo
		}
		index <- i
	}

	var (
		rl ratelimit.Limiter
	)

	if w.itemsPerSecond > 0 {
		rl = ratelimit.New(w.itemsPerSecond)
	} else {
		rl = ratelimit.NewUnlimited()
	}

	for i := 0; i < w.workers; i++ {
		go func() {
			for {
				select {
				case <-done:
					return
				case idx, ok := <-index:
					if !ok {
						return
					}
					rl.Take()
					item := items[idx]

					res, err := w.processor.ProcessOperation(ctx, item)
					if err != nil {
						w.log.Warn("Failed to process operation, skipping",
							zap.Error(err),
							zap.String("client_name", item.Client.Name),
							zap.String("client_version", item.Client.Version),
							zap.String("query", item.Request.Query),
							zap.String("operation_name", item.Request.OperationName),
						)
					}

					if err == nil && w.afterOperation != nil {
						w.afterOperation(res)
					}

					// itemCompleted is unbuffered; also watch done so the
					// worker does not block forever once run has returned.
					select {
					case <-done:
						return
					case itemCompleted <- struct{}{}:
					}
				}
			}
		}()
	}

	// Count an item only after its completion signal has been received, so
	// that the value returned on cancellation reflects the items that actually
	// finished rather than the one still being waited for.
	for completed := 0; completed < len(items); {
		select {
		case <-done:
			return completed, ctx.Err()
		case <-itemCompleted:
			completed++
			if completed%100 == 0 {
				w.log.Info("Processing completed",
					zap.Int("processed_items", completed),
				)
			}
		}
	}

	return len(items), nil
}