in internal/meta/base_meta.go [363:432]
// ParallelImport imports the given items using meta.parallelism concurrent
// workers and returns the error reported by the worker pool, if any.
//
// All items are queued on a buffered channel that is closed up front, so each
// worker simply drains it until it is empty. Worker i passes its index to
// importItem and, once the channel is drained, returns that index as its task
// result. The result callback registered via wp.Run then merges the
// "terraform.tfstate" file found under meta.importBaseDirs[i] (if present) into
// meta.baseState and removes that file. The merge step is skipped entirely
// when meta.tfclient is set.
//
// The pre/post import hooks, when configured, receive a config.ImportItem
// snapshot of the item together with the time the import of that item started.
func (meta *baseMeta) ParallelImport(ctx context.Context, items []*ImportItem) error {
	meta.tc.Trace(telemetry.Info, "ParallelImport Enter")
	defer meta.tc.Trace(telemetry.Info, "ParallelImport Leave")

	// Queue every item before any worker starts; closing the channel lets the
	// workers' range loops terminate once all items have been taken.
	itemsCh := make(chan *ImportItem, len(items))
	for _, item := range items {
		itemsCh <- item
	}
	close(itemsCh)

	wp := workerpool.NewWorkPool(meta.parallelism)

	// Result callback: receives the worker index returned by each task below.
	// NOTE(review): meta.baseState is read and written here without locking;
	// this presumes the pool invokes the callback serially - confirm.
	wp.Run(func(i interface{}) error {
		idx := i.(int)

		// Noop if tfclient is set
		if meta.tfclient != nil {
			return nil
		}

		stateFile := filepath.Join(meta.importBaseDirs[idx], "terraform.tfstate")

		// Don't merge the state file if this import dir doesn't contain one, which can be because either this import dir imported nothing, or it encountered an import error.
		if _, err := os.Stat(stateFile); os.IsNotExist(err) {
			return nil
		}
		// Ensure the state file is removed after this round of import, preparing for the next round.
		// The removal is best effort: its error is intentionally not checked.
		defer os.Remove(stateFile)

		meta.Logger().Debug("Merging terraform state file (tfmerge)", "file", stateFile)
		newState, err := tfmerge.Merge(ctx, meta.tf, meta.baseState, stateFile)
		if err != nil {
			// Wrap with %w so callers can inspect the underlying cause.
			return fmt.Errorf("failed to merge state file: %w", err)
		}
		meta.baseState = newState
		return nil
	})

	for i := 0; i < meta.parallelism; i++ {
		// Per-iteration copy so that each task closure captures its own index.
		i := i
		wp.AddTask(func() (interface{}, error) {
			for item := range itemsCh {
				// Value snapshot of the item handed to the hooks.
				iitem := config.ImportItem{
					AzureResourceID: item.AzureResourceID,
					TFResourceId:    item.TFResourceId,
					ImportError:     item.ImportError,
					TFAddr:          item.TFAddr,
				}
				startTime := time.Now()
				if meta.preImportHook != nil {
					meta.preImportHook(startTime, iitem)
				}
				meta.importItem(ctx, item, i)
				if meta.postImportHook != nil {
					// The snapshot was taken before the import ran. Refresh the
					// error field so the post hook sees the item's current
					// ImportError rather than the pre-import value.
					// NOTE(review): presumes importItem records failures on
					// item.ImportError - confirm. If it does not, this
					// assignment is a no-op.
					iitem.ImportError = item.ImportError
					meta.postImportHook(startTime, iitem)
				}
			}
			// The worker index is the task result consumed by the callback above.
			return i, nil
		})
	}

	// #nosec G104
	if err := wp.Done(); err != nil {
		return err
	}

	return nil
}