in contrib/cmd/runkperf/commands/bench/cilium_cr_list.go [131:217]
func loadCiliumData(ctx context.Context, kr *utils.KubectlRunner, numCID int, numCEP int) error {
totalNumResources := numCID + numCEP
log.GetLogger(ctx).WithKeyValues("level", "info").LogKV("msg", "Loading Cilium data",
"CiliumIdentities", numCID, "CiliumEndpoints", numCEP)
// Parallelize kubectl apply to speed it up. Achieves ~80 inserts/sec.
taskChan := make(chan string, numCRApplyWorkers*2)
var appliedCount atomic.Uint64
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < numCRApplyWorkers; i++ {
g.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case ciliumResourceData, ok := <-taskChan:
if !ok {
return nil // taskChan closed
}
var err error
for i := 0; i < maxNumCRApplyAttempts; i++ {
err = kr.ServerSideApplyWithData(ctx, kubectlApplyTimeout, ciliumResourceData)
if err == nil {
appliedCount.Add(1)
break
} else if i < maxNumCRApplyAttempts-1 {
log.GetLogger(ctx).WithKeyValues("level", "warn").LogKV("msg", "failed to apply cilium resource data, will retry", "error", err)
}
}
if err != nil { // last retry failed, so give up.
return fmt.Errorf("failed to apply cilium resource data: %w", err)
}
}
}
})
}
// Report progress periodically.
reporterDoneChan := make(chan struct{})
g.Go(func() error {
timer := time.NewTicker(progressReportInterval)
defer timer.Stop()
for {
select {
case <-reporterDoneChan:
return nil
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
c := appliedCount.Load()
percent := int(float64(c) / float64(totalNumResources) * 100)
log.GetLogger(ctx).WithKeyValues("level", "info").LogKV("msg", fmt.Sprintf("applied %d/%d cilium resources (%d%%)", c, totalNumResources, percent))
}
}
})
// Generate CiliumIdentity and CiliumEndpoint CRs to be applied by the worker goroutines.
g.Go(func() error {
for i := 0; i < numCID; i++ {
select {
case <-ctx.Done():
return ctx.Err()
case taskChan <- generateCiliumIdentity():
}
}
for i := 0; i < numCEP; i++ {
select {
case <-ctx.Done():
return ctx.Err()
case taskChan <- generateCiliumEndpoint():
}
}
close(taskChan) // signal to consumer goroutines that we're done.
close(reporterDoneChan)
return nil
})
if err := g.Wait(); err != nil {
return err
}
log.GetLogger(ctx).WithKeyValues("level", "info").LogKV("msg", fmt.Sprintf("loaded %d CiliumIdentities and %d CiliumEndpoints\n", numCID, numCEP))
return nil
}