func loadCiliumData()

in contrib/cmd/runkperf/commands/bench/cilium_cr_list.go [131:217]


func loadCiliumData(ctx context.Context, kr *utils.KubectlRunner, numCID int, numCEP int) error {
	totalNumResources := numCID + numCEP
	log.GetLogger(ctx).WithKeyValues("level", "info").LogKV("msg", "Loading Cilium data",
		"CiliumIdentities", numCID, "CiliumEndpoints", numCEP)

	// Parallelize kubectl apply to speed it up. Achieves ~80 inserts/sec.
	taskChan := make(chan string, numCRApplyWorkers*2)
	var appliedCount atomic.Uint64
	g, ctx := errgroup.WithContext(ctx)
	for i := 0; i < numCRApplyWorkers; i++ {
		g.Go(func() error {
			for {
				select {
				case <-ctx.Done():
					return ctx.Err()
				case ciliumResourceData, ok := <-taskChan:
					if !ok {
						return nil // taskChan closed
					}
					var err error
					for i := 0; i < maxNumCRApplyAttempts; i++ {
						err = kr.ServerSideApplyWithData(ctx, kubectlApplyTimeout, ciliumResourceData)
						if err == nil {
							appliedCount.Add(1)
							break
						} else if i < maxNumCRApplyAttempts-1 {
							log.GetLogger(ctx).WithKeyValues("level", "warn").LogKV("msg", "failed to apply cilium resource data, will retry", "error", err)
						}
					}
					if err != nil { // last retry failed, so give up.
						return fmt.Errorf("failed to apply cilium resource data: %w", err)
					}
				}
			}
		})
	}

	// Report progress periodically.
	reporterDoneChan := make(chan struct{})
	g.Go(func() error {
		timer := time.NewTicker(progressReportInterval)
		defer timer.Stop()
		for {
			select {
			case <-reporterDoneChan:
				return nil
			case <-ctx.Done():
				return ctx.Err()
			case <-timer.C:
				c := appliedCount.Load()
				percent := int(float64(c) / float64(totalNumResources) * 100)
				log.GetLogger(ctx).WithKeyValues("level", "info").LogKV("msg", fmt.Sprintf("applied %d/%d cilium resources (%d%%)", c, totalNumResources, percent))
			}
		}
	})

	// Generate CiliumIdentity and CiliumEndpoint CRs to be applied by the worker goroutines.
	g.Go(func() error {
		for i := 0; i < numCID; i++ {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case taskChan <- generateCiliumIdentity():
			}
		}

		for i := 0; i < numCEP; i++ {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case taskChan <- generateCiliumEndpoint():
			}
		}

		close(taskChan) // signal to consumer goroutines that we're done.
		close(reporterDoneChan)
		return nil
	})

	if err := g.Wait(); err != nil {
		return err
	}

	log.GetLogger(ctx).WithKeyValues("level", "info").LogKV("msg", fmt.Sprintf("loaded %d CiliumIdentities and %d CiliumEndpoints\n", numCID, numCEP))

	return nil
}