in streaming/streaming.go [197:287]
// VerifyAndUpdateCfg validates the streaming configuration and fills in any
// missing generated values: it requires the Datastream and Dataflow locations
// to be set, generates (or mirrors) the stream ID / display name and the
// Dataflow job name when absent, validates the optional Datastream task
// limits, normalizes TmpDir via ParseGCSFilePath, and verifies the target GCS
// bucket exists. streamingCfg is mutated in place. Returns an error describing
// the first validation failure encountered.
func VerifyAndUpdateCfg(streamingCfg *StreamingCfg, dbName string, schemaDetails map[string]internal.SchemaDetails) error {
	dsCfg := streamingCfg.DatastreamCfg
	if dsCfg.StreamLocation == "" {
		return fmt.Errorf("please specify DatastreamCfg.StreamLocation in the streaming config")
	}
	dfCfg := streamingCfg.DataflowCfg
	if dfCfg.Location == "" {
		return fmt.Errorf("please specify the Location under DataflowCfg in the streaming config")
	}
	// If both ID and Display name are empty, generate a new one for both.
	// If either is present, assign it to the other one.
	if dsCfg.StreamId == "" && dsCfg.StreamDisplayName == "" {
		// TODO: Update names to have more info like dbname.
		streamId, err := utils.GenerateName("smt-stream-" + dbName)
		// Check the error before using the generated name (the original used
		// streamId prior to this check).
		if err != nil {
			return fmt.Errorf("error generating stream name: %v", err)
		}
		// Datastream resource IDs do not permit underscores.
		streamId = strings.Replace(streamId, "_", "-", -1)
		streamingCfg.DatastreamCfg.StreamId = streamId
		streamingCfg.DatastreamCfg.StreamDisplayName = streamId
	} else if dsCfg.StreamId == "" {
		streamingCfg.DatastreamCfg.StreamId = streamingCfg.DatastreamCfg.StreamDisplayName
	} else if dsCfg.StreamDisplayName == "" {
		streamingCfg.DatastreamCfg.StreamDisplayName = streamingCfg.DatastreamCfg.StreamId
	}
	streamingCfg.DatastreamCfg.SchemaDetails = schemaDetails
	if dsCfg.MaxConcurrentCdcTasks != "" {
		intVal, err := strconv.ParseInt(dsCfg.MaxConcurrentCdcTasks, 10, 64)
		if err != nil {
			return fmt.Errorf("could not parse maxConcurrentCdcTasks parameter %s, please provide a positive integer as input", dsCfg.MaxConcurrentCdcTasks)
		}
		// Range-check the int64 before narrowing to int32 so out-of-range
		// values cannot wrap around and slip past the limit check; only
		// mutate the package-level setting once validation has passed.
		if intVal < MIN_DATASTREAM_TASK_LIMIT || intVal > MAX_DATASTREAM_TASK_LIMIT {
			return fmt.Errorf("maxConcurrentCdcTasks should lie in the range [%d, %d]", MIN_DATASTREAM_TASK_LIMIT, MAX_DATASTREAM_TASK_LIMIT)
		}
		maxCdcTasks = int32(intVal)
	}
	if dsCfg.MaxConcurrentBackfillTasks != "" {
		intVal, err := strconv.ParseInt(dsCfg.MaxConcurrentBackfillTasks, 10, 64)
		if err != nil {
			return fmt.Errorf("could not parse maxConcurrentBackfillTasks parameter %s, please provide a positive integer as input", dsCfg.MaxConcurrentBackfillTasks)
		}
		// Same ordering rationale as maxConcurrentCdcTasks above.
		if intVal < MIN_DATASTREAM_TASK_LIMIT || intVal > MAX_DATASTREAM_TASK_LIMIT {
			return fmt.Errorf("maxConcurrentBackfillTasks should lie in the range [%d, %d]", MIN_DATASTREAM_TASK_LIMIT, MAX_DATASTREAM_TASK_LIMIT)
		}
		maxBackfillTasks = int32(intVal)
	}
	if dfCfg.JobName == "" {
		// Update names to have more info like dbname.
		jobName, err := utils.GenerateName("smt-dataflow-" + dbName)
		// Check the error before using the generated name, and report the
		// correct resource (the original message said "stream name").
		if err != nil {
			return fmt.Errorf("error generating dataflow job name: %v", err)
		}
		// Dataflow job names do not permit underscores.
		jobName = strings.Replace(jobName, "_", "-", -1)
		streamingCfg.DataflowCfg.JobName = jobName
	}
	filePath := streamingCfg.TmpDir
	u, err := utils.ParseGCSFilePath(filePath)
	if err != nil {
		return fmt.Errorf("parseFilePath: unable to parse file path: %v", err)
	}
	// We update the TmpDir in case any '/' were added in ParseGCSFilePath().
	streamingCfg.TmpDir = u.String()
	bucketName := u.Host
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		// Include the underlying error so callers can distinguish auth
		// problems from transient failures (the original discarded it).
		return fmt.Errorf("failed to create GCS client: %v", err)
	}
	defer client.Close()
	// The Get calls for Google Cloud Storage API have out of box retries.
	// Reference - https://cloud.google.com/storage/docs/retry-strategy#idempotency-operations
	bucket := client.Bucket(bucketName)
	if _, err := bucket.Attrs(ctx); err != nil {
		// Attrs can also fail for permission or transient reasons; surface
		// the underlying error rather than claiming non-existence blindly.
		return fmt.Errorf("bucket %s does not exist: %v", bucketName, err)
	}
	// Verify GCS bucket tuning configs.
	if streamingCfg.GcsCfg.TtlInDaysSet {
		if ttl := streamingCfg.GcsCfg.TtlInDays; ttl <= 0 {
			return fmt.Errorf("ttlInDays should be a positive integer")
		}
	}
	return nil
}