in reverse_replication/reverse-replication-runner.go [196:415]
// main sets up the reverse replication pipeline: it validates the flags and
// the target GCS bucket, optionally creates the Spanner change stream and the
// metadata database, and then launches the Dataflow reader and/or writer
// flex-template jobs identified by a shared run identifier.
func main() {
	fmt.Println("Setting up reverse replication pipeline...")
	setupGlobalFlags()
	flag.Parse()
	if err := prechecks(); err != nil {
		fmt.Println("incorrect arguments passed:", err)
		return
	}
	dbUri := fmt.Sprintf("projects/%s/instances/%s/databases/%s", spannerProjectId, instanceId, dbName)
	ctx := context.Background()
	// BUG FIX: the admin-client error was previously discarded and the
	// spanner-client error was overwritten before being checked; both clients
	// were also never closed.
	adminClient, err := database.NewDatabaseAdminClient(ctx)
	if err != nil {
		fmt.Println("failed to create database admin client:", err)
		return
	}
	defer adminClient.Close()
	spClient, err := spanner.NewClient(ctx, dbUri)
	if err != nil {
		fmt.Println("failed to create spanner client:", err)
		return
	}
	defer spClient.Close()
	client, err := storage.NewClient(ctx)
	if err != nil {
		fmt.Println("failed to create GCS client:", err)
		return
	}
	defer client.Close()
	// The bucket name is the first path component after the gs:// scheme.
	// TrimPrefix (not ReplaceAll) so only the leading scheme is stripped.
	gcsBucketName := strings.Split(strings.TrimPrefix(gcsPath, "gs://"), "/")[0]
	if _, err := client.Bucket(gcsBucketName).Attrs(ctx); err != nil {
		fmt.Println("GCS Path does not exist, please create before running reverse replication:", gcsBucketName)
		return
	}
	if !skipChangeStreamCreation {
		if err := validateOrCreateChangeStream(ctx, adminClient, spClient, dbUri); err != nil {
			fmt.Println("Error in validating/creating changestream:", err)
			return
		}
	}
	if !skipMetadataDatabaseCreation {
		metadataDbUri := fmt.Sprintf("projects/%s/instances/%s/databases/%s", spannerProjectId, metadataInstance, metadataDatabase)
		createDbReq := &adminpb.CreateDatabaseRequest{
			Parent:          fmt.Sprintf("projects/%s/instances/%s", spannerProjectId, metadataInstance),
			CreateStatement: fmt.Sprintf("CREATE DATABASE `%s`", metadataDatabase),
		}
		// An ALREADY_EXISTS error (at submit time or while waiting on the
		// long-running operation) is benign: the metadata db is reusable.
		createDbOp, err := adminClient.CreateDatabase(ctx, createDbReq)
		if err != nil {
			if !strings.Contains(err.Error(), ALREADY_EXISTS_ERROR) {
				fmt.Printf("Cannot submit create database request for metadata db: %v\n", err)
				return
			}
			fmt.Printf("metadata db %s already exists...skipping creation\n", metadataDbUri)
		} else if _, err := createDbOp.Wait(ctx); err != nil {
			if !strings.Contains(err.Error(), ALREADY_EXISTS_ERROR) {
				fmt.Printf("create database request failed for metadata db: %v\n", err)
				return
			}
			fmt.Printf("metadata db %s already exists...skipping creation\n", metadataDbUri)
		} else {
			fmt.Println("Created metadata db", metadataDbUri)
		}
	}
	c, err := dataflow.NewFlexTemplatesClient(ctx)
	if err != nil {
		fmt.Printf("could not create flex template client: %v\n", err)
		return
	}
	defer c.Close()
	// If custom network is not selected, use public IP. Typical for internal testing flow.
	workerIpAddressConfig := dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PUBLIC
	if vpcNetwork != "" || vpcSubnetwork != "" {
		workerIpAddressConfig = dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PRIVATE
		// If subnetwork is not provided, assume network has auto subnet configuration.
		if vpcSubnetwork != "" {
			vpcSubnetwork = fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/subnetworks/%s", vpcHostProjectId, dataflowRegion, vpcSubnetwork)
		}
	}
	// Both jobs share one run identifier; when not supplied, derive a
	// job-name-safe one from the current UTC time (RFC3339 with ':' removed).
	runId := runIdentifier
	if runId == "" {
		runId = strings.ToLower(strings.ReplaceAll(time.Now().UTC().Format(time.RFC3339), ":", "-"))
	}
	if jobsToLaunch == "both" || jobsToLaunch == "reader" {
		additionalExpr := []string{"use_runner_v2"}
		if networkTags != "" {
			additionalExpr = append(additionalExpr, "use_network_tags="+networkTags, "use_network_tags_for_flex_templates="+networkTags)
		}
		readerParams := map[string]string{
			"changeStreamName":     changeStreamName,
			"instanceId":           instanceId,
			"databaseId":           dbName,
			"spannerProjectId":     spannerProjectId,
			"metadataInstance":     metadataInstance,
			"metadataDatabase":     metadataDatabase,
			"startTimestamp":       startTimestamp,
			"sessionFilePath":      sessionFilePath,
			"windowDuration":       windowDuration,
			"gcsOutputDirectory":   gcsPath,
			"filtrationMode":       filtrationMode,
			"sourceShardsFilePath": sourceShardsFilePath,
			"metadataTableSuffix":  metadataTableSuffix,
			"skipDirectoryName":    readerSkipDirectoryName,
			"runIdentifier":        runId,
			"runMode":              readerRunMode,
		}
		if readerShardingCustomJarPath != "" {
			readerParams["shardingCustomJarPath"] = readerShardingCustomJarPath //cant send empty since it expects GCS format
			readerParams["shardingCustomClassName"] = readerShardingCustomClassName
			readerParams["shardingCustomParameters"] = readerShardingCustomParameters
		}
		launchParameters := &dataflowpb.LaunchFlexTemplateParameter{
			JobName:    fmt.Sprintf("%s-reader-%s-%s", jobNamePrefix, runId, utils.GenerateHashStr()),
			Template:   &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: spannerReaderTemplateLocation},
			Parameters: readerParams,
			Environment: &dataflowpb.FlexTemplateRuntimeEnvironment{
				NumWorkers:            int32(readerWorkers),
				AdditionalExperiments: additionalExpr,
				MachineType:           machineType,
				Network:               vpcNetwork,
				Subnetwork:            vpcSubnetwork,
				IpConfiguration:       workerIpAddressConfig,
				ServiceAccountEmail:   serviceAccountEmail,
				MaxWorkers:            int32(readerMaxWorkers),
			},
		}
		req := &dataflowpb.LaunchFlexTemplateRequest{
			ProjectId:       projectId,
			LaunchParameter: launchParameters,
			Location:        dataflowRegion,
		}
		// Echo an equivalent gcloud command so the launch is reproducible by hand.
		fmt.Printf("\nGCLOUD CMD FOR READER JOB:\n%s\n\n", getGcloudCommand(req))
		readerJobResponse, err := c.LaunchFlexTemplate(ctx, req)
		if err != nil {
			fmt.Printf("unable to launch reader job: %v \n REQUEST BODY: %+v\n", err, req)
			return
		}
		fmt.Println("Launched reader job: ", readerJobResponse.Job)
	}
	if jobsToLaunch == "both" || jobsToLaunch == "writer" {
		// Unlike the reader, the writer does not force use_runner_v2.
		var additionalExpr []string
		if networkTags != "" {
			additionalExpr = []string{"use_network_tags=" + networkTags, "use_network_tags_for_flex_templates=" + networkTags}
		}
		writerParams := map[string]string{
			"sourceShardsFilePath":   sourceShardsFilePath,
			"sessionFilePath":        sessionFilePath,
			"sourceDbTimezoneOffset": sourceDbTimezoneOffset,
			"metadataTableSuffix":    metadataTableSuffix,
			"GCSInputDirectoryPath":  gcsPath,
			"spannerProjectId":       spannerProjectId,
			"metadataInstance":       metadataInstance,
			"metadataDatabase":       metadataDatabase,
			"runMode":                writerRunMode,
			"runIdentifier":          runId,
		}
		if writerTransformationCustomJarPath != "" {
			writerParams["transformationJarPath"] = writerTransformationCustomJarPath
			writerParams["transformationClassName"] = writerTransformationCustomClassName
			writerParams["transformationCustomParameters"] = writerTransformationCustomParameters
			writerParams["writeFilteredEventsToGcs"] = strconv.FormatBool(writeFilteredEventsToGcs)
		}
		launchParameters := &dataflowpb.LaunchFlexTemplateParameter{
			JobName:    fmt.Sprintf("%s-writer-%s-%s", jobNamePrefix, runId, utils.GenerateHashStr()),
			Template:   &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: sourceWriterTemplateLocation},
			Parameters: writerParams,
			Environment: &dataflowpb.FlexTemplateRuntimeEnvironment{
				NumWorkers:            int32(writerWorkers),
				AdditionalExperiments: additionalExpr,
				MachineType:           machineType,
				Network:               vpcNetwork,
				Subnetwork:            vpcSubnetwork,
				IpConfiguration:       workerIpAddressConfig,
				ServiceAccountEmail:   serviceAccountEmail,
			},
		}
		req := &dataflowpb.LaunchFlexTemplateRequest{
			ProjectId:       projectId,
			LaunchParameter: launchParameters,
			Location:        dataflowRegion,
		}
		fmt.Printf("\nGCLOUD CMD FOR WRITER JOB:\n%s\n\n", getGcloudCommand(req))
		writerJobResponse, err := c.LaunchFlexTemplate(ctx, req)
		if err != nil {
			fmt.Printf("unable to launch writer job: %v \n REQUEST BODY: %+v\n", err, req)
			return
		}
		fmt.Println("Launched writer job: ", writerJobResponse.Job)
	}
}