func LaunchDataflowJob()

in streaming/streaming.go [647:804]


func LaunchDataflowJob(ctx context.Context, migrationProjectId string, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg, conv *internal.Conv) (internal.DataflowOutput, error) {
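	// Resolve the Spanner project, instance and database ids from the target profile.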
	spannerProjectId, instance, dbName, err := targetProfile.GetResourceIds(ctx, time.Now(), "", nil, &utils.GetUtilInfoImpl{})
	if err != nil {
		return internal.DataflowOutput{}, fmt.Errorf("could not fetch target resource ids: %v", err)
	}
	dataflowCfg := streamingCfg.DataflowCfg
	datastreamCfg := streamingCfg.DatastreamCfg

	// Rate limit this function to stay within the Dataflow CreateJob quota.
	DATA_FLOW_RL.Take()

	fmt.Println("Launching dataflow job ", dataflowCfg.JobName, " in ", migrationProjectId, "-", dataflowCfg.Location)

	c, err := dataflow.NewFlexTemplatesClient(ctx)
	if err != nil {
		return internal.DataflowOutput{}, fmt.Errorf("could not create flex template client: %v", err)
	}
	defer c.Close()
	fmt.Println("Created flex template client...")

	// Create a Datastream client to fetch the GCS bucket from the destination connection profile.
	dsClient, err := datastream.NewClient(ctx)
	if err != nil {
		return internal.DataflowOutput{}, fmt.Errorf("could not create datastream client: %v", err)
	}
	defer dsClient.Close()

	// Fetch the GCS path from the destination connection profile.
	dstProf := fmt.Sprintf("projects/%s/locations/%s/connectionProfiles/%s", migrationProjectId, datastreamCfg.DestinationConnectionConfig.Location, datastreamCfg.DestinationConnectionConfig.Name)
	res, err := dsClient.GetConnectionProfile(ctx, &datastreampb.GetConnectionProfileRequest{Name: dstProf})
	if err != nil {
		return internal.DataflowOutput{}, fmt.Errorf("could not get connection profiles: %v", err)
	}
	gcsProfileWrapper, ok := res.Profile.(*datastreampb.ConnectionProfile_GcsProfile)
	if !ok {
		return internal.DataflowOutput{}, fmt.Errorf("destination connection profile %s is not a GCS profile", dstProf)
	}
	gcsProfile := gcsProfileWrapper.GcsProfile
	inputFilePattern := "gs://" + gcsProfile.Bucket + gcsProfile.RootPath + datastreamCfg.DestinationConnectionConfig.Prefix
	if inputFilePattern[len(inputFilePattern)-1] != '/' {
		inputFilePattern = inputFilePattern + "/"
	}
	fmt.Println("Reading files from datastream destination ", inputFilePattern)

	// Initiate runtime environment flags and overrides.
	var (
		dataflowProjectId        = migrationProjectId
		dataflowVpcHostProjectId = migrationProjectId
		gcsTemplatePath          = utils.GetDataflowTemplatePath()
		dataflowSubnetwork       = ""
		workerIpAddressConfig    = dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PUBLIC
		dataflowUserLabels       = make(map[string]string)
		machineType              = "n1-standard-2"
		// Worker counts of 0 are treated as unset, letting the Dataflow service apply its own defaults.
		maxWorkers int32 = 0
		numWorkers int32 = 0
	)
	// If a project override is present, use it; otherwise default to the migration project. This is useful when customers want to run Dataflow in a separate project.
	if dataflowCfg.ProjectId != "" {
		dataflowProjectId = dataflowCfg.ProjectId
	}
	// If a VPC host project override is present, use it; otherwise default to the migration project.
	if dataflowCfg.VpcHostProjectId != "" {
		dataflowVpcHostProjectId = dataflowCfg.VpcHostProjectId
	}
	if dataflowCfg.GcsTemplatePath != "" {
		gcsTemplatePath = dataflowCfg.GcsTemplatePath
	}

	// If either network or subnetwork is specified, set IpConfig to private.
	if dataflowCfg.Network != "" || dataflowCfg.Subnetwork != "" {
		workerIpAddressConfig = dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PRIVATE
		if dataflowCfg.Subnetwork != "" {
			dataflowSubnetwork = fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/subnetworks/%s", dataflowVpcHostProjectId, dataflowCfg.Location, dataflowCfg.Subnetwork)
		}
	}

	if dataflowCfg.AdditionalUserLabels != "" {
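		// AdditionalUserLabels is expected to be a JSON object with string values, e.g. {"env": "dev"}.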
		err = json.Unmarshal([]byte(dataflowCfg.AdditionalUserLabels), &dataflowUserLabels)
		if err != nil {
			return internal.DataflowOutput{}, fmt.Errorf("could not unmarshal AdditionalUserLabels json %s : error = %v", dataflowCfg.AdditionalUserLabels, err)
		}
	}

	if dataflowCfg.MaxWorkers != "" {
		intVal, err := strconv.ParseInt(dataflowCfg.MaxWorkers, 10, 64)
		if err != nil {
			return internal.DataflowOutput{}, fmt.Errorf("could not parse MaxWorkers parameter %s, please provide a positive integer as input", dataflowCfg.MaxWorkers)
		}
		maxWorkers = int32(intVal)
		if maxWorkers < MIN_WORKER_LIMIT || maxWorkers > MAX_WORKER_LIMIT {
			return internal.DataflowOutput{}, fmt.Errorf("maxWorkers should lie in the range [%d, %d]", MIN_WORKER_LIMIT, MAX_WORKER_LIMIT)
		}
	}
	if dataflowCfg.NumWorkers != "" {
		intVal, err := strconv.ParseInt(dataflowCfg.NumWorkers, 10, 64)
		if err != nil {
			return internal.DataflowOutput{}, fmt.Errorf("could not parse NumWorkers parameter %s, please provide a positive integer as input", dataflowCfg.NumWorkers)
		}
		numWorkers = int32(intVal)
		if numWorkers < MIN_WORKER_LIMIT || numWorkers > MAX_WORKER_LIMIT {
			return internal.DataflowOutput{}, fmt.Errorf("numWorkers should lie in the range [%d, %d]", MIN_WORKER_LIMIT, MAX_WORKER_LIMIT)
		}
	}

	if dataflowCfg.MachineType != "" {
		machineType = dataflowCfg.MachineType
	}

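	// Assemble the flex template launch parameters: the Parameters map is consumed by the
	// (Datastream-to-Spanner) template, while Environment configures the Dataflow workers themselves.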
	launchParameters := &dataflowpb.LaunchFlexTemplateParameter{
		JobName:  dataflowCfg.JobName,
		Template: &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: gcsTemplatePath},
		Parameters: map[string]string{
			"streamName":                    fmt.Sprintf("projects/%s/locations/%s/streams/%s", migrationProjectId, datastreamCfg.StreamLocation, datastreamCfg.StreamId),
			"projectId":                     spannerProjectId,
			"instanceId":                    instance,
			"databaseId":                    dbName,
			"sessionFilePath":               streamingCfg.TmpDir + "session.json",
			"deadLetterQueueDirectory":      inputFilePattern + "dlq",
			"transformationContextFilePath": streamingCfg.TmpDir + "transformationContext.json",
			"gcsPubSubSubscription":         fmt.Sprintf("projects/%s/subscriptions/%s", migrationProjectId, streamingCfg.PubsubCfg.SubscriptionId),
			"dlqGcsPubSubSubscription":      fmt.Sprintf("projects/%s/subscriptions/%s", migrationProjectId, streamingCfg.DlqPubsubCfg.SubscriptionId),
		},
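		// Runtime environment: worker counts, autoscaling, networking, machine type and CMEK.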
		Environment: &dataflowpb.FlexTemplateRuntimeEnvironment{
			MaxWorkers:            maxWorkers,
			NumWorkers:            numWorkers,
			ServiceAccountEmail:   dataflowCfg.ServiceAccountEmail,
			AutoscalingAlgorithm:  dataflowpb.AutoscalingAlgorithm_AUTOSCALING_ALGORITHM_BASIC,
			EnableStreamingEngine: true,
			Network:               dataflowCfg.Network,
			Subnetwork:            dataflowSubnetwork,
			IpConfiguration:       workerIpAddressConfig,
			MachineType:           machineType,
			AdditionalUserLabels:  dataflowUserLabels,
			KmsKeyName:            dataflowCfg.KmsKeyName,
		},
	}

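	// Optional custom transformations: the JAR path and class name must be supplied together;
	// filtered events are written under the Datastream destination directory.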
	if dataflowCfg.CustomClassName != "" && dataflowCfg.CustomJarPath != "" {
		launchParameters.Parameters["transformationJarPath"] = dataflowCfg.CustomJarPath
		launchParameters.Parameters["transformationClassName"] = dataflowCfg.CustomClassName
		launchParameters.Parameters["transformationCustomParameters"] = dataflowCfg.CustomParameter
		launchParameters.Parameters["filteredEventsDirectory"] = utils.ConcatDirectoryPath(inputFilePattern, "filteredEvents")
	} else if (dataflowCfg.CustomClassName != "" && dataflowCfg.CustomJarPath == "") || (dataflowCfg.CustomClassName == "" && dataflowCfg.CustomJarPath != "") {
		return internal.DataflowOutput{}, fmt.Errorf("specify both the custom class name and custom jar GCS path, or specify neither")
	}

	req := &dataflowpb.LaunchFlexTemplateRequest{
		ProjectId:       dataflowProjectId,
		LaunchParameter: launchParameters,
		Location:        dataflowCfg.Location,
	}
	fmt.Println("Created flex template request body...")

	// LaunchFlexTemplate does not have out-of-the-box retries or any direct documentation on how
	// to make the call idempotent.
	// Ref - https://github.com/googleapis/googleapis/blob/master/google/dataflow/v1beta3/dataflow_grpc_service_config.json
	// TODO: explore retries.
	respDf, err := c.LaunchFlexTemplate(ctx, req)
	if err != nil {
		fmt.Printf("flexTemplateRequest: %+v\n", req)
		return internal.DataflowOutput{}, fmt.Errorf("unable to launch template: %v", err)
	}
	// TODO: Refactor to use the accessor return value.
	gcloudDfCmd := dataflowaccessor.GetGcloudDataflowCommandFromRequest(req)
	logger.Log.Debug(fmt.Sprintf("\nEquivalent gCloud command for job %s:\n%s\n\n", req.LaunchParameter.JobName, gcloudDfCmd))
	return internal.DataflowOutput{JobID: respDf.Job.Id, GCloudCmd: gcloudDfCmd}, nil
}
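
Example usage (a minimal sketch, assuming the surrounding migration flow has already prepared ctx, targetProfile, streamingCfg and conv; the project id below is a placeholder):

	dfOutput, err := LaunchDataflowJob(ctx, "my-migration-project", targetProfile, streamingCfg, conv)
	if err != nil {
		return fmt.Errorf("error launching dataflow job: %v", err)
	}
	// DataflowOutput carries the launched job id and an equivalent gcloud command for reproducibility.
	fmt.Println("Dataflow job id:", dfOutput.JobID)
	fmt.Println("Equivalent gcloud command:", dfOutput.GCloudCmd)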