in experiments/kompanion/cmd/export/export.go [243:378]
// RunExport dumps cluster objects into a timestamped .tar.gz report in the
// current working directory.
//
// It validates the flags on opts, builds the Kubernetes clients, discovers the
// API resources, lists the namespaces, and then fans the dump work out to
// opts.workerRountines goroutines that write into a temporary report
// directory. Once all workers are done the directory is archived.
//
// Errors returned by individual tasks do not abort the run: they are
// collected, the archive is still written, and they are returned joined into
// a single error. In that case the temporary report directory is left on
// disk; it is only removed when the export completes without task errors.
func RunExport(ctx context.Context, opts *ExportOptions) error {
	log.Printf("Running kompanion export with kubeconfig: %s", opts.kubeconfig)

	if err := opts.validateFlags(); err != nil {
		return err
	}

	config, err := getRESTConfig(ctx, opts)
	if err != nil {
		return fmt.Errorf("error building kubeconfig: %w", err)
	}

	// We rely more on server-side rate limiting now, so give it a high client-side QPS.
	// Only applied when the config left QPS unset (zero).
	if config.QPS == 0 {
		config.QPS = 100
		config.Burst = 20
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return fmt.Errorf("error creating Kubernetes clientset: %w", err)
	}

	dynamicClient, err := dynamic.NewForConfig(config)
	if err != nil {
		return fmt.Errorf("error creating dynamic client: %w", err)
	}

	// Use the discovery client to iterate over all API resources.
	discoveryClient := clientset.Discovery()

	var resources []schema.GroupVersionResource
	resources, err = utils.GetResources(discoveryClient, resources)
	if err != nil {
		return fmt.Errorf("error fetching resources: %w", err)
	}

	// todo acpana debug logs
	// log.Printf("Going to iterate over the following resources %+v", resourcesToName(resources))

	namespaces, err := clientset.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("error fetching namespaces: %w", err)
	}

	// The temporary directory and the final archive share the same timestamped
	// base name and both live in the current working directory.
	reportName := timestampedName(opts.reportNamePrefix)
	reportTempDir := filepath.Join(".", reportName)
	reportFile := filepath.Join(".", reportName+".tar.gz")

	if err := os.Mkdir(reportTempDir, 0o700); err != nil {
		return fmt.Errorf("could not create %q directory: %w", reportTempDir, err)
	}

	// An object is skipped when either its namespace or its name is excluded
	// by the ignore/target lists on opts.
	shouldExcludeObject := func(id types.NamespacedName) bool {
		if shouldExclude(id.Namespace, opts.ignoreNamespaces, opts.targetNamespaces) {
			return true
		}
		return shouldExclude(id.Name, opts.ignoreObjects, opts.targetObjects)
	}

	// Create the work queue for the worker goroutines to use.
	q := &taskQueue{}

	// Parallelize across resources, unless we are scoped to a few namespaces.
	// The thought is that if users target a particular namespace (or a few),
	// they may not have cluster-wide permission.
	perNamespace := len(opts.targetNamespaces) > 0
	if perNamespace {
		// One task per selected namespace, each covering every resource.
		for _, ns := range namespaces.Items {
			if shouldExclude(ns.Name, opts.ignoreNamespaces, opts.targetNamespaces) {
				continue
			}
			q.AddTask(&dumpResourcesTask{
				Namespace:           ns.Name,
				Resources:           resources,
				DynamicClient:       dynamicClient,
				ReportDir:           reportTempDir,
				ShouldExcludeObject: shouldExcludeObject,
			})
		}
	} else {
		// One task per resource, with no namespace set on the task.
		for _, resource := range resources {
			q.AddTask(&dumpResourcesTask{
				Resources:           []schema.GroupVersionResource{resource},
				DynamicClient:       dynamicClient,
				ReportDir:           reportTempDir,
				ShouldExcludeObject: shouldExcludeObject,
			})
		}
	}

	log.Printf("Starting worker threads to process Config Connector objects")

	var (
		wg         sync.WaitGroup
		errs       []error
		errsMutex  sync.Mutex
		numWorkers = opts.workerRountines
	)

	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				task := q.GetWork()
				if task == nil {
					// No work left; this worker is done.
					return
				}
				if err := task.Run(ctx); err != nil {
					// errs is shared between workers, so guard the append.
					errsMutex.Lock()
					errs = append(errs, err)
					errsMutex.Unlock()
				}
			}
		}()
	}

	log.Printf("Dumping Config Connector objects to %s", reportFile)
	wg.Wait()

	// Archive whatever was written, even if some tasks failed.
	if err := tarReport(reportFile, reportTempDir); err != nil {
		return fmt.Errorf("failed to create report file %s: %w", reportFile, err)
	}

	if len(errs) != 0 {
		// Returning here skips the cleanup below, so the temporary directory
		// stays on disk alongside the archive.
		return fmt.Errorf("there have been errors in the export process: %w", errors.Join(errs...))
	}

	// Clean up the temporary files.
	if err := os.RemoveAll(reportTempDir); err != nil {
		return fmt.Errorf("failed to clean up report folder %q: %w", reportTempDir, err)
	}

	fmt.Fprintf(os.Stderr, "created report file: %v\n", reportFile)
	return nil
}