func VerifyAndUpdateCfg()

in streaming/streaming.go [197:287]


func VerifyAndUpdateCfg(streamingCfg *StreamingCfg, dbName string, schemaDetails map[string]internal.SchemaDetails) error {
	dsCfg := streamingCfg.DatastreamCfg
	if dsCfg.StreamLocation == "" {
		return fmt.Errorf("please specify DatastreamCfg.StreamLocation in the streaming config")
	}
	dfCfg := streamingCfg.DataflowCfg
	if dfCfg.Location == "" {
		return fmt.Errorf("please specify the Location under DataflowCfg in the streaming config")
	}

	// If both ID and Display name are empty, generate a new one for both.
	// If either is present, assign it to the other one.
	if dsCfg.StreamId == "" && dsCfg.StreamDisplayName == "" {
		// TODO: Update names to have more info like dbname.
		streamId, err := utils.GenerateName("smt-stream-" + dbName)
		streamId = strings.Replace(streamId, "_", "-", -1)
		if err != nil {
			return fmt.Errorf("error generating stream name: %v", err)
		}
		streamingCfg.DatastreamCfg.StreamId = streamId
		streamingCfg.DatastreamCfg.StreamDisplayName = streamId
	} else if dsCfg.StreamId == "" {
		streamingCfg.DatastreamCfg.StreamId = streamingCfg.DatastreamCfg.StreamDisplayName
	} else if dsCfg.StreamDisplayName == "" {
		streamingCfg.DatastreamCfg.StreamDisplayName = streamingCfg.DatastreamCfg.StreamId
	}

	streamingCfg.DatastreamCfg.SchemaDetails = schemaDetails

	if dsCfg.MaxConcurrentCdcTasks != "" {
		intVal, err := strconv.ParseInt(dsCfg.MaxConcurrentCdcTasks, 10, 64)
		if err != nil {
			return fmt.Errorf("could not parse maxConcurrentCdcTasks parameter %s, please provide a positive integer as input", dsCfg.MaxConcurrentCdcTasks)
		}
		maxCdcTasks = int32(intVal)
		if maxCdcTasks < MIN_DATASTREAM_TASK_LIMIT || maxCdcTasks > MAX_DATASTREAM_TASK_LIMIT {
			return fmt.Errorf("maxConcurrentCdcTasks should lie in the range [%d, %d]", MIN_DATASTREAM_TASK_LIMIT, MAX_DATASTREAM_TASK_LIMIT)
		}
	}
	if dsCfg.MaxConcurrentBackfillTasks != "" {
		intVal, err := strconv.ParseInt(dsCfg.MaxConcurrentBackfillTasks, 10, 64)
		if err != nil {
			return fmt.Errorf("could not parse maxConcurrentBackfillTasks parameter %s, please provide a positive integer as input", dsCfg.MaxConcurrentBackfillTasks)
		}
		maxBackfillTasks = int32(intVal)
		if maxBackfillTasks < MIN_DATASTREAM_TASK_LIMIT || maxBackfillTasks > MAX_DATASTREAM_TASK_LIMIT {
			return fmt.Errorf("maxConcurrentBackfillTasks should lie in the range [%d, %d]", MIN_DATASTREAM_TASK_LIMIT, MAX_DATASTREAM_TASK_LIMIT)
		}
	}

	if dfCfg.JobName == "" {
		// Update names to have more info like dbname.
		jobName, err := utils.GenerateName("smt-dataflow-" + dbName)
		jobName = strings.Replace(jobName, "_", "-", -1)
		if err != nil {
			return fmt.Errorf("error generating stream name: %v", err)
		}
		streamingCfg.DataflowCfg.JobName = jobName
	}

	filePath := streamingCfg.TmpDir
	u, err := utils.ParseGCSFilePath(filePath)
	if err != nil {
		return fmt.Errorf("parseFilePath: unable to parse file path: %v", err)
	}
	// We update the TmpDir in case any '/' were added in ParseGCSFilePath().
	streamingCfg.TmpDir = u.String()
	bucketName := u.Host
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("failed to create GCS client")
	}
	defer client.Close()
	// The Get calls for Google Cloud Storage API have out of box retries.
	// Reference - https://cloud.google.com/storage/docs/retry-strategy#idempotency-operations
	bucket := client.Bucket(bucketName)
	_, err = bucket.Attrs(ctx)
	if err != nil {
		return fmt.Errorf("bucket %s does not exist", bucketName)
	}

	// Verify GCS bucket tuning configs.
	if streamingCfg.GcsCfg.TtlInDaysSet {
		ttl := streamingCfg.GcsCfg.TtlInDays
		if ttl <= 0 {
			return fmt.Errorf("ttlInDays should be a positive integer")
		}
	}
	return nil
}