func DbtConverter()

in backend/plugins/dbt/tasks/convertor.go [36:230]


func DbtConverter(taskCtx plugin.SubTaskContext) (err errors.Error) {
	logger := taskCtx.GetLogger()
	taskCtx.SetProgress(0, -1)
	data := taskCtx.GetData().(*DbtTaskData)
	models := data.Options.SelectedModels
	projectPath := data.Options.ProjectPath
	projectName := data.Options.ProjectName
	projectTarget := data.Options.ProjectTarget
	projectVars := data.Options.ProjectVars
	args := data.Options.Args
	failFast := data.Options.FailFast
	threads := data.Options.Threads
	noVersionCheck := data.Options.NoVersionCheck
	excludeModels := data.Options.ExcludeModels
	selector := data.Options.Selector
	state := data.Options.State
	deferFlag := data.Options.Defer
	noDefer := data.Options.NoDefer
	fullRefresh := data.Options.FullRefresh
	profilesPath := data.Options.ProfilesPath
	profile := data.Options.Profile

	defaultProfilesPath := filepath.Join(projectPath, "profiles.yml")
	_, err = errors.Convert01(os.Stat(defaultProfilesPath))
	// if profiles.yml not exist, create it manually
	if err != nil {
		dbUrl := taskCtx.GetConfig("DB_URL")
		u, err := errors.Convert01(url.Parse(dbUrl))
		if err != nil {
			return err
		}
		dbType := u.Scheme
		dbUsername := u.User.Username()
		dbPassword, _ := u.User.Password()
		dbServer, dbPort, _ := net.SplitHostPort(u.Host)
		dbDataBase := u.Path[1:]
		var dbSchema string
		flag := strings.Compare(dbType, "mysql")
		if flag == 0 {
			// mysql database
			dbSchema = dbDataBase
		} else {
			// other database
			mapQuery, err := errors.Convert01(url.ParseQuery(u.RawQuery))
			if err != nil {
				return err
			}
			if value, ok := mapQuery["search_path"]; ok {
				if len(value) < 1 {
					return errors.Default.New("DB_URL search_path parses error")
				}
				dbSchema = value[0]
			} else {
				dbSchema = "public"
			}
		}
		config := viper.New()
		config.Set(projectName+".target", projectTarget)
		config.Set(projectName+".outputs."+projectTarget+".type", dbType)
		dbPortInt, _ := strconv.Atoi(dbPort)
		config.Set(projectName+".outputs."+projectTarget+".port", dbPortInt)
		config.Set(projectName+".outputs."+projectTarget+".password", dbPassword)
		config.Set(projectName+".outputs."+projectTarget+".schema", dbSchema)
		if flag == 0 {
			config.Set(projectName+".outputs."+projectTarget+".server", dbServer)
			config.Set(projectName+".outputs."+projectTarget+".username", dbUsername)
			config.Set(projectName+".outputs."+projectTarget+".database", dbDataBase)
		} else {
			config.Set(projectName+".outputs."+projectTarget+".host", dbServer)
			config.Set(projectName+".outputs."+projectTarget+".user", dbUsername)
			config.Set(projectName+".outputs."+projectTarget+".dbname", dbDataBase)
		}
		err = errors.Convert(config.WriteConfigAs(defaultProfilesPath))
		if err != nil {
			return err
		}
	}
	// if package.yml exist, install dbt dependencies
	defaultPackagesPath := filepath.Join(projectPath, "packages.yml")
	_, err = errors.Convert01(os.Stat(defaultPackagesPath))
	if err == nil {
		cmd := exec.Command("dbt", "deps")
		err = errors.Convert(cmd.Start())
		if err != nil {
			return err
		}
	}

	dbtExecParams := []string{"dbt", "run", "--project-dir", projectPath}
	if projectVars != nil {
		jsonProjectVars, err := json.Marshal(projectVars)
		if err != nil {
			return errors.Default.New("parameters vars json marshal error")
		}
		dbtExecParams = append(dbtExecParams, "--vars")
		dbtExecParams = append(dbtExecParams, string(jsonProjectVars))
	}
	if len(models) > 0 {
		dbtExecParams = append(dbtExecParams, "--select")
		dbtExecParams = append(dbtExecParams, models...)
	}
	if args != nil {
		dbtExecParams = append(dbtExecParams, args...)
	}
	if failFast {
		dbtExecParams = append(dbtExecParams, "--fail-fast")
	}
	if threads != 0 {
		dbtExecParams = append(dbtExecParams, "--threads")
		dbtExecParams = append(dbtExecParams, strconv.Itoa(threads))
	}
	if noVersionCheck {
		dbtExecParams = append(dbtExecParams, "--no-version-check")
	}
	if excludeModels != nil {
		dbtExecParams = append(dbtExecParams, "--exclude")
		dbtExecParams = append(dbtExecParams, excludeModels...)
	}
	if selector != "" {
		dbtExecParams = append(dbtExecParams, "--selector")
		dbtExecParams = append(dbtExecParams, selector)
	}
	if state != "" {
		dbtExecParams = append(dbtExecParams, "--state")
		dbtExecParams = append(dbtExecParams, state)
	}
	if deferFlag {
		dbtExecParams = append(dbtExecParams, "--defer")
	}
	if noDefer {
		dbtExecParams = append(dbtExecParams, "--no-defer")
	}
	if fullRefresh {
		dbtExecParams = append(dbtExecParams, "--full-refresh")
	}
	if profilesPath != "" {
		dbtExecParams = append(dbtExecParams, "--profiles-dir")
		dbtExecParams = append(dbtExecParams, profilesPath)
	} else {
		// default projectPath
		dbtExecParams = append(dbtExecParams, "--profiles-dir")
		dbtExecParams = append(dbtExecParams, projectPath)
	}
	if profile != "" {
		dbtExecParams = append(dbtExecParams, "--profile")
		dbtExecParams = append(dbtExecParams, profile)
	}
	cmd := exec.Command(dbtExecParams[0], dbtExecParams[1:]...)
	logger.Info("dbt run script: ", cmd)

	stdout, stdoutErr := cmd.StdoutPipe()
	if stdoutErr != nil {
		return errors.Convert(stdoutErr)
	}

	if err = errors.Convert(cmd.Start()); err != nil {
		return err
	}

	// prevent zombie process
	var errStr string
	defer func() {
		err = errors.Convert(cmd.Wait())
		if err != nil {
			logger.Error(err, "The DBT project run failed!")
			err = errors.SubtaskErr.New(errStr)
		} else {
			logger.Info("The DBT project run ended.")
		}
	}()

	scanner := bufio.NewScanner(stdout)
	for scanner.Scan() {
		line := scanner.Text()
		logger.Info(line)
		if strings.Contains(line, "Encountered an error") || errStr != "" {
			errStr += line + "\n"
		}
		if strings.Contains(line, "of") && strings.Contains(line, "OK") {
			taskCtx.IncProgress(1)
		}
	}
	if err = errors.Convert(scanner.Err()); err != nil {
		logger.Error(err, "dbt read stdout failed.")
		return err
	}

	// close stdout
	if closeErr := stdout.Close(); closeErr != nil && err == nil {
		logger.Error(closeErr, "dbt close stdout failed.")
		return errors.Convert(closeErr)
	}

	return err
}