func ImportFlow()

in internal/client/integrations/integrations.go [1174:1245]


// ImportFlow imports the exported versions of the integration called name
// that are found under folder.
//
// folder is walked recursively. A file is treated as a version of the
// integration when its base name is exactly
// "<name>+<digits>+<8-4-4-4-12 alphanumeric id>.json". All matching paths are
// handed to the batchImport workers as one batch on a work channel; errors the
// workers send back are collected and returned as a single error whose message
// joins the individual messages with newlines. nil is returned when no worker
// reported an error.
//
// numConnections is the number of batchImport workers to start. Values below
// one are treated as one so that the batch is always consumed.
//
// Printing of HTTP responses is turned off while the import runs and set back
// to the command-level setting when the function returns.
func ImportFlow(name string, folder string, numConnections int) (err error) {
	var versions []string

	// Quote name so that regex metacharacters in it are matched literally (and
	// can never make MustCompile panic), and anchor the pattern so that files
	// of another integration whose name merely ends with name are not matched.
	rIntegrationFlowFiles := regexp.MustCompile(`^` + regexp.QuoteMeta(name) +
		`\+[0-9]+\+[a-zA-Z0-9]{8}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{12}\.json$`)

	err = filepath.Walk(folder, func(path string, info os.FileInfo, walkErr error) error {
		if walkErr != nil {
			// Best effort: an unreadable entry (including a missing root
			// folder) is reported as a warning and skipped, not treated as
			// fatal. info may be nil here, so it must not be touched.
			clilog.Warning.Println("integration folder not found")
			return nil
		}
		if info.IsDir() {
			return nil
		}
		if filepath.Ext(path) != ".json" {
			return nil
		}
		if rIntegrationFlowFiles.MatchString(filepath.Base(path)) {
			versions = append(versions, path)
		}
		return nil
	})
	if err != nil {
		return err
	}

	numEntities := len(versions)
	clilog.Info.Printf("Found %d versions for integration %s in the folder\n", numEntities, name)
	clilog.Debug.Printf("Importing versions with %d connections\n", numConnections)

	// Without at least one worker nothing would ever receive the batch: the
	// import would silently do nothing, or block forever on the send below.
	if numConnections < 1 {
		numConnections = 1
	}

	apiclient.ClientPrintHttpResponse.Set(false)
	defer apiclient.ClientPrintHttpResponse.Set(apiclient.GetCmdPrintHttpResponseSetting())

	errChan := make(chan error)
	// Exactly one batch is ever sent, so a buffer of one lets the send
	// complete without waiting for a worker to be scheduled.
	workChan := make(chan []string, 1)

	fanOutWg := sync.WaitGroup{}
	fanInWg := sync.WaitGroup{}

	var errs []string
	fanInWg.Add(1)

	// Fan-in: gather worker errors until errChan is closed.
	go func() {
		defer fanInWg.Done()
		for newErr := range errChan {
			errs = append(errs, newErr.Error())
		}
	}()

	// Fan-out: start the workers.
	// NOTE(review): batchImport is not visible here; it is presumed to call
	// Done on fanOutWg when workChan is closed and drained — confirm.
	for i := 0; i < numConnections; i++ {
		fanOutWg.Add(1)
		go batchImport(&fanOutWg, name, workChan, errChan)
	}

	workChan <- versions

	// Shutdown order matters: stop feeding the workers, wait for them to
	// finish, and only then close errChan so no worker sends on a closed
	// channel; finally wait for the collector so errs is complete.
	close(workChan)
	fanOutWg.Wait()
	close(errChan)
	fanInWg.Wait()

	if len(errs) > 0 {
		return errors.New(strings.Join(errs, "\n"))
	}

	return nil
}