conversion/data_from_database.go:
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package conversion

import (
"context"
"fmt"
"strings"
"sync"
"cloud.google.com/go/datastream/apiv1/datastreampb"
sp "cloud.google.com/go/spanner"
datastreamclient "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/clients/datastream"
storageclient "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/clients/storage"
datastream_accessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/datastream"
spanneraccessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/spanner"
storageaccessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/storage"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/metrics"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/task"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils"
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
"github.com/GoogleCloudPlatform/spanner-migration-tool/logger"
"github.com/GoogleCloudPlatform/spanner-migration-tool/profiles"
"github.com/GoogleCloudPlatform/spanner-migration-tool/sources/common"
"github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/writer"
"github.com/GoogleCloudPlatform/spanner-migration-tool/streaming"
"go.uber.org/zap"
)
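// DataFromDatabaseInterface defines the entry points for migrating data from a
// source database to Spanner for the supported migration types: DMS, minimal
// downtime (Dataflow), and bulk.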
type DataFromDatabaseInterface interface {
dataFromDatabaseForDMSMigration() (*writer.BatchWriter, error)
dataFromDatabaseForDataflowMigration(migrationProjectId string, targetProfile profiles.TargetProfile, ctx context.Context, sourceProfile profiles.SourceProfile, conv *internal.Conv, is common.InfoSchemaInterface) (*writer.BatchWriter, error)
dataFromDatabaseForBulkMigration(migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, gi GetInfoInterface, sm SnapshotMigrationInterface) (*writer.BatchWriter, error)
}
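// DataFromDatabaseImpl is the default implementation of DataFromDatabaseInterface.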
type DataFromDatabaseImpl struct{}
// TODO: Define the data processing logic for DMS migrations here.
func (dd *DataFromDatabaseImpl) dataFromDatabaseForDMSMigration() (*writer.BatchWriter, error) {
return nil, fmt.Errorf("dms configType is not implemented yet, please use one of 'bulk' or 'dataflow'")
}
// dataFromDatabaseForDataflowMigration performs a minimal downtime migration:
//  1. Create a batch for each physical shard.
//  2. Create the streaming config from the source profile config.
//  3. Verify the config and update it with SMT defaults.
//  4. Launch the Datastream stream for the physical shard.
//  5. Perform the streaming migration via Dataflow.
func (dd *DataFromDatabaseImpl) dataFromDatabaseForDataflowMigration(migrationProjectId string, targetProfile profiles.TargetProfile, ctx context.Context, sourceProfile profiles.SourceProfile, conv *internal.Conv, is common.InfoSchemaInterface) (*writer.BatchWriter, error) {
// Fetch Spanner Region
if conv.SpRegion == "" {
spAcc, err := spanneraccessor.NewSpannerAccessorClientImpl(ctx)
if err != nil {
return nil, fmt.Errorf("unable to fetch Spanner Region for resource creation: %v", err)
}
spannerRegion, err := spAcc.GetSpannerLeaderLocation(ctx, "projects/"+targetProfile.Conn.Sp.Project+"/instances/"+targetProfile.Conn.Sp.Instance)
if err != nil {
return nil, fmt.Errorf("unable to fetch Spanner Region for resource creation: %v", err)
}
conv.SpRegion = spannerRegion
}
storageClient, err := storageclient.NewStorageClientImpl(ctx)
if err != nil {
return nil, err
}
// Create the resources required for the migration
if conv.ResourceValidation {
dsClient, err := datastreamclient.NewDatastreamClientImpl(ctx)
if err != nil {
return nil, err
}
createResources := NewValidateOrCreateResourcesImpl(&datastream_accessor.DatastreamAccessorImpl{}, dsClient, &storageaccessor.StorageAccessorImpl{}, storageClient)
err = createResources.ValidateOrCreateResourcesForShardedMigration(ctx, migrationProjectId, targetProfile.Conn.Sp.Instance, false, conv.SpRegion, sourceProfile)
if err != nil {
return nil, fmt.Errorf("unable to create connection profiles: %v", err)
}
}
// Set TmpDir, if unset, to the GCS bucket derived from the shard's destination connection profile
for _, dataShard := range sourceProfile.Config.ShardConfigurationDataflow.DataShards {
if dataShard.TmpDir == "" {
bucket, rootPath, err := GetBucketFromDatastreamProfile(migrationProjectId, conv.SpRegion, dataShard.DstConnectionProfile.Name)
if err != nil {
return nil, fmt.Errorf("error while getting target bucket: %v", err)
}
dataShard.TmpDir = "gs://" + bucket + rootPath
}
}
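// Update each data shard with its tuning configurations.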
updateShardsWithTuningConfigs(sourceProfile.Config.ShardConfigurationDataflow)
// Use the migration request id as the migration job id.
migrationJobId := conv.Audit.MigrationRequestId
fmt.Printf("Creating a migration job with id: %v. This jobId can be used in future commands (such as cleanup) to refer to this job.\n", migrationJobId)
conv.Audit.StreamingStats.ShardToShardResourcesMap = make(map[string]internal.ShardResources)
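// Determine the source tables included in the conversion so the migration can be scoped to them; fall back to the full database if this fails.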
schemaDetails, err := is.GetIncludedSrcTablesFromConv(conv)
if err != nil {
fmt.Printf("unable to determine tableList from schema, falling back to full database")
schemaDetails = map[string]internal.SchemaDetails{}
}
err = streaming.PersistJobDetails(ctx, targetProfile, sourceProfile, conv, migrationJobId, true)
if err != nil {
logger.Log.Info(fmt.Sprintf("Error storing job details in SMT metadata store...the migration job will still continue as intended. %v", err))
}
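// asyncProcessShards migrates a single physical shard end to end: it generates a
// data shard id if one is missing, builds and verifies the streaming config,
// creates the Pub/Sub resources, launches the Datastream stream and the Dataflow
// job, and records the generated resources and the shard's monitoring dashboard.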
asyncProcessShards := func(p *profiles.DataShard, mutex *sync.Mutex) task.TaskResult[*profiles.DataShard] {
dbNameToShardIdMap := make(map[string]string)
for _, l := range p.LogicalShards {
dbNameToShardIdMap[l.DbName] = l.LogicalShardId
}
if p.DataShardId == "" {
dataShardId, err := utils.GenerateName("smt-datashard")
if err != nil {
return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
dataShardId = strings.Replace(dataShardId, "_", "-", -1)
p.DataShardId = dataShardId
fmt.Printf("Data shard id generated: %v\n", p.DataShardId)
}
streamingCfg := streaming.CreateStreamingConfig(*p)
err := streaming.VerifyAndUpdateCfg(&streamingCfg, targetProfile.Conn.Sp.Dbname, schemaDetails)
if err != nil {
err = fmt.Errorf("failed to process shard: %s, there seems to be an error in the sharding configuration, error: %v", p.DataShardId, err)
return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
fmt.Printf("Initiating migration for shard: %v\n", p.DataShardId)
pubsubCfg, err := streaming.CreatePubsubResources(ctx, migrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, targetProfile.Conn.Sp.Dbname, constants.REGULAR_GCS)
if err != nil {
return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
streamingCfg.PubsubCfg = *pubsubCfg
dlqPubsubCfg, err := streaming.CreatePubsubResources(ctx, migrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, targetProfile.Conn.Sp.Dbname, constants.DLQ_GCS)
if err != nil {
return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
streamingCfg.DlqPubsubCfg = *dlqPubsubCfg
err = streaming.LaunchStream(ctx, sourceProfile, p.LogicalShards, migrationProjectId, streamingCfg.DatastreamCfg)
if err != nil {
return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
streamingCfg.DataflowCfg.DbNameToShardIdMap = dbNameToShardIdMap
dfOutput, err := streaming.StartDataflow(ctx, migrationProjectId, targetProfile, streamingCfg, conv)
if err != nil {
return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
// Store the generated resources locally in conv; this is used as the source of truth for persistence and the UI (should change to persisted values).
// Fetch and store the GCS bucket associated with the Datastream stream.
dsClient := GetDatastreamClient(ctx)
gcsBucket, gcsDestPrefix, fetchGcsErr := streaming.FetchTargetBucketAndPath(ctx, dsClient, migrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, "data")
if fetchGcsErr != nil {
logger.Log.Info(fmt.Sprintf("Could not fetch GCS Bucket for Shard %s hence Monitoring Dashboard will not contain Metrics for the gcs bucket\n", p.DataShardId))
logger.Log.Debug("Error", zap.Error(fetchGcsErr))
}
// Try to apply lifecycle rule to Datastream destination bucket.
gcsConfig := streamingCfg.GcsCfg
sc, err := storageclient.NewStorageClientImpl(ctx)
if err != nil {
return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
sa := storageaccessor.StorageAccessorImpl{}
if gcsConfig.TtlInDaysSet {
err = sa.ApplyBucketLifecycleDeleteRule(ctx, sc, storageaccessor.StorageBucketMetadata{
BucketName: gcsBucket,
Ttl: gcsConfig.TtlInDays,
MatchesPrefix: []string{gcsDestPrefix},
})
if err != nil {
logger.Log.Warn(fmt.Sprintf("\nWARNING: could not update Datastream destination GCS bucket with lifecycle rule, error: %v\n", err))
logger.Log.Warn("Please apply the lifecycle rule manually. Continuing...\n")
}
}
// create monitoring dashboard for a single shard
monitoringResources := metrics.MonitoringMetricsResources{
MigrationProjectId: migrationProjectId,
DataflowJobId: dfOutput.JobID,
DatastreamId: streamingCfg.DatastreamCfg.StreamId,
JobMetadataGcsBucket: gcsBucket,
PubsubSubscriptionId: streamingCfg.PubsubCfg.SubscriptionId,
SpannerProjectId: targetProfile.Conn.Sp.Project,
SpannerInstanceId: targetProfile.Conn.Sp.Instance,
SpannerDatabaseId: targetProfile.Conn.Sp.Dbname,
ShardId: p.DataShardId,
MigrationRequestId: conv.Audit.MigrationRequestId,
}
respDash, dashboardErr := monitoringResources.CreateDataflowShardMonitoringDashboard(ctx)
var dashboardName string
if dashboardErr != nil {
dashboardName = ""
logger.Log.Info(fmt.Sprintf("Creation of the monitoring dashboard for shard %s failed, please create the dashboard manually\n", p.DataShardId))
logger.Log.Debug("Error", zap.Error(dashboardErr))
} else {
dashboardName = strings.Split(respDash.Name, "/")[3]
fmt.Printf("Monitoring Dashboard for shard %v: %+v\n", p.DataShardId, dashboardName)
}
streaming.StoreGeneratedResources(conv, streamingCfg, dfOutput.JobID, dfOutput.GCloudCmd, migrationProjectId, p.DataShardId, internal.GcsResources{BucketName: gcsBucket}, dashboardName)
// Persist the generated resources in the metadata db.
err = streaming.PersistResources(ctx, targetProfile, sourceProfile, conv, migrationJobId, p.DataShardId)
if err != nil {
fmt.Printf("Error storing generated resources in SMT metadata store for dataShardId: %s...the migration job will still continue as intended, error: %v\n", p.DataShardId, err)
}
return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
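// Process all the data shards in parallel.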
r := task.RunParallelTasksImpl[*profiles.DataShard, *profiles.DataShard]{}
_, err = r.RunParallelTasks(sourceProfile.Config.ShardConfigurationDataflow.DataShards, 20, asyncProcessShards, false)
if err != nil {
return nil, fmt.Errorf("unable to start minimal downtime migrations: %v", err)
}
// Create the aggregated monitoring dashboard for the sharded migration.
aggMonitoringResources := metrics.MonitoringMetricsResources{
MigrationProjectId: migrationProjectId,
SpannerProjectId: targetProfile.Conn.Sp.Project,
SpannerInstanceId: targetProfile.Conn.Sp.Instance,
SpannerDatabaseId: targetProfile.Conn.Sp.Dbname,
ShardToShardResourcesMap: conv.Audit.StreamingStats.ShardToShardResourcesMap,
MigrationRequestId: conv.Audit.MigrationRequestId,
}
aggRespDash, dashboardErr := aggMonitoringResources.CreateDataflowAggMonitoringDashboard(ctx)
if dashboardErr != nil {
logger.Log.Error(fmt.Sprintf("Creation of the aggregated monitoring dashboard failed, please create the dashboard manually\n error=%v\n", dashboardErr))
} else {
fmt.Printf("Aggregated Monitoring Dashboard: %+v\n", strings.Split(aggRespDash.Name, "/")[3])
conv.Audit.StreamingStats.AggMonitoringResources = internal.MonitoringResources{DashboardName: strings.Split(aggRespDash.Name, "/")[3]}
}
err = streaming.PersistAggregateMonitoringResources(ctx, targetProfile, sourceProfile, conv, migrationJobId)
if err != nil {
logger.Log.Info(fmt.Sprintf("Unable to store aggregated monitoring dashboard in metadata database\n error=%v\n", err))
} else {
logger.Log.Debug("Aggregate monitoring resources stored successfully.\n")
}
return &writer.BatchWriter{}, nil
}
// dataFromDatabaseForBulkMigration performs a bulk (snapshot) migration:
//  1. Migrate the data from each data shard; the schema shard needs to be specified here again.
//  2. Create a connection profile object for it.
//  3. Perform a snapshot migration for the shard.
//  4. Once all shard migrations are complete, return the batch writer object.
func (dd *DataFromDatabaseImpl) dataFromDatabaseForBulkMigration(migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, gi GetInfoInterface, sm SnapshotMigrationInterface) (*writer.BatchWriter, error) {
var bw *writer.BatchWriter
for _, dataShard := range sourceProfile.Config.ShardConfigurationBulk.DataShards {
fmt.Printf("Initiating migration for shard: %v\n", dataShard.DbName)
infoSchema, err := gi.getInfoSchemaForShard(migrationProjectId, dataShard, sourceProfile.Driver, targetProfile, &profiles.SourceProfileDialectImpl{}, &GetInfoImpl{})
if err != nil {
return nil, err
}
additionalDataAttributes := internal.AdditionalDataAttributes{
ShardId: dataShard.DataShardId,
}
bw = sm.performSnapshotMigration(config, conv, client, infoSchema, additionalDataAttributes, &common.InfoSchemaImpl{}, &PopulateDataConvImpl{})
}
return bw, nil
}
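// GetBucketFromDatastreamProfile returns the GCS bucket and root path configured
// on the given Datastream destination connection profile.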
func GetBucketFromDatastreamProfile(project, location, profileName string) (string, string, error) {
ctx := context.Background()
dsClient, err := datastreamclient.NewDatastreamClientImpl(ctx)
if err != nil {
return "", "", fmt.Errorf("datastream client can not be created: %v", err)
}
// Fetch the GCS path from the destination connection profile.
dstProf := fmt.Sprintf("projects/%s/locations/%s/connectionProfiles/%s", project, location, profileName)
res, err := dsClient.GetConnectionProfile(ctx, dstProf)
if err != nil {
return "", "", fmt.Errorf("could not get connection profile: %v", err)
}
gcsProfile, ok := res.Profile.(*datastreampb.ConnectionProfile_GcsProfile)
if !ok {
return "", "", fmt.Errorf("connection profile %s does not contain a GCS profile", dstProf)
}
return gcsProfile.GcsProfile.GetBucket(), gcsProfile.GcsProfile.GetRootPath(), nil
}