in streaming/streaming.go [647:804]
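// LaunchDataflowJob launches the Datastream-to-Spanner Dataflow flex template
// job described by streamingCfg and returns the job ID together with an
// equivalent gcloud command. Illustrative call (project ID and variable names
// here are assumptions; the config structs are populated upstream):
//
//	out, err := LaunchDataflowJob(ctx, "my-migration-project", targetProfile, streamingCfg, conv)
//	if err != nil {
//		return err
//	}
//	fmt.Println("Launched Dataflow job:", out.JobID)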
func LaunchDataflowJob(ctx context.Context, migrationProjectId string, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg, conv *internal.Conv) (internal.DataflowOutput, error) {
spannerProjectId, instance, dbName, _ := targetProfile.GetResourceIds(ctx, time.Now(), "", nil, &utils.GetUtilInfoImpl{})
dataflowCfg := streamingCfg.DataflowCfg
datastreamCfg := streamingCfg.DatastreamCfg
	// Rate limit this function to match the Dataflow CreateJob quota.
DATA_FLOW_RL.Take()
fmt.Println("Launching dataflow job ", dataflowCfg.JobName, " in ", migrationProjectId, "-", dataflowCfg.Location)
c, err := dataflow.NewFlexTemplatesClient(ctx)
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("could not create flex template client: %v", err)
}
defer c.Close()
fmt.Println("Created flex template client...")
	// Create a Datastream client to look up the destination connection profile.
dsClient, err := datastream.NewClient(ctx)
if err != nil {
		return internal.DataflowOutput{}, fmt.Errorf("could not create datastream client: %v", err)
}
defer dsClient.Close()
// Fetch the GCS path from the destination connection profile.
dstProf := fmt.Sprintf("projects/%s/locations/%s/connectionProfiles/%s", migrationProjectId, datastreamCfg.DestinationConnectionConfig.Location, datastreamCfg.DestinationConnectionConfig.Name)
res, err := dsClient.GetConnectionProfile(ctx, &datastreampb.GetConnectionProfileRequest{Name: dstProf})
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("could not get connection profiles: %v", err)
}
	gcsProfileWrapper, ok := res.Profile.(*datastreampb.ConnectionProfile_GcsProfile)
	if !ok {
		return internal.DataflowOutput{}, fmt.Errorf("destination connection profile %s is not a GCS profile", dstProf)
	}
	gcsProfile := gcsProfileWrapper.GcsProfile
	inputFilePattern := "gs://" + gcsProfile.Bucket + gcsProfile.RootPath + datastreamCfg.DestinationConnectionConfig.Prefix
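	// Normalize to a trailing slash so sub-paths like "dlq" can be appended directly.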
if inputFilePattern[len(inputFilePattern)-1] != '/' {
inputFilePattern = inputFilePattern + "/"
}
fmt.Println("Reading files from datastream destination ", inputFilePattern)
	// Initialize runtime environment flags and overrides.
	var (
		dataflowProjectId        = migrationProjectId
		dataflowVpcHostProjectId = migrationProjectId
		gcsTemplatePath          = utils.GetDataflowTemplatePath()
		dataflowSubnetwork       = ""
		workerIpAddressConfig    = dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PUBLIC
		dataflowUserLabels       = make(map[string]string)
		machineType              = "n1-standard-2"
		// maxWorkers and numWorkers default to 0, which leaves the choice to the
		// Dataflow service; both are overridden below when set in dataflowCfg.
		maxWorkers int32
		numWorkers int32
	)
	// If a project override is present, use it; otherwise default to the migration project. This is useful when customers want to run Dataflow in a separate project.
if dataflowCfg.ProjectId != "" {
dataflowProjectId = dataflowCfg.ProjectId
}
	// If a VPC host project override is present, use it; otherwise default to the migration project.
if dataflowCfg.VpcHostProjectId != "" {
dataflowVpcHostProjectId = dataflowCfg.VpcHostProjectId
}
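	// If a template path override is present, use it instead of the default Datastream-to-Spanner template.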
if dataflowCfg.GcsTemplatePath != "" {
gcsTemplatePath = dataflowCfg.GcsTemplatePath
}
// If either network or subnetwork is specified, set IpConfig to private.
if dataflowCfg.Network != "" || dataflowCfg.Subnetwork != "" {
workerIpAddressConfig = dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PRIVATE
if dataflowCfg.Subnetwork != "" {
dataflowSubnetwork = fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/subnetworks/%s", dataflowVpcHostProjectId, dataflowCfg.Location, dataflowCfg.Subnetwork)
}
}
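	// AdditionalUserLabels is expected to be a JSON object of string keys and
	// values, e.g. {"env": "dev", "team": "migration"}, which Dataflow attaches
	// to the job as labels.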
if dataflowCfg.AdditionalUserLabels != "" {
err = json.Unmarshal([]byte(dataflowCfg.AdditionalUserLabels), &dataflowUserLabels)
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("could not unmarshal AdditionalUserLabels json %s : error = %v", dataflowCfg.AdditionalUserLabels, err)
}
}
if dataflowCfg.MaxWorkers != "" {
intVal, err := strconv.ParseInt(dataflowCfg.MaxWorkers, 10, 64)
if err != nil {
			return internal.DataflowOutput{}, fmt.Errorf("could not parse MaxWorkers parameter %s: please provide a positive integer", dataflowCfg.MaxWorkers)
}
maxWorkers = int32(intVal)
if maxWorkers < MIN_WORKER_LIMIT || maxWorkers > MAX_WORKER_LIMIT {
return internal.DataflowOutput{}, fmt.Errorf("maxWorkers should lie in the range [%d, %d]", MIN_WORKER_LIMIT, MAX_WORKER_LIMIT)
}
}
if dataflowCfg.NumWorkers != "" {
intVal, err := strconv.ParseInt(dataflowCfg.NumWorkers, 10, 64)
if err != nil {
			return internal.DataflowOutput{}, fmt.Errorf("could not parse NumWorkers parameter %s: please provide a positive integer", dataflowCfg.NumWorkers)
}
numWorkers = int32(intVal)
if numWorkers < MIN_WORKER_LIMIT || numWorkers > MAX_WORKER_LIMIT {
return internal.DataflowOutput{}, fmt.Errorf("numWorkers should lie in the range [%d, %d]", MIN_WORKER_LIMIT, MAX_WORKER_LIMIT)
}
}
if dataflowCfg.MachineType != "" {
machineType = dataflowCfg.MachineType
}
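	// Assemble the flex template launch parameters: the Parameters map is passed
	// through to the Datastream-to-Spanner template, and Environment configures
	// the Dataflow worker pool.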
launchParameters := &dataflowpb.LaunchFlexTemplateParameter{
JobName: dataflowCfg.JobName,
Template: &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: gcsTemplatePath},
Parameters: map[string]string{
"streamName": fmt.Sprintf("projects/%s/locations/%s/streams/%s", migrationProjectId, datastreamCfg.StreamLocation, datastreamCfg.StreamId),
"projectId": spannerProjectId,
"instanceId": instance,
"databaseId": dbName,
"sessionFilePath": streamingCfg.TmpDir + "session.json",
"deadLetterQueueDirectory": inputFilePattern + "dlq",
"transformationContextFilePath": streamingCfg.TmpDir + "transformationContext.json",
"gcsPubSubSubscription": fmt.Sprintf("projects/%s/subscriptions/%s", migrationProjectId, streamingCfg.PubsubCfg.SubscriptionId),
"dlqGcsPubSubSubscription": fmt.Sprintf("projects/%s/subscriptions/%s", migrationProjectId, streamingCfg.DlqPubsubCfg.SubscriptionId),
},
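		// Runtime environment assembled from the flags and overrides resolved above.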
Environment: &dataflowpb.FlexTemplateRuntimeEnvironment{
MaxWorkers: maxWorkers,
NumWorkers: numWorkers,
ServiceAccountEmail: dataflowCfg.ServiceAccountEmail,
			AutoscalingAlgorithm: dataflowpb.AutoscalingAlgorithm_AUTOSCALING_ALGORITHM_BASIC,
EnableStreamingEngine: true,
Network: dataflowCfg.Network,
Subnetwork: dataflowSubnetwork,
IpConfiguration: workerIpAddressConfig,
MachineType: machineType,
AdditionalUserLabels: dataflowUserLabels,
KmsKeyName: dataflowCfg.KmsKeyName,
},
}
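	// Custom transformations are optional, but the jar path and class name must
	// be supplied together; filtered events are written under the Datastream
	// output directory.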
if dataflowCfg.CustomClassName != "" && dataflowCfg.CustomJarPath != "" {
launchParameters.Parameters["transformationJarPath"] = dataflowCfg.CustomJarPath
launchParameters.Parameters["transformationClassName"] = dataflowCfg.CustomClassName
launchParameters.Parameters["transformationCustomParameters"] = dataflowCfg.CustomParameter
launchParameters.Parameters["filteredEventsDirectory"] = utils.ConcatDirectoryPath(inputFilePattern, "filteredEvents")
} else if (dataflowCfg.CustomClassName != "" && dataflowCfg.CustomJarPath == "") || (dataflowCfg.CustomClassName == "" && dataflowCfg.CustomJarPath != "") {
return internal.DataflowOutput{}, fmt.Errorf("specify both the custom class name and custom jar GCS path, or specify neither")
}
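	// Launch in the (possibly overridden) Dataflow project and the configured region.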
req := &dataflowpb.LaunchFlexTemplateRequest{
ProjectId: dataflowProjectId,
LaunchParameter: launchParameters,
Location: dataflowCfg.Location,
}
fmt.Println("Created flex template request body...")
	// LaunchFlexTemplate does not have out-of-the-box retries or any direct documentation on how
// to make the call idempotent.
// Ref - https://github.com/googleapis/googleapis/blob/master/google/dataflow/v1beta3/dataflow_grpc_service_config.json
// TODO explore retries.
respDf, err := c.LaunchFlexTemplate(ctx, req)
if err != nil {
fmt.Printf("flexTemplateRequest: %+v\n", req)
return internal.DataflowOutput{}, fmt.Errorf("unable to launch template: %v", err)
}
	// TODO: Refactor to use the accessor's return value.
gcloudDfCmd := dataflowaccessor.GetGcloudDataflowCommandFromRequest(req)
logger.Log.Debug(fmt.Sprintf("\nEquivalent gCloud command for job %s:\n%s\n\n", req.LaunchParameter.JobName, gcloudDfCmd))
return internal.DataflowOutput{JobID: respDf.Job.Id, GCloudCmd: gcloudDfCmd}, nil
}