func()

in internal/meta/base_meta.go [363:432]


func (meta *baseMeta) ParallelImport(ctx context.Context, items []*ImportItem) error {
	meta.tc.Trace(telemetry.Info, "ParallelImport Enter")
	defer meta.tc.Trace(telemetry.Info, "ParallelImport Leave")

	total := len(items)
	itemsCh := make(chan *ImportItem, total)
	for _, item := range items {
		itemsCh <- item
	}
	close(itemsCh)

	wp := workerpool.NewWorkPool(meta.parallelism)

	wp.Run(func(i interface{}) error {
		idx := i.(int)

		// Noop if tfclient is set
		if meta.tfclient != nil {
			return nil
		}

		stateFile := filepath.Join(meta.importBaseDirs[idx], "terraform.tfstate")

		// Don't merge state file if this import dir doesn't contain state file, which can because either this import dir imported nothing, or it encountered import error
		if _, err := os.Stat(stateFile); os.IsNotExist(err) {
			return nil
		}
		// Ensure the state file is removed after this round import, preparing for the next round.
		defer os.Remove(stateFile)

		meta.Logger().Debug("Merging terraform state file (tfmerge)", "file", stateFile)
		newState, err := tfmerge.Merge(ctx, meta.tf, meta.baseState, stateFile)
		if err != nil {
			return fmt.Errorf("failed to merge state file: %v", err)
		}
		meta.baseState = newState

		return nil
	})

	for i := 0; i < meta.parallelism; i++ {
		i := i
		wp.AddTask(func() (interface{}, error) {
			for item := range itemsCh {
				iitem := config.ImportItem{
					AzureResourceID: item.AzureResourceID,
					TFResourceId:    item.TFResourceId,
					ImportError:     item.ImportError,
					TFAddr:          item.TFAddr,
				}
				startTime := time.Now()
				if meta.preImportHook != nil {
					meta.preImportHook(startTime, iitem)
				}
				meta.importItem(ctx, item, i)
				if meta.postImportHook != nil {
					meta.postImportHook(startTime, iitem)
				}
			}
			return i, nil
		})
	}

	// #nosec G104
	if err := wp.Done(); err != nil {
		return err
	}

	return nil
}