// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package streaming
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"strconv"
"strings"
"sync"
"time"
dataflow "cloud.google.com/go/dataflow/apiv1beta3"
datastream "cloud.google.com/go/datastream/apiv1"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/storage"
datastreampb "google.golang.org/genproto/googleapis/cloud/datastream/v1"
dataflowpb "google.golang.org/genproto/googleapis/dataflow/v1beta3"
"google.golang.org/protobuf/types/known/fieldmaskpb"
resourcemanager "cloud.google.com/go/resourcemanager/apiv3"
resourcemanagerpb "cloud.google.com/go/resourcemanager/apiv3/resourcemanagerpb"
storageclient "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/clients/storage"
dataflowaccessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/dataflow"
storageaccessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/storage"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils"
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
"github.com/GoogleCloudPlatform/spanner-migration-tool/logger"
"github.com/GoogleCloudPlatform/spanner-migration-tool/profiles"
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"github.com/googleapis/gax-go/v2"
"go.uber.org/ratelimit"
"google.golang.org/grpc/codes"
)
// Note on API retries:
// This file makes a lot of API calls.
// Many of the Google Cloud API calls have out-of-the-box retries for a vetted list of error codes (typically `UNAVAILABLE`).
// If that's the case for a given call, please add a comment. If an integration test surfaces the need
// to add additional retry error codes for a call, please use the gax retry options for that call.
// There are cases where the retry is not out of the box. For such calls:
// 1. Read the documentation on how to make the call idempotent; for example, in some cases special fields (like RequestId) need to be set.
// 2. Add a retry for `UNAVAILABLE`, followed by any other retriable errors seen in testing, only if you are sure your call is idempotent.
// It might be good to run the PR at some scale (depends on each case) to avoid surprises.
// If any retry-related exploration is not immediately feasible, please add a TODO comment in the code.
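//
// As an illustrative sketch (not a prescription), wiring a retrier into a call looks like the
// CreateStream call later in this file:
//
//	dsOp, err := dsClient.CreateStream(ctx, createStreamRequest, gax.WithRetry(dataStreamGaxRetrier))
//
// where dataStreamGaxRetrier (defined below) supplies the retry codes and the backoff policy.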
var (
	// Default value for max concurrent CDC tasks in Datastream. Datastream resorts to its default value for 0.
maxCdcTasks int32 = 5
// Default value for max concurrent backfill tasks in Datastream.
maxBackfillTasks int32 = 50
// Min allowed value for max concurrent backfill/CDC tasks in Datastream. 0 value results in the default value being used and hence, is valid.
MIN_DATASTREAM_TASK_LIMIT int32 = 0
// Max allowed value for max concurrent backfill/CDC tasks in Datastream.
MAX_DATASTREAM_TASK_LIMIT int32 = 50
// Default value for maxWorkers.
maxWorkers int32 = 50
// Default value for NumWorkers.
numWorkers int32 = 1
// Max allowed value for maxWorkers and numWorkers.
MAX_WORKER_LIMIT int32 = 1000
// Min allowed value for maxWorkers and numWorkers.
MIN_WORKER_LIMIT int32 = 1
DEFAULT_DATASTREAM_CLIENT_BACKOFF_BASE_DELAY time.Duration = 1.0 * time.Second
DEFAULT_DATASTREAM_CLIENT_BACKOFF_MAX_DELAY time.Duration = 900 * time.Second
DEFAULT_DATASTREAM_CLIENT_BACKOFF_MULTIPLIER float64 = 1.6
DEFAULT_DATASTREAM_RETRY_CODES []codes.Code = []codes.Code{
codes.DeadlineExceeded,
codes.Unavailable,
codes.ResourceExhausted,
codes.Unknown,
}
DEFAULT_DATASTREAM_LRO_POLL_BASE_DELAY time.Duration = 1.0 * time.Minute
DEFAULT_DATASTREAM_LRO_POLL_MAX_DELAY time.Duration = 15 * time.Minute
DEFAULT_DATASTREAM_LRO_POLL_MAX_ELAPSED_TIME time.Duration = 30 * time.Minute
DEFAULT_DATASTREAM_LRO_POLL_MULTIPLIER float64 = 1.6
	// Datastream by default allows 20 API calls per second.
	// Each launch operation calls Datastream twice (ignoring `op.Wait`), hence we keep this at 10 per second.
DEFAULT_DATASTREAM_LAUNCH_RATE_PER_SEC int = 10
	// TODO(vardhanvthigle): Calibrate this.
	// Keeping it low for now since the Dataflow launch operation makes outbound calls to various clients.
	// Keeping it at 1 per second will not impact the actual time it takes to provision resources for a
	// large scale migration, as that depends mostly on the time a batch of 20 (the current parallelization
	// in code) takes to complete its provisioning, which includes waiting for various operations.
DEFAULT_DATAFLOW_LAUNCH_RATE_PER_SEC int = 1
	// Rate limiters.
	// A coarse delay-based rate limiter for launching Datastream streams.
	DATA_STREAM_RL ratelimit.Limiter = ratelimit.New(DEFAULT_DATASTREAM_LAUNCH_RATE_PER_SEC)
	// A coarse delay-based rate limiter for launching Dataflow jobs.
DATA_FLOW_RL ratelimit.Limiter = ratelimit.New(DEFAULT_DATAFLOW_LAUNCH_RATE_PER_SEC)
)
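
// Callers take a token from the relevant limiter before issuing a launch call. A minimal sketch,
// mirroring LaunchStream and LaunchDataflowJob below:
//
//	DATA_STREAM_RL.Take() // Blocks until the limiter permits the next Datastream call.
//	dsOp, err := dsClient.CreateStream(ctx, createStreamRequest)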
type SrcConnCfg struct {
Name string
Location string
}
type DstConnCfg struct {
Name string
Location string
Prefix string
}
type DatastreamCfg struct {
StreamId string `json:"streamId"`
StreamLocation string `json:"streamLocation"`
StreamDisplayName string `json:"streamDisplayName"`
SourceConnectionConfig SrcConnCfg `json:"sourceConnectionConfig"`
DestinationConnectionConfig DstConnCfg `json:"destinationConnectionConfig"`
Properties string `json:"properties"`
SchemaDetails map[string]internal.SchemaDetails `json:"-"`
MaxConcurrentBackfillTasks string `json:"maxConcurrentBackfillTasks"`
MaxConcurrentCdcTasks string `json:"maxConcurrentCdcTasks"`
}
type GcsCfg struct {
TtlInDays int64 `json:"ttlInDays"`
TtlInDaysSet bool `json:"ttlInDaysSet"`
}
type DataflowCfg struct {
ProjectId string `json:"projectId"`
JobName string `json:"jobName"`
Location string `json:"location"`
VpcHostProjectId string `json:"hostProjectId"`
Network string `json:"network"`
Subnetwork string `json:"subnetwork"`
MaxWorkers string `json:"maxWorkers"`
NumWorkers string `json:"numWorkers"`
ServiceAccountEmail string `json:"serviceAccountEmail"`
MachineType string `json:"machineType"`
AdditionalUserLabels string `json:"additionalUserLabels"`
KmsKeyName string `json:"kmsKeyName"`
GcsTemplatePath string `json:"gcsTemplatePath"`
DbNameToShardIdMap map[string]string `json:"dbNameToShardIdMap"`
CustomJarPath string `json:"customJarPath"`
CustomClassName string `json:"customClassName"`
CustomParameter string `json:"customParameter"`
}
type StreamingCfg struct {
DatastreamCfg DatastreamCfg `json:"datastreamCfg"`
GcsCfg GcsCfg `json:"gcsCfg"`
DataflowCfg DataflowCfg `json:"dataflowCfg"`
TmpDir string `json:"tmpDir"`
PubsubCfg internal.PubsubResources `json:"pubsubCfg"`
DlqPubsubCfg internal.PubsubResources `json:"dlqPubsubCfg"`
DataShardId string `json:"dataShardId"`
}
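
// An illustrative streaming config, as consumed by ReadStreamingConfig below. All names and
// values here are placeholders, not defaults:
//
//	{
//	  "datastreamCfg": {
//	    "streamLocation": "us-central1",
//	    "sourceConnectionConfig": {"name": "src-profile", "location": "us-central1"},
//	    "destinationConnectionConfig": {"name": "dst-profile", "location": "us-central1", "prefix": "/data"},
//	    "maxConcurrentBackfillTasks": "50",
//	    "maxConcurrentCdcTasks": "5"
//	  },
//	  "gcsCfg": {"ttlInDays": 7, "ttlInDaysSet": true},
//	  "dataflowCfg": {"location": "us-central1", "maxWorkers": "50", "numWorkers": "1"},
//	  "tmpDir": "gs://my-bucket/tmp/"
//	}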
// dataStreamGaxRetrier returns a gax.Retryer that supplies the retry error codes and backoff policy for GCP client calls.
func dataStreamGaxRetrier() gax.Retryer {
return gax.OnCodes(DEFAULT_DATASTREAM_RETRY_CODES, gax.Backoff{
Initial: DEFAULT_DATASTREAM_CLIENT_BACKOFF_BASE_DELAY,
Max: DEFAULT_DATASTREAM_CLIENT_BACKOFF_MAX_DELAY,
Multiplier: DEFAULT_DATASTREAM_CLIENT_BACKOFF_MULTIPLIER,
})
}
// getUpdateDataStreamLRORetryBackoff returns an exponential backoff policy for retrying the Datastream update-stream call across LRO polls.
func getUpdateDataStreamLRORetryBackoff() *backoff.ExponentialBackOff {
dataStreamUpdateBackoff := &backoff.ExponentialBackOff{
InitialInterval: DEFAULT_DATASTREAM_LRO_POLL_BASE_DELAY,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: DEFAULT_DATASTREAM_LRO_POLL_MULTIPLIER,
MaxInterval: DEFAULT_DATASTREAM_LRO_POLL_MAX_DELAY,
MaxElapsedTime: DEFAULT_DATASTREAM_LRO_POLL_MAX_ELAPSED_TIME,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
dataStreamUpdateBackoff.Reset()
return dataStreamUpdateBackoff
}
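
// A minimal sketch of how this backoff is consumed, mirroring the update-stream retry in
// LaunchStream below:
//
//	err := backoff.Retry(func() error {
//		return updateStream(ctx, streamInfo, projectNumberResource, datastreamCfg, dsClient)
//	}, getUpdateDataStreamLRORetryBackoff())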
// VerifyAndUpdateCfg checks the fields and errors out if certain fields are empty.
// It then auto-populates certain empty fields like StreamId and Dataflow JobName.
func VerifyAndUpdateCfg(streamingCfg *StreamingCfg, dbName string, schemaDetails map[string]internal.SchemaDetails) error {
dsCfg := streamingCfg.DatastreamCfg
if dsCfg.StreamLocation == "" {
return fmt.Errorf("please specify DatastreamCfg.StreamLocation in the streaming config")
}
dfCfg := streamingCfg.DataflowCfg
if dfCfg.Location == "" {
return fmt.Errorf("please specify the Location under DataflowCfg in the streaming config")
}
// If both ID and Display name are empty, generate a new one for both.
// If either is present, assign it to the other one.
if dsCfg.StreamId == "" && dsCfg.StreamDisplayName == "" {
// TODO: Update names to have more info like dbname.
		streamId, err := utils.GenerateName("smt-stream-" + dbName)
		if err != nil {
			return fmt.Errorf("error generating stream name: %v", err)
		}
		streamId = strings.Replace(streamId, "_", "-", -1)
streamingCfg.DatastreamCfg.StreamId = streamId
streamingCfg.DatastreamCfg.StreamDisplayName = streamId
} else if dsCfg.StreamId == "" {
streamingCfg.DatastreamCfg.StreamId = streamingCfg.DatastreamCfg.StreamDisplayName
} else if dsCfg.StreamDisplayName == "" {
streamingCfg.DatastreamCfg.StreamDisplayName = streamingCfg.DatastreamCfg.StreamId
}
streamingCfg.DatastreamCfg.SchemaDetails = schemaDetails
if dsCfg.MaxConcurrentCdcTasks != "" {
intVal, err := strconv.ParseInt(dsCfg.MaxConcurrentCdcTasks, 10, 64)
if err != nil {
return fmt.Errorf("could not parse maxConcurrentCdcTasks parameter %s, please provide a positive integer as input", dsCfg.MaxConcurrentCdcTasks)
}
maxCdcTasks = int32(intVal)
if maxCdcTasks < MIN_DATASTREAM_TASK_LIMIT || maxCdcTasks > MAX_DATASTREAM_TASK_LIMIT {
return fmt.Errorf("maxConcurrentCdcTasks should lie in the range [%d, %d]", MIN_DATASTREAM_TASK_LIMIT, MAX_DATASTREAM_TASK_LIMIT)
}
}
if dsCfg.MaxConcurrentBackfillTasks != "" {
intVal, err := strconv.ParseInt(dsCfg.MaxConcurrentBackfillTasks, 10, 64)
if err != nil {
return fmt.Errorf("could not parse maxConcurrentBackfillTasks parameter %s, please provide a positive integer as input", dsCfg.MaxConcurrentBackfillTasks)
}
maxBackfillTasks = int32(intVal)
if maxBackfillTasks < MIN_DATASTREAM_TASK_LIMIT || maxBackfillTasks > MAX_DATASTREAM_TASK_LIMIT {
return fmt.Errorf("maxConcurrentBackfillTasks should lie in the range [%d, %d]", MIN_DATASTREAM_TASK_LIMIT, MAX_DATASTREAM_TASK_LIMIT)
}
}
if dfCfg.JobName == "" {
		// TODO: Update names to have more info like dbname.
		jobName, err := utils.GenerateName("smt-dataflow-" + dbName)
		if err != nil {
			return fmt.Errorf("error generating job name: %v", err)
		}
		jobName = strings.Replace(jobName, "_", "-", -1)
streamingCfg.DataflowCfg.JobName = jobName
}
filePath := streamingCfg.TmpDir
u, err := utils.ParseGCSFilePath(filePath)
if err != nil {
return fmt.Errorf("parseFilePath: unable to parse file path: %v", err)
}
// We update the TmpDir in case any '/' were added in ParseGCSFilePath().
streamingCfg.TmpDir = u.String()
bucketName := u.Host
ctx := context.Background()
client, err := storage.NewClient(ctx)
if err != nil {
return fmt.Errorf("failed to create GCS client")
}
defer client.Close()
// The Get calls for Google Cloud Storage API have out of box retries.
// Reference - https://cloud.google.com/storage/docs/retry-strategy#idempotency-operations
bucket := client.Bucket(bucketName)
_, err = bucket.Attrs(ctx)
if err != nil {
return fmt.Errorf("bucket %s does not exist", bucketName)
}
// Verify GCS bucket tuning configs.
if streamingCfg.GcsCfg.TtlInDaysSet {
ttl := streamingCfg.GcsCfg.TtlInDays
if ttl <= 0 {
return fmt.Errorf("ttlInDays should be a positive integer")
}
}
return nil
}
// ReadStreamingConfig reads the file and unmarshals it into the StreamingCfg struct.
func ReadStreamingConfig(file, dbName string, schemaDetails map[string]internal.SchemaDetails) (StreamingCfg, error) {
streamingCfg := StreamingCfg{}
	cfgFile, err := os.ReadFile(file)
if err != nil {
return streamingCfg, fmt.Errorf("can't read streaming config file due to: %v", err)
}
err = json.Unmarshal(cfgFile, &streamingCfg)
if err != nil {
return streamingCfg, fmt.Errorf("unable to unmarshall json due to: %v", err)
}
err = VerifyAndUpdateCfg(&streamingCfg, dbName, schemaDetails)
if err != nil {
return streamingCfg, fmt.Errorf("streaming config is incomplete: %v", err)
}
return streamingCfg, nil
}
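
// Illustrative usage of ReadStreamingConfig; the file path and database name are placeholders:
//
//	streamingCfg, err := ReadStreamingConfig("streaming_config.json", "mydb", schemaDetails)
//	if err != nil {
//		return err
//	}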
// dbList is the list of databases to be migrated.
// tableList is the common list of tables that need to be migrated from each database.
func getMysqlSourceStreamConfig(dbList []profiles.LogicalShard, datastreamCfg DatastreamCfg) (*datastreampb.SourceConfig_MysqlSourceConfig, error) {
schemaDetails := datastreamCfg.SchemaDetails
mysqlTables := []*datastreampb.MysqlTable{}
for _, tableList := range schemaDetails {
for _, table := range tableList.TableDetails {
includeTable := &datastreampb.MysqlTable{
Table: table.TableName,
}
mysqlTables = append(mysqlTables, includeTable)
}
}
includeDbList := []*datastreampb.MysqlDatabase{}
for _, db := range dbList {
		// Create the include-db object.
includeDb := &datastreampb.MysqlDatabase{
Database: db.DbName,
MysqlTables: mysqlTables,
}
includeDbList = append(includeDbList, includeDb)
}
	// TODO: Clean up fmt.Printf logs and replace them with zap logger.
fmt.Printf("Include DB List for datastream: %+v\n", includeDbList)
mysqlSrcCfg := &datastreampb.MysqlSourceConfig{
IncludeObjects: &datastreampb.MysqlRdbms{MysqlDatabases: includeDbList},
MaxConcurrentBackfillTasks: maxBackfillTasks,
MaxConcurrentCdcTasks: maxCdcTasks,
}
return &datastreampb.SourceConfig_MysqlSourceConfig{MysqlSourceConfig: mysqlSrcCfg}, nil
}
func getOracleSourceStreamConfig(dbName string, datastreamCfg DatastreamCfg) (*datastreampb.SourceConfig_OracleSourceConfig, error) {
oracleTables := []*datastreampb.OracleTable{}
for _, tableList := range datastreamCfg.SchemaDetails {
for _, table := range tableList.TableDetails {
includeTable := &datastreampb.OracleTable{
Table: table.TableName,
}
oracleTables = append(oracleTables, includeTable)
}
}
oracledb := &datastreampb.OracleSchema{
Schema: dbName,
OracleTables: oracleTables,
}
oracleSrcCfg := &datastreampb.OracleSourceConfig{
IncludeObjects: &datastreampb.OracleRdbms{OracleSchemas: []*datastreampb.OracleSchema{oracledb}},
MaxConcurrentBackfillTasks: maxBackfillTasks,
MaxConcurrentCdcTasks: maxCdcTasks,
}
return &datastreampb.SourceConfig_OracleSourceConfig{OracleSourceConfig: oracleSrcCfg}, nil
}
func getPostgreSQLSourceStreamConfig(datastreamCfg DatastreamCfg) (*datastreampb.SourceConfig_PostgresqlSourceConfig, error) {
properties := datastreamCfg.Properties
params, err := profiles.ParseMap(properties)
if err != nil {
return nil, fmt.Errorf("could not parse properties: %v", err)
}
postgreSQLSchema := []*datastreampb.PostgresqlSchema{}
for schema, tableList := range datastreamCfg.SchemaDetails {
postgreSQLTables := []*datastreampb.PostgresqlTable{}
		for _, table := range tableList.TableDetails {
			tableName := table.TableName
			if schema != "public" {
				// Non-public tables are qualified as "schema.table"; strip the schema prefix.
				tableName = strings.TrimPrefix(tableName, schema+".")
			}
			includeTable := &datastreampb.PostgresqlTable{
				Table: tableName,
			}
			postgreSQLTables = append(postgreSQLTables, includeTable)
		}
includeSchema := &datastreampb.PostgresqlSchema{
Schema: schema,
PostgresqlTables: postgreSQLTables,
}
postgreSQLSchema = append(postgreSQLSchema, includeSchema)
}
replicationSlot, replicationSlotExists := params["replicationSlot"]
publication, publicationExists := params["publication"]
if !replicationSlotExists || !publicationExists {
return nil, fmt.Errorf("replication slot or publication not specified")
}
postgresSrcCfg := &datastreampb.PostgresqlSourceConfig{
IncludeObjects: &datastreampb.PostgresqlRdbms{PostgresqlSchemas: postgreSQLSchema},
ReplicationSlot: replicationSlot,
Publication: publication,
MaxConcurrentBackfillTasks: maxBackfillTasks,
}
return &datastreampb.SourceConfig_PostgresqlSourceConfig{PostgresqlSourceConfig: postgresSrcCfg}, nil
}
func getSourceStreamConfig(srcCfg *datastreampb.SourceConfig, sourceProfile profiles.SourceProfile, dbList []profiles.LogicalShard, datastreamCfg DatastreamCfg) error {
	var err error
switch sourceProfile.Driver {
case constants.MYSQL:
// For MySQL, it supports sharded migrations and batching databases in a physical machine into a single
// Datastream, so dbList is passed.
srcCfg.SourceStreamConfig, err = getMysqlSourceStreamConfig(dbList, datastreamCfg)
return err
case constants.ORACLE:
// For Oracle, no sharded migrations or db batching support, so the dbList always contains only one element.
srcCfg.SourceStreamConfig, err = getOracleSourceStreamConfig(dbList[0].DbName, datastreamCfg)
return err
	case constants.POSTGRES:
		// For PostgreSQL, tables need to be configured at the schema level, which would require more information
		// (List<Dbs> and Map<Schema, List<Tables>> instead of List<Dbs> and List<Tables>). Because of this, we
		// currently do not configure the PostgreSQL stream at the individual table level.
srcCfg.SourceStreamConfig, err = getPostgreSQLSourceStreamConfig(datastreamCfg)
return err
default:
return fmt.Errorf("only MySQL, Oracle and PostgreSQL are supported as source streams")
}
}
func CreatePubsubResources(ctx context.Context, projectID string, datastreamDestinationConnCfg DstConnCfg, dbName string, pubsubDestination string) (*internal.PubsubResources, error) {
pubsubClient, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return nil, fmt.Errorf("pubsub client can not be created: %v", err)
}
defer pubsubClient.Close()
// Create pubsub topic and subscription
pubsubCfg, err := createPubsubTopicAndSubscription(ctx, pubsubClient, dbName, "-"+pubsubDestination)
if err != nil {
logger.Log.Error(fmt.Sprintf("Could not create pubsub resources. Some permissions missing. Please check https://googlecloudplatform.github.io/spanner-migration-tool/permissions.html for required pubsub permissions. error=%v", err))
return nil, err
}
// Fetch the created target profile and get the target gcs bucket name and path.
// Then create notification for the target bucket.
// Creating datastream client to fetch target profile.
dsClient, err := datastream.NewClient(ctx)
if err != nil {
return nil, fmt.Errorf("datastream client can not be created: %v", err)
}
defer dsClient.Close()
bucketName, prefix, err := FetchTargetBucketAndPath(ctx, dsClient, projectID, datastreamDestinationConnCfg, pubsubDestination)
if err != nil {
return nil, err
}
// Create pubsub notification on the target gcs path
storageClient, err := storage.NewClient(ctx)
if err != nil {
return nil, fmt.Errorf("GCS client can not be created: %v", err)
}
defer storageClient.Close()
notificationID, err := createNotificationOnBucket(ctx, storageClient, projectID, pubsubCfg.TopicId, bucketName, prefix)
if err != nil {
logger.Log.Error(fmt.Sprintf("Could not create pubsub resources. Some permissions missing. Please check https://googlecloudplatform.github.io/spanner-migration-tool/permissions.html for required pubsub permissions. error=%v", err))
return nil, err
}
pubsubCfg.BucketName = bucketName
pubsubCfg.NotificationId = notificationID
logger.Log.Info(fmt.Sprintf("Successfully created pubsub topic id=%s, subscription id=%s, notification for bucket=%s with id=%s.\n", pubsubCfg.TopicId, pubsubCfg.SubscriptionId, bucketName, notificationID))
return &pubsubCfg, nil
}
func createPubsubTopicAndSubscription(ctx context.Context, pubsubClient *pubsub.Client, dbName string, pubsubDestination string) (internal.PubsubResources, error) {
pubsubCfg := internal.PubsubResources{}
// Generate ID
subscriptionId, err := utils.GenerateName("smt-sub-" + dbName + pubsubDestination)
if err != nil {
return pubsubCfg, fmt.Errorf("error generating pubsub subscription ID: %v", err)
}
pubsubCfg.SubscriptionId = subscriptionId
topicId, err := utils.GenerateName("smt-topic-" + dbName + pubsubDestination)
if err != nil {
return pubsubCfg, fmt.Errorf("error generating pubsub topic ID: %v", err)
}
pubsubCfg.TopicId = topicId
// Create Topic and Subscription
	// CreateTopic has out of box retries.
// Ref - https://github.com/googleapis/googleapis/blob/master/google/pubsub/v1/pubsub_grpc_service_config.json
topicObj, err := pubsubClient.CreateTopic(ctx, pubsubCfg.TopicId)
if err != nil {
return pubsubCfg, fmt.Errorf("pubsub topic could not be created: %v", err)
}
	// CreateSubscription has out of box retries.
// Ref - https://github.com/googleapis/googleapis/blob/master/google/pubsub/v1/pubsub_grpc_service_config.json
_, err = pubsubClient.CreateSubscription(ctx, pubsubCfg.SubscriptionId, pubsub.SubscriptionConfig{
Topic: topicObj,
AckDeadline: time.Minute * 10,
RetentionDuration: time.Hour * 24 * 7,
})
if err != nil {
return pubsubCfg, fmt.Errorf("pubsub subscription could not be created: %v", err)
}
return pubsubCfg, nil
}
// FetchTargetBucketAndPath fetches the bucket and path name from a Datastream destination config.
func FetchTargetBucketAndPath(ctx context.Context, datastreamClient *datastream.Client, projectID string, datastreamDestinationConnCfg DstConnCfg, pubsubDestination string) (string, string, error) {
	if datastreamClient == nil {
		return "", "", fmt.Errorf("datastream client cannot be nil")
	}
dstProf := fmt.Sprintf("projects/%s/locations/%s/connectionProfiles/%s", projectID, datastreamDestinationConnCfg.Location, datastreamDestinationConnCfg.Name)
// `GetConnectionProfile` has out of box retries. Ref - https://github.com/googleapis/googleapis/blob/master/google/cloud/datastream/v1/datastream_grpc_service_config.json
res, err := datastreamClient.GetConnectionProfile(ctx, &datastreampb.GetConnectionProfileRequest{Name: dstProf})
if err != nil {
return "", "", fmt.Errorf("could not get connection profiles: %v", err)
}
// Fetch the GCS path from the target connection profile.
// The Get calls for Google Cloud Storage API have out of box retries.
// Reference - https://cloud.google.com/storage/docs/retry-strategy#idempotency-operations
gcsProfile := res.Profile.(*datastreampb.ConnectionProfile_GcsProfile).GcsProfile
bucketName := gcsProfile.Bucket
prefix := gcsProfile.RootPath + datastreamDestinationConnCfg.Prefix
	// For the DLQ GCS folder, the notification is created only on the retry path.
if pubsubDestination == constants.DLQ_GCS {
pubsubDestination += "retry"
}
prefix = utils.ConcatDirectoryPath(prefix, pubsubDestination+"/")
return bucketName, prefix, nil
}
func createNotificationOnBucket(ctx context.Context, storageClient *storage.Client, projectID, topicID, bucketName, prefix string) (string, error) {
notification := storage.Notification{
TopicID: topicID,
TopicProjectID: projectID,
PayloadFormat: storage.JSONPayload,
ObjectNamePrefix: prefix,
}
	// TODO: Explore if there's a way to make this idempotent or retriable.
	// This call is classified as never idempotent.
	// Ref - https://cloud.google.com/storage/docs/retry-strategy
createdNotification, err := storageClient.Bucket(bucketName).AddNotification(ctx, ¬ification)
if err != nil {
return "", fmt.Errorf("GCS Notification could not be created: %v", err)
}
return createdNotification.ID, nil
}
// LaunchStream populates the parameters from the streaming config and triggers a stream on Cloud Datastream.
func LaunchStream(ctx context.Context, sourceProfile profiles.SourceProfile, dbList []profiles.LogicalShard, migrationProjectId string, datastreamCfg DatastreamCfg) error {
projectNumberResource := GetProjectNumberResource(ctx, fmt.Sprintf("projects/%s", migrationProjectId))
fmt.Println("Launching stream ", fmt.Sprintf("%s/locations/%s", projectNumberResource, datastreamCfg.StreamLocation))
dsClient, err := datastream.NewClient(ctx)
if err != nil {
return fmt.Errorf("datastream client can not be created: %v", err)
}
defer dsClient.Close()
// Rate limit this function to match DataStream API Quota.
DATA_STREAM_RL.Take()
fmt.Println("Created client...")
prefix := datastreamCfg.DestinationConnectionConfig.Prefix
prefix = utils.ConcatDirectoryPath(prefix, "data")
gcsDstCfg := &datastreampb.GcsDestinationConfig{
Path: prefix,
FileFormat: &datastreampb.GcsDestinationConfig_AvroFileFormat{},
}
srcCfg := &datastreampb.SourceConfig{
SourceConnectionProfile: fmt.Sprintf("%s/locations/%s/connectionProfiles/%s", projectNumberResource, datastreamCfg.SourceConnectionConfig.Location, datastreamCfg.SourceConnectionConfig.Name),
}
err = getSourceStreamConfig(srcCfg, sourceProfile, dbList, datastreamCfg)
if err != nil {
return fmt.Errorf("could not get source stream config: %v", err)
}
dstCfg := &datastreampb.DestinationConfig{
DestinationConnectionProfile: fmt.Sprintf("%s/locations/%s/connectionProfiles/%s", projectNumberResource, datastreamCfg.DestinationConnectionConfig.Location, datastreamCfg.DestinationConnectionConfig.Name),
DestinationStreamConfig: &datastreampb.DestinationConfig_GcsDestinationConfig{GcsDestinationConfig: gcsDstCfg},
}
streamInfo := &datastreampb.Stream{
DisplayName: datastreamCfg.StreamDisplayName,
SourceConfig: srcCfg,
DestinationConfig: dstCfg,
State: datastreampb.Stream_RUNNING,
BackfillStrategy: &datastreampb.Stream_BackfillAll{BackfillAll: &datastreampb.Stream_BackfillAllStrategy{}},
}
createStreamRequest := &datastreampb.CreateStreamRequest{
Parent: fmt.Sprintf("%s/locations/%s", projectNumberResource, datastreamCfg.StreamLocation),
StreamId: datastreamCfg.StreamId,
Stream: streamInfo,
// Setting a RequestId makes idempotent retries possible.
RequestId: uuid.New().String(),
}
fmt.Println("Created stream request..")
dsOp, err := dsClient.CreateStream(ctx, createStreamRequest, gax.WithRetry(dataStreamGaxRetrier))
if err != nil {
fmt.Printf("cannot create stream: createStreamRequest: %+v\n", createStreamRequest)
return fmt.Errorf("cannot create stream: %v ", err)
}
_, err = dsOp.Wait(ctx)
if err != nil {
fmt.Printf("datastream create operation failed: createStreamRequest: %+v\n", createStreamRequest)
return fmt.Errorf("datastream create operation failed: %v", err)
}
fmt.Println("Successfully created stream ", datastreamCfg.StreamId)
/* Note: Retrying across an LRO poll is a workaround and not a fix, use it only after checking with the API server team.
* In most cases, if a long running operation leads into a retriable failure, the server would retry internally before marking the operation as failed.
*/
updateErr := backoff.Retry(func() error { return updateStream(ctx, streamInfo, projectNumberResource, datastreamCfg, dsClient) },
getUpdateDataStreamLRORetryBackoff())
if updateErr != nil {
return updateErr
}
fmt.Println("Done")
return nil
}
func updateStream(ctx context.Context, streamInfo *datastreampb.Stream, projectNumberResource string, datastreamCfg DatastreamCfg, dsClient *datastream.Client) error {
fmt.Print("Setting stream state to RUNNING...")
streamInfo.Name = fmt.Sprintf("%s/locations/%s/streams/%s", projectNumberResource, datastreamCfg.StreamLocation, datastreamCfg.StreamId)
// Setting a RequestId makes idempotent retries possible.
updateStreamRequest := &datastreampb.UpdateStreamRequest{
UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"state"}},
Stream: streamInfo,
RequestId: uuid.New().String(),
}
upOp, err := dsClient.UpdateStream(ctx, updateStreamRequest, gax.WithRetry(dataStreamGaxRetrier))
if err != nil {
fmt.Printf("Encountered Error in updating datastream. Error before LRO poll: %v\n", err)
return fmt.Errorf("could not create update request: %v", err)
}
_, err = upOp.Wait(ctx)
if err != nil {
fmt.Printf("Encountered Error in updating datastream. Error after LRO poll: %v\n", err)
return fmt.Errorf("update stream operation failed: %v", err)
}
return nil
}
// LaunchDataflowJob populates the parameters from the streaming config and triggers a Dataflow job.
func LaunchDataflowJob(ctx context.Context, migrationProjectId string, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg, conv *internal.Conv) (internal.DataflowOutput, error) {
spannerProjectId, instance, dbName, _ := targetProfile.GetResourceIds(ctx, time.Now(), "", nil, &utils.GetUtilInfoImpl{})
dataflowCfg := streamingCfg.DataflowCfg
datastreamCfg := streamingCfg.DatastreamCfg
// Rate limit this function to match DataFlow createJob Quota.
DATA_FLOW_RL.Take()
fmt.Println("Launching dataflow job ", dataflowCfg.JobName, " in ", migrationProjectId, "-", dataflowCfg.Location)
c, err := dataflow.NewFlexTemplatesClient(ctx)
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("could not create flex template client: %v", err)
}
defer c.Close()
fmt.Println("Created flex template client...")
	// Create a datastream client to fetch the GCS bucket from the target profile.
dsClient, err := datastream.NewClient(ctx)
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("datastream client can not be created: %v", err)
}
defer dsClient.Close()
// Fetch the GCS path from the destination connection profile.
dstProf := fmt.Sprintf("projects/%s/locations/%s/connectionProfiles/%s", migrationProjectId, datastreamCfg.DestinationConnectionConfig.Location, datastreamCfg.DestinationConnectionConfig.Name)
res, err := dsClient.GetConnectionProfile(ctx, &datastreampb.GetConnectionProfileRequest{Name: dstProf})
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("could not get connection profiles: %v", err)
}
gcsProfile := res.Profile.(*datastreampb.ConnectionProfile_GcsProfile).GcsProfile
inputFilePattern := "gs://" + gcsProfile.Bucket + gcsProfile.RootPath + datastreamCfg.DestinationConnectionConfig.Prefix
if inputFilePattern[len(inputFilePattern)-1] != '/' {
inputFilePattern = inputFilePattern + "/"
}
fmt.Println("Reading files from datastream destination ", inputFilePattern)
// Initiate runtime environment flags and overrides.
var (
dataflowProjectId = migrationProjectId
dataflowVpcHostProjectId = migrationProjectId
gcsTemplatePath = utils.GetDataflowTemplatePath()
dataflowSubnetwork = ""
workerIpAddressConfig = dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PUBLIC
dataflowUserLabels = make(map[string]string)
machineType = "n1-standard-2"
)
	// If a project override is present, use it; otherwise default to the migration project. Useful when customers want to run Dataflow in a separate project.
if dataflowCfg.ProjectId != "" {
dataflowProjectId = dataflowCfg.ProjectId
}
	// If a VPC host project override is present, use it; otherwise default to the migration project.
if dataflowCfg.VpcHostProjectId != "" {
dataflowVpcHostProjectId = dataflowCfg.VpcHostProjectId
}
if dataflowCfg.GcsTemplatePath != "" {
gcsTemplatePath = dataflowCfg.GcsTemplatePath
}
// If either network or subnetwork is specified, set IpConfig to private.
if dataflowCfg.Network != "" || dataflowCfg.Subnetwork != "" {
workerIpAddressConfig = dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PRIVATE
if dataflowCfg.Subnetwork != "" {
dataflowSubnetwork = fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/subnetworks/%s", dataflowVpcHostProjectId, dataflowCfg.Location, dataflowCfg.Subnetwork)
}
}
if dataflowCfg.AdditionalUserLabels != "" {
err = json.Unmarshal([]byte(dataflowCfg.AdditionalUserLabels), &dataflowUserLabels)
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("could not unmarshal AdditionalUserLabels json %s : error = %v", dataflowCfg.AdditionalUserLabels, err)
}
}
if dataflowCfg.MaxWorkers != "" {
intVal, err := strconv.ParseInt(dataflowCfg.MaxWorkers, 10, 64)
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("could not parse MaxWorkers parameter %s, please provide a positive integer as input", dataflowCfg.MaxWorkers)
}
maxWorkers = int32(intVal)
if maxWorkers < MIN_WORKER_LIMIT || maxWorkers > MAX_WORKER_LIMIT {
return internal.DataflowOutput{}, fmt.Errorf("maxWorkers should lie in the range [%d, %d]", MIN_WORKER_LIMIT, MAX_WORKER_LIMIT)
}
}
if dataflowCfg.NumWorkers != "" {
intVal, err := strconv.ParseInt(dataflowCfg.NumWorkers, 10, 64)
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("could not parse NumWorkers parameter %s, please provide a positive integer as input", dataflowCfg.NumWorkers)
}
numWorkers = int32(intVal)
if numWorkers < MIN_WORKER_LIMIT || numWorkers > MAX_WORKER_LIMIT {
return internal.DataflowOutput{}, fmt.Errorf("numWorkers should lie in the range [%d, %d]", MIN_WORKER_LIMIT, MAX_WORKER_LIMIT)
}
}
if dataflowCfg.MachineType != "" {
machineType = dataflowCfg.MachineType
}
launchParameters := &dataflowpb.LaunchFlexTemplateParameter{
JobName: dataflowCfg.JobName,
Template: &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: gcsTemplatePath},
Parameters: map[string]string{
"streamName": fmt.Sprintf("projects/%s/locations/%s/streams/%s", migrationProjectId, datastreamCfg.StreamLocation, datastreamCfg.StreamId),
"projectId": spannerProjectId,
"instanceId": instance,
"databaseId": dbName,
"sessionFilePath": streamingCfg.TmpDir + "session.json",
"deadLetterQueueDirectory": inputFilePattern + "dlq",
"transformationContextFilePath": streamingCfg.TmpDir + "transformationContext.json",
"gcsPubSubSubscription": fmt.Sprintf("projects/%s/subscriptions/%s", migrationProjectId, streamingCfg.PubsubCfg.SubscriptionId),
"dlqGcsPubSubSubscription": fmt.Sprintf("projects/%s/subscriptions/%s", migrationProjectId, streamingCfg.DlqPubsubCfg.SubscriptionId),
},
Environment: &dataflowpb.FlexTemplateRuntimeEnvironment{
MaxWorkers: maxWorkers,
NumWorkers: numWorkers,
ServiceAccountEmail: dataflowCfg.ServiceAccountEmail,
AutoscalingAlgorithm: 2, // 2 corresponds to AUTOSCALING_ALGORITHM_BASIC
EnableStreamingEngine: true,
Network: dataflowCfg.Network,
Subnetwork: dataflowSubnetwork,
IpConfiguration: workerIpAddressConfig,
MachineType: machineType,
AdditionalUserLabels: dataflowUserLabels,
KmsKeyName: dataflowCfg.KmsKeyName,
},
}
if dataflowCfg.CustomClassName != "" && dataflowCfg.CustomJarPath != "" {
launchParameters.Parameters["transformationJarPath"] = dataflowCfg.CustomJarPath
launchParameters.Parameters["transformationClassName"] = dataflowCfg.CustomClassName
launchParameters.Parameters["transformationCustomParameters"] = dataflowCfg.CustomParameter
launchParameters.Parameters["filteredEventsDirectory"] = utils.ConcatDirectoryPath(inputFilePattern, "filteredEvents")
} else if (dataflowCfg.CustomClassName != "" && dataflowCfg.CustomJarPath == "") || (dataflowCfg.CustomClassName == "" && dataflowCfg.CustomJarPath != "") {
return internal.DataflowOutput{}, fmt.Errorf("specify both the custom class name and custom jar GCS path, or specify neither")
}
req := &dataflowpb.LaunchFlexTemplateRequest{
ProjectId: dataflowProjectId,
LaunchParameter: launchParameters,
Location: dataflowCfg.Location,
}
fmt.Println("Created flex template request body...")
// LaunchFlexTemplate does not have out of box retries or any direct documentation on how
// to make the call idempotent.
// Ref - https://github.com/googleapis/googleapis/blob/master/google/dataflow/v1beta3/dataflow_grpc_service_config.json
	// TODO: Explore retries.
respDf, err := c.LaunchFlexTemplate(ctx, req)
if err != nil {
fmt.Printf("flexTemplateRequest: %+v\n", req)
return internal.DataflowOutput{}, fmt.Errorf("unable to launch template: %v", err)
}
	// TODO: Refactor to use the accessor return value.
gcloudDfCmd := dataflowaccessor.GetGcloudDataflowCommandFromRequest(req)
logger.Log.Debug(fmt.Sprintf("\nEquivalent gCloud command for job %s:\n%s\n\n", req.LaunchParameter.JobName, gcloudDfCmd))
return internal.DataflowOutput{JobID: respDf.Job.Id, GCloudCmd: gcloudDfCmd}, nil
}
func StoreGeneratedResources(conv *internal.Conv, streamingCfg StreamingCfg, dfJobId, gcloudDataflowCmd, migrationProjectId, dataShardId string, gcsBucket internal.GcsResources, dashboardName string) {
datastreamCfg := streamingCfg.DatastreamCfg
dataflowCfg := streamingCfg.DataflowCfg
conv.Audit.StreamingStats.DatastreamResources = internal.DatastreamResources{DatastreamName: datastreamCfg.StreamId, Region: datastreamCfg.StreamLocation}
conv.Audit.StreamingStats.DataflowResources = internal.DataflowResources{JobId: dfJobId, GcloudCmd: gcloudDataflowCmd, Region: dataflowCfg.Location}
conv.Audit.StreamingStats.PubsubResources = streamingCfg.PubsubCfg
conv.Audit.StreamingStats.DlqPubsubResources = streamingCfg.DlqPubsubCfg
conv.Audit.StreamingStats.GcsResources = gcsBucket
conv.Audit.StreamingStats.MonitoringResources = internal.MonitoringResources{DashboardName: dashboardName}
if dataShardId != "" {
		// Note: this mutex is local to the call and does not synchronize concurrent callers.
		// TODO: Hoist it to package scope if ShardToShardResourcesMap can be written to concurrently.
		var resourceMutex sync.Mutex
		resourceMutex.Lock()
var shardResources internal.ShardResources
shardResources.DatastreamResources = internal.DatastreamResources{DatastreamName: datastreamCfg.StreamId, Region: datastreamCfg.StreamLocation}
shardResources.DataflowResources = internal.DataflowResources{JobId: dfJobId, GcloudCmd: gcloudDataflowCmd, Region: dataflowCfg.Location}
shardResources.PubsubResources = streamingCfg.PubsubCfg
shardResources.DlqPubsubResources = streamingCfg.DlqPubsubCfg
shardResources.GcsResources = gcsBucket
if dashboardName != "" {
{
shardResources.MonitoringResources = internal.MonitoringResources{DashboardName: dashboardName}
}
}
conv.Audit.StreamingStats.ShardToShardResourcesMap[dataShardId] = shardResources
resourceMutex.Unlock()
}
fullStreamName := fmt.Sprintf("projects/%s/locations/%s/streams/%s", migrationProjectId, datastreamCfg.StreamLocation, datastreamCfg.StreamId)
dfJobDetails := fmt.Sprintf("project: %s, location: %s, name: %s, id: %s", migrationProjectId, dataflowCfg.Location, dataflowCfg.JobName, dfJobId)
logger.Log.Info("\n------------------------------------------\n")
logger.Log.Info("The Datastream stream: " + fullStreamName + " ,the Dataflow job: " + dfJobDetails +
" the Pubsub topic: " + streamingCfg.PubsubCfg.TopicId + " and " + streamingCfg.DlqPubsubCfg.TopicId + " ,the subscription: " +
streamingCfg.PubsubCfg.SubscriptionId + " and " + streamingCfg.DlqPubsubCfg.SubscriptionId + " and the pubsub Notification id:" +
streamingCfg.PubsubCfg.NotificationId + " and " + streamingCfg.DlqPubsubCfg.NotificationId + " on bucket: " + streamingCfg.PubsubCfg.BucketName +
" can be cleaned up by using the UI if you have it open. Otherwise, you can use the cleanup CLI command and supply the migrationjobId" +
" generated by Spanner migration tool. You can find the migrationJobId in the logs or in the 'spannermigrationtool_metadata'" +
" database in your spanner instance. If migrationJobId is not stored due to an error, you will have to clean up the resources manually.")
}
func CreateStreamingConfig(pl profiles.DataShard) StreamingCfg {
	// Create DataflowCfg from the pl receiver object.
inputDataflowConfig := pl.DataflowConfig
dataflowCfg := DataflowCfg{
ProjectId: inputDataflowConfig.ProjectId,
Location: inputDataflowConfig.Location,
Network: inputDataflowConfig.Network,
VpcHostProjectId: inputDataflowConfig.VpcHostProjectId,
Subnetwork: inputDataflowConfig.Subnetwork,
MaxWorkers: inputDataflowConfig.MaxWorkers,
NumWorkers: inputDataflowConfig.NumWorkers,
ServiceAccountEmail: inputDataflowConfig.ServiceAccountEmail,
MachineType: inputDataflowConfig.MachineType,
AdditionalUserLabels: inputDataflowConfig.AdditionalUserLabels,
KmsKeyName: inputDataflowConfig.KmsKeyName,
GcsTemplatePath: inputDataflowConfig.GcsTemplatePath,
CustomJarPath: inputDataflowConfig.CustomJarPath,
CustomClassName: inputDataflowConfig.CustomClassName,
CustomParameter: inputDataflowConfig.CustomParameter,
}
	// Create source and destination Datastream configs from the pl receiver object.
datastreamCfg := DatastreamCfg{
StreamLocation: pl.StreamLocation,
MaxConcurrentBackfillTasks: pl.DatastreamConfig.MaxConcurrentBackfillTasks,
MaxConcurrentCdcTasks: pl.DatastreamConfig.MaxConcurrentCdcTasks,
}
	// Set the source connection profile.
inputSrcConnProfile := pl.SrcConnectionProfile
srcConnCfg := SrcConnCfg{Location: inputSrcConnProfile.Location, Name: inputSrcConnProfile.Name}
datastreamCfg.SourceConnectionConfig = srcConnCfg
	// Set the destination connection profile.
inputDstConnProfile := pl.DstConnectionProfile
dstConnCfg := DstConnCfg{Name: inputDstConnProfile.Name, Location: inputDstConnProfile.Location}
datastreamCfg.DestinationConnectionConfig = dstConnCfg
gcsCfg := GcsCfg{
TtlInDays: pl.GcsConfig.TtlInDays,
TtlInDaysSet: pl.GcsConfig.TtlInDaysSet,
}
	// Create the StreamingCfg object.
streamingCfg := StreamingCfg{
DatastreamCfg: datastreamCfg,
GcsCfg: gcsCfg,
DataflowCfg: dataflowCfg,
TmpDir: pl.TmpDir,
DataShardId: pl.DataShardId}
return streamingCfg
}
// ProjectNumberResourceCache maps a project id resource to its project-number resource.
var ProjectNumberResourceCache sync.Map

// GetProjectNumberResource returns a resource string that encodes the project number, like `projects/12345`.
// On lookup failure it falls back to returning the supplied projectID resource.
func GetProjectNumberResource(ctx context.Context, projectID string) string {
projectNumberResource, found := ProjectNumberResourceCache.Load(projectID)
if found {
return projectNumberResource.(string)
}
rmClient, err := resourcemanager.NewProjectsClient(ctx)
if err != nil {
logger.Log.Warn(fmt.Sprintf("Could not create resourcemanager client to query project number. Defaulting to ProjectId=%s. error=%v",
projectID, err))
return projectID
}
defer rmClient.Close()
	// `GetProject` has out of box retries.
// Ref - https://github.com/googleapis/googleapis/blob/master/google/cloud/resourcemanager/v3/cloudresourcemanager_v3_grpc_service_config.json
req := resourcemanagerpb.GetProjectRequest{Name: projectID}
project, err := rmClient.GetProject(ctx, &req)
if err != nil {
logger.Log.Warn(fmt.Sprintf("Could not query resourcemanager to get project number. Defaulting to ProjectId=%s. error=%v",
projectID, err))
return projectID
}
projectNumberResource = project.GetName()
ProjectNumberResourceCache.Store(projectID, projectNumberResource)
return projectNumberResource.(string)
}
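
// Illustrative usage of GetProjectNumberResource; the project id is a placeholder:
//
//	resource := GetProjectNumberResource(ctx, "projects/my-project-id")
//	// On success, resource looks like "projects/123456789"; on lookup failure it falls
//	// back to the input, "projects/my-project-id".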
func StartDatastream(ctx context.Context, migrationProjectId string, streamingCfg StreamingCfg, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, schemaDetails map[string]internal.SchemaDetails) (StreamingCfg, error) {
driver := sourceProfile.Driver
var dbList []profiles.LogicalShard
switch driver {
case constants.MYSQL:
dbList = append(dbList, profiles.LogicalShard{DbName: sourceProfile.Conn.Mysql.Db})
case constants.ORACLE:
dbList = append(dbList, profiles.LogicalShard{DbName: sourceProfile.Conn.Oracle.User})
case constants.POSTGRES:
dbList = append(dbList, profiles.LogicalShard{DbName: streamingCfg.DatastreamCfg.Properties})
}
err := LaunchStream(ctx, sourceProfile, dbList, migrationProjectId, streamingCfg.DatastreamCfg)
if err != nil {
return streamingCfg, fmt.Errorf("error launching stream: %v", err)
}
return streamingCfg, nil
}
func StartDataflow(ctx context.Context, migrationProjectId string, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg, conv *internal.Conv) (internal.DataflowOutput, error) {
sc, err := storageclient.NewStorageClientImpl(ctx)
if err != nil {
return internal.DataflowOutput{}, err
}
sa := storageaccessor.StorageAccessorImpl{}
convJSON, err := json.MarshalIndent(conv, "", " ")
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("can't encode session state to JSON: %v", err)
}
err = sa.WriteDataToGCS(ctx, sc, streamingCfg.TmpDir, "session.json", string(convJSON))
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("error while writing to GCS: %v", err)
}
transformationContextMap := map[string]interface{}{
"SchemaToShardId": streamingCfg.DataflowCfg.DbNameToShardIdMap,
}
transformationContext, err := json.Marshal(transformationContextMap)
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("failed to compute transformation context: %s", err.Error())
}
err = sa.WriteDataToGCS(ctx, sc, streamingCfg.TmpDir, "transformationContext.json", string(transformationContext))
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("error while writing to GCS: %v", err)
}
dfOutput, err := LaunchDataflowJob(ctx, migrationProjectId, targetProfile, streamingCfg, conv)
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("error launching dataflow: %v", err)
}
return dfOutput, nil
}