func main()

in reverse_replication/reverse-replication-runner.go [196:415]


func main() {
	// main sets up the Spanner-to-source reverse replication pipeline.
	//
	// Steps, in order:
	//  1. Parse flags and validate them with prechecks.
	//  2. Create the Spanner database admin, GCS and Dataflow clients; each is
	//     closed when main returns.
	//  3. Verify the GCS bucket named in gcsPath can be read (bucket.Attrs).
	//  4. Unless skipChangeStreamCreation is set, validate or create the change
	//     stream on the Spanner database.
	//  5. Unless skipMetadataDatabaseCreation is set, create the metadata
	//     database, treating an "already exists" error as success.
	//  6. Launch the reader and/or writer Dataflow flex-template jobs, as selected
	//     by jobsToLaunch ("both", "reader" or "writer").
	//
	// Any failure is printed to stdout and main returns early.
	// NOTE(review): returning (rather than os.Exit(1)) means failures still exit
	// with status 0; consider a non-zero exit code for scripted callers.
	fmt.Println("Setting up reverse replication pipeline...")

	setupGlobalFlags()
	flag.Parse()

	err := prechecks()
	if err != nil {
		fmt.Println("incorrect arguments passed:", err)
		return
	}

	dbUri := fmt.Sprintf("projects/%s/instances/%s/databases/%s", spannerProjectId, instanceId, dbName)
	metadataDbUri := fmt.Sprintf("projects/%s/instances/%s/databases/%s", spannerProjectId, metadataInstance, metadataDatabase)

	ctx := context.Background()

	adminClient, err := database.NewDatabaseAdminClient(ctx)
	if err != nil {
		fmt.Println("failed to create database admin client:", err)
		return
	}
	defer adminClient.Close()

	client, err := storage.NewClient(ctx)
	if err != nil {
		fmt.Println("failed to create GCS client:", err)
		return
	}
	defer client.Close()

	// The bucket is the first path segment after an optional gs:// scheme.
	gcsBucketPath := strings.TrimPrefix(gcsPath, "gs://")
	gcsBucketName := strings.SplitN(gcsBucketPath, "/", 2)[0]
	bucket := client.Bucket(gcsBucketName)
	if _, err := bucket.Attrs(ctx); err != nil {
		fmt.Println("GCS Path does not exist, please create before running reverse replication:", gcsBucketName)
		return
	}

	if !skipChangeStreamCreation {
		// The Spanner data client is only needed for change stream validation,
		// so it is created (and its error checked) only on this path.
		spClient, err := spanner.NewClient(ctx, dbUri)
		if err != nil {
			fmt.Println("failed to create spanner client:", err)
			return
		}
		defer spClient.Close()

		if err := validateOrCreateChangeStream(ctx, adminClient, spClient, dbUri); err != nil {
			fmt.Println("Error in validating/creating changestream:", err)
			return
		}
	}

	if !skipMetadataDatabaseCreation {
		createDbReq := &adminpb.CreateDatabaseRequest{
			Parent:          fmt.Sprintf("projects/%s/instances/%s", spannerProjectId, metadataInstance),
			CreateStatement: fmt.Sprintf("CREATE DATABASE `%s`", metadataDatabase),
		}

		// An "already exists" error may surface either when submitting the
		// request or when waiting on the operation; both are treated as success.
		createDbOp, err := adminClient.CreateDatabase(ctx, createDbReq)
		if err != nil {
			if !strings.Contains(err.Error(), ALREADY_EXISTS_ERROR) {
				fmt.Printf("Cannot submit create database request for metadata db: %v\n", err)
				return
			}
			fmt.Printf("metadata db %s already exists...skipping creation\n", metadataDbUri)
		} else if _, err := createDbOp.Wait(ctx); err != nil {
			if !strings.Contains(err.Error(), ALREADY_EXISTS_ERROR) {
				fmt.Printf("create database request failed for metadata db: %v\n", err)
				return
			}
			fmt.Printf("metadata db %s already exists...skipping creation\n", metadataDbUri)
		} else {
			fmt.Println("Created metadata db", metadataDbUri)
		}
	}

	c, err := dataflow.NewFlexTemplatesClient(ctx)
	if err != nil {
		fmt.Printf("could not create flex template client: %v\n", err)
		return
	}
	defer c.Close()

	// If custom network is not selected, use public IP. Typical for internal testing flow.
	workerIpAddressConfig := dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PUBLIC
	if vpcNetwork != "" || vpcSubnetwork != "" {
		workerIpAddressConfig = dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PRIVATE
		// If subnetwork is not provided, assume network has auto subnet configuration.
		if vpcSubnetwork != "" {
			vpcSubnetwork = fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/subnetworks/%s", vpcHostProjectId, dataflowRegion, vpcSubnetwork)
		}
	}

	// Default run identifier: the current UTC time in RFC3339, lower-cased and
	// with ':' replaced by '-'.
	runId := runIdentifier
	if runId == "" {
		runId = strings.ToLower(strings.ReplaceAll(time.Now().UTC().Format(time.RFC3339), ":", "-"))
	}

	if jobsToLaunch == "both" || jobsToLaunch == "reader" {
		// The reader always requests Runner v2; network tags, when given, are
		// passed through both network-tag experiments.
		additionalExpr := []string{"use_runner_v2"}
		if networkTags != "" {
			additionalExpr = append(additionalExpr, "use_network_tags="+networkTags, "use_network_tags_for_flex_templates="+networkTags)
		}

		readerParams := map[string]string{
			"changeStreamName":     changeStreamName,
			"instanceId":           instanceId,
			"databaseId":           dbName,
			"spannerProjectId":     spannerProjectId,
			"metadataInstance":     metadataInstance,
			"metadataDatabase":     metadataDatabase,
			"startTimestamp":       startTimestamp,
			"sessionFilePath":      sessionFilePath,
			"windowDuration":       windowDuration,
			"gcsOutputDirectory":   gcsPath,
			"filtrationMode":       filtrationMode,
			"sourceShardsFilePath": sourceShardsFilePath,
			"metadataTableSuffix":  metadataTableSuffix,
			"skipDirectoryName":    readerSkipDirectoryName,
			"runIdentifier":        runId,
			"runMode":              readerRunMode,
		}
		// Only send custom-sharding params when a jar is given: an empty
		// shardingCustomJarPath is rejected since it expects GCS format.
		if readerShardingCustomJarPath != "" {
			readerParams["shardingCustomJarPath"] = readerShardingCustomJarPath
			readerParams["shardingCustomClassName"] = readerShardingCustomClassName
			readerParams["shardingCustomParameters"] = readerShardingCustomParameters
		}
		launchParameters := &dataflowpb.LaunchFlexTemplateParameter{
			JobName:    fmt.Sprintf("%s-reader-%s-%s", jobNamePrefix, runId, utils.GenerateHashStr()),
			Template:   &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: spannerReaderTemplateLocation},
			Parameters: readerParams,
			Environment: &dataflowpb.FlexTemplateRuntimeEnvironment{
				NumWorkers:            int32(readerWorkers),
				AdditionalExperiments: additionalExpr,
				MachineType:           machineType,
				Network:               vpcNetwork,
				Subnetwork:            vpcSubnetwork,
				IpConfiguration:       workerIpAddressConfig,
				ServiceAccountEmail:   serviceAccountEmail,
				MaxWorkers:            int32(readerMaxWorkers),
			},
		}

		req := &dataflowpb.LaunchFlexTemplateRequest{
			ProjectId:       projectId,
			LaunchParameter: launchParameters,
			Location:        dataflowRegion,
		}
		fmt.Printf("\nGCLOUD CMD FOR READER JOB:\n%s\n\n", getGcloudCommand(req))

		readerJobResponse, err := c.LaunchFlexTemplate(ctx, req)
		if err != nil {
			fmt.Printf("unable to launch reader job: %v \n REQUEST BODY: %+v\n", err, req)
			return
		}
		fmt.Println("Launched reader job: ", readerJobResponse.Job)
	}

	if jobsToLaunch == "both" || jobsToLaunch == "writer" {
		// Unlike the reader, the writer sends no experiments unless network
		// tags are configured.
		var additionalExpr []string
		if networkTags != "" {
			additionalExpr = []string{"use_network_tags=" + networkTags, "use_network_tags_for_flex_templates=" + networkTags}
		}

		writerParams := map[string]string{
			"sourceShardsFilePath":   sourceShardsFilePath,
			"sessionFilePath":        sessionFilePath,
			"sourceDbTimezoneOffset": sourceDbTimezoneOffset,
			"metadataTableSuffix":    metadataTableSuffix,
			"GCSInputDirectoryPath":  gcsPath,
			"spannerProjectId":       spannerProjectId,
			"metadataInstance":       metadataInstance,
			"metadataDatabase":       metadataDatabase,
			"runMode":                writerRunMode,
			"runIdentifier":          runId,
		}

		// Transformation params (including writeFilteredEventsToGcs) are only
		// sent when a custom transformation jar is configured.
		if writerTransformationCustomJarPath != "" {
			writerParams["transformationJarPath"] = writerTransformationCustomJarPath
			writerParams["transformationClassName"] = writerTransformationCustomClassName
			writerParams["transformationCustomParameters"] = writerTransformationCustomParameters
			writerParams["writeFilteredEventsToGcs"] = strconv.FormatBool(writeFilteredEventsToGcs)
		}

		launchParameters := &dataflowpb.LaunchFlexTemplateParameter{
			JobName:    fmt.Sprintf("%s-writer-%s-%s", jobNamePrefix, runId, utils.GenerateHashStr()),
			Template:   &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: sourceWriterTemplateLocation},
			Parameters: writerParams,
			Environment: &dataflowpb.FlexTemplateRuntimeEnvironment{
				NumWorkers:            int32(writerWorkers),
				AdditionalExperiments: additionalExpr,
				MachineType:           machineType,
				Network:               vpcNetwork,
				Subnetwork:            vpcSubnetwork,
				IpConfiguration:       workerIpAddressConfig,
				ServiceAccountEmail:   serviceAccountEmail,
			},
		}
		req := &dataflowpb.LaunchFlexTemplateRequest{
			ProjectId:       projectId,
			LaunchParameter: launchParameters,
			Location:        dataflowRegion,
		}
		fmt.Printf("\nGCLOUD CMD FOR WRITER JOB:\n%s\n\n", getGcloudCommand(req))

		writerJobResponse, err := c.LaunchFlexTemplate(ctx, req)
		if err != nil {
			fmt.Printf("unable to launch writer job: %v \n REQUEST BODY: %+v\n", err, req)
			return
		}

		fmt.Println("Launched writer job: ", writerJobResponse.Job)
	}
}