in internal/client/integrations/integrations.go [1174:1245]
// ImportFlow imports the version files of the integration called name that
// are found under folder.
//
// The folder tree is walked and a file is selected when its base name is
// exactly "<name>+<digits>+<8-4-4-4-12 alphanumeric id>.json". Errors met while
// walking are logged as a warning and otherwise ignored, so a missing folder
// results in zero selected files rather than a failure.
//
// The selected paths are handed to batchImport workers over a channel;
// numConnections is the number of workers started, and values below 1 are
// treated as 1 so that the work is never left unprocessed.
//
// HTTP response printing is switched off on apiclient while the import runs
// and is set back to the command-level setting on return.
//
// The returned error is nil when no worker reports a failure; otherwise it
// carries every reported error message, one per line.
func ImportFlow(name string, folder string, numConnections int) (err error) {
	// Quote the name so that regexp metacharacters in it are matched literally
	// (and cannot make MustCompile panic), and anchor the pattern so that files
	// of another integration whose name merely ends with this one are skipped.
	rIntegrationFlowFiles := regexp.MustCompile(`^` + regexp.QuoteMeta(name) +
		`\+[0-9]+\+[a-zA-Z0-9]{8}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{12}\.json$`)

	var versions []string
	err = filepath.Walk(folder, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			// Best effort: report the problem and keep walking.
			clilog.Warning.Println("integration folder not found")
			return nil
		}
		if info.IsDir() || filepath.Ext(path) != ".json" {
			return nil
		}
		if rIntegrationFlowFiles.MatchString(filepath.Base(path)) {
			versions = append(versions, path)
		}
		return nil
	})
	if err != nil {
		return err
	}

	// Without at least one worker nothing would drain workChan: the import
	// would either block forever or return success without importing anything.
	if numConnections < 1 {
		numConnections = 1
	}

	numEntities := len(versions)
	clilog.Info.Printf("Found %d versions for integration %s in the folder\n", numEntities, name)
	clilog.Debug.Printf("Importing versions with %d connections\n", numConnections)

	apiclient.ClientPrintHttpResponse.Set(false)
	defer apiclient.ClientPrintHttpResponse.Set(apiclient.GetCmdPrintHttpResponseSetting())

	errChan := make(chan error)
	// Exactly one batch is ever sent, so a buffer of one keeps the send below
	// from blocking regardless of how many files were found.
	workChan := make(chan []string, 1)

	var fanOutWg, fanInWg sync.WaitGroup
	var errs []string

	// Fan-in: gather worker errors until errChan is closed. errs is only read
	// after fanInWg.Wait(), so no extra locking is needed.
	fanInWg.Add(1)
	go func() {
		defer fanInWg.Done()
		for newErr := range errChan {
			errs = append(errs, newErr.Error())
		}
	}()

	// Fan-out: start the import workers.
	for i := 0; i < numConnections; i++ {
		fanOutWg.Add(1)
		go batchImport(&fanOutWg, name, workChan, errChan)
	}

	// NOTE(review): all paths are sent as a single batch, so only one receive
	// on workChan will obtain them — confirm whether the work was meant to be
	// split across the workers.
	workChan <- versions
	close(workChan)

	// Close errChan only after every worker has finished so that no worker can
	// send on a closed channel.
	fanOutWg.Wait()
	close(errChan)
	fanInWg.Wait()

	if len(errs) > 0 {
		return errors.New(strings.Join(errs, "\n"))
	}
	return nil
}