in conversion/data_from_database.go [61:247]
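// dataFromDatabaseForDataflowMigration orchestrates a sharded, minimal-downtime
// migration: it resolves the Spanner region, optionally validates or creates the
// required resources, and then, per data shard, provisions Pub/Sub and Datastream
// resources, launches a Dataflow job, and creates monitoring dashboards.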
func (dd *DataFromDatabaseImpl) dataFromDatabaseForDataflowMigration(migrationProjectId string, targetProfile profiles.TargetProfile, ctx context.Context, sourceProfile profiles.SourceProfile, conv *internal.Conv, is common.InfoSchemaInterface) (*writer.BatchWriter, error) {
// Fetch Spanner Region
if conv.SpRegion == "" {
spAcc, err := spanneraccessor.NewSpannerAccessorClientImpl(ctx)
if err != nil {
return nil, fmt.Errorf("unable to fetch Spanner Region for resource creation: %v", err)
}
spannerRegion, err := spAcc.GetSpannerLeaderLocation(ctx, "projects/"+targetProfile.Conn.Sp.Project+"/instances/"+targetProfile.Conn.Sp.Instance)
if err != nil {
return nil, fmt.Errorf("unable to fetch Spanner Region for resource creation: %v", err)
}
conv.SpRegion = spannerRegion
}
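// Storage client used when validating or creating migration resources below.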
storageClient, err := storageclient.NewStorageClientImpl(ctx)
if err != nil {
return nil, err
}
// Create Resources required for migration
if conv.ResourceValidation {
dsClient, err := datastreamclient.NewDatastreamClientImpl(ctx)
if err != nil {
return nil, err
}
createResources := NewValidateOrCreateResourcesImpl(&datastream_accessor.DatastreamAccessorImpl{}, dsClient, &storageaccessor.StorageAccessorImpl{}, storageClient)
err = createResources.ValidateOrCreateResourcesForShardedMigration(ctx, migrationProjectId, targetProfile.Conn.Sp.Instance, false, conv.SpRegion, sourceProfile)
if err != nil {
return nil, fmt.Errorf("unable to create connection profiles: %v", err)
}
}
// Set the TmpDir from the sessionState bucket, which is derived from the target connection profile.
for _, dataShard := range sourceProfile.Config.ShardConfigurationDataflow.DataShards {
if dataShard.TmpDir == "" {
bucket, rootPath, err := GetBucketFromDatastreamProfile(migrationProjectId, conv.SpRegion, dataShard.DstConnectionProfile.Name)
if err != nil {
return nil, fmt.Errorf("error while getting target bucket: %v", err)
}
dataShard.TmpDir = "gs://" + bucket + rootPath
}
}
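// Apply the tuning configs from the sharding configuration to each shard.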
updateShardsWithTuningConfigs(sourceProfile.Config.ShardConfigurationDataflow)
// Use the migration request ID as the migration job ID.
migrationJobId := conv.Audit.MigrationRequestId
fmt.Printf("Creating a migration job with id: %v. This jobId can be used in future commands (such as cleanup) to refer to this job.\n", migrationJobId)
conv.Audit.StreamingStats.ShardToShardResourcesMap = make(map[string]internal.ShardResources)
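// Determine which source tables the conversion includes; this list is used
// below to validate each shard's streaming config.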
schemaDetails, err := is.GetIncludedSrcTablesFromConv(conv)
if err != nil {
fmt.Printf("unable to determine tableList from schema, falling back to full database")
schemaDetails = map[string]internal.SchemaDetails{}
}
err = streaming.PersistJobDetails(ctx, targetProfile, sourceProfile, conv, migrationJobId, true)
if err != nil {
logger.Log.Info(fmt.Sprintf("Error storing job details in SMT metadata store...the migration job will still continue as intended. %v", err))
}
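// asyncProcessShards provisions the streaming pipeline for a single data shard:
// Pub/Sub notification resources, the Datastream stream, the Dataflow job, and a
// per-shard monitoring dashboard. It is run in parallel across shards below.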
asyncProcessShards := func(p *profiles.DataShard, mutex *sync.Mutex) task.TaskResult[*profiles.DataShard] {
dbNameToShardIdMap := make(map[string]string)
for _, l := range p.LogicalShards {
dbNameToShardIdMap[l.DbName] = l.LogicalShardId
}
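// Generate a data shard ID if the config did not supply one.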
if p.DataShardId == "" {
dataShardId, err := utils.GenerateName("smt-datashard")
dataShardId = strings.Replace(dataShardId, "_", "-", -1)
if err != nil {
return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
p.DataShardId = dataShardId
fmt.Printf("Data shard id generated: %v\n", p.DataShardId)
}
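// Build and validate this shard's streaming config.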
streamingCfg := streaming.CreateStreamingConfig(*p)
err := streaming.VerifyAndUpdateCfg(&streamingCfg, targetProfile.Conn.Sp.Dbname, schemaDetails)
if err != nil {
err = fmt.Errorf("failed to process shard: %s, there seems to be an error in the sharding configuration, error: %v", p.DataShardId, err)
return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
fmt.Printf("Initiating migration for shard: %v\n", p.DataShardId)
pubsubCfg, err := streaming.CreatePubsubResources(ctx, migrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, targetProfile.Conn.Sp.Dbname, constants.REGULAR_GCS)
if err != nil {
return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
streamingCfg.PubsubCfg = *pubsubCfg
dlqPubsubCfg, err := streaming.CreatePubsubResources(ctx, migrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, targetProfile.Conn.Sp.Dbname, constants.DLQ_GCS)
if err != nil {
return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
streamingCfg.DlqPubsubCfg = *dlqPubsubCfg
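// Launch the Datastream stream that replicates this shard's source databases into GCS.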
err = streaming.LaunchStream(ctx, sourceProfile, p.LogicalShards, migrationProjectId, streamingCfg.DatastreamCfg)
if err != nil {
return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
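// Attach the database-name-to-logical-shard-ID map to the Dataflow config so
// the job can attribute records to logical shards, then launch the job.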
streamingCfg.DataflowCfg.DbNameToShardIdMap = dbNameToShardIdMap
dfOutput, err := streaming.StartDataflow(ctx, migrationProjectId, targetProfile, streamingCfg, conv)
if err != nil {
return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
// Store the generated resources locally in conv; this is used as the source of truth for persistence and the UI (should change to persisted values).
// Fetch and store the GCS bucket associated with the Datastream stream.
dsClient := GetDatastreamClient(ctx)
gcsBucket, gcsDestPrefix, fetchGcsErr := streaming.FetchTargetBucketAndPath(ctx, dsClient, migrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, "data")
if fetchGcsErr != nil {
logger.Log.Info(fmt.Sprintf("Could not fetch GCS Bucket for Shard %s hence Monitoring Dashboard will not contain Metrics for the gcs bucket\n", p.DataShardId))
logger.Log.Debug("Error", zap.Error(fetchGcsErr))
}
// Try to apply lifecycle rule to Datastream destination bucket.
gcsConfig := streamingCfg.GcsCfg
sc, err := storageclient.NewStorageClientImpl(ctx)
if err != nil {
return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
sa := storageaccessor.StorageAccessorImpl{}
if gcsConfig.TtlInDaysSet {
err = sa.ApplyBucketLifecycleDeleteRule(ctx, sc, storageaccessor.StorageBucketMetadata{
BucketName: gcsBucket,
Ttl: gcsConfig.TtlInDays,
MatchesPrefix: []string{gcsDestPrefix},
})
if err != nil {
logger.Log.Warn(fmt.Sprintf("\nWARNING: could not update Datastream destination GCS bucket with lifecycle rule, error: %v\n", err))
logger.Log.Warn("Please apply the lifecycle rule manually. Continuing...\n")
}
}
// create monitoring dashboard for a single shard
monitoringResources := metrics.MonitoringMetricsResources{
MigrationProjectId: migrationProjectId,
DataflowJobId: dfOutput.JobID,
DatastreamId: streamingCfg.DatastreamCfg.StreamId,
JobMetadataGcsBucket: gcsBucket,
PubsubSubscriptionId: streamingCfg.PubsubCfg.SubscriptionId,
SpannerProjectId: targetProfile.Conn.Sp.Project,
SpannerInstanceId: targetProfile.Conn.Sp.Instance,
SpannerDatabaseId: targetProfile.Conn.Sp.Dbname,
ShardId: p.DataShardId,
MigrationRequestId: conv.Audit.MigrationRequestId,
}
respDash, dashboardErr := monitoringResources.CreateDataflowShardMonitoringDashboard(ctx)
var dashboardName string
if dashboardErr != nil {
dashboardName = ""
logger.Log.Info(fmt.Sprintf("Creation of the monitoring dashboard for shard %s failed, please create the dashboard manually\n", p.DataShardId))
logger.Log.Debug("Error", zap.Error(dashboardErr))
} else {
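// Dashboard resource names have the form "projects/{project}/dashboards/{dashboard_id}";
// index 3 extracts the dashboard ID.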
dashboardName = strings.Split(respDash.Name, "/")[3]
fmt.Printf("Monitoring Dashboard for shard %v: %+v\n", p.DataShardId, dashboardName)
}
streaming.StoreGeneratedResources(conv, streamingCfg, dfOutput.JobID, dfOutput.GCloudCmd, migrationProjectId, p.DataShardId, internal.GcsResources{BucketName: gcsBucket}, dashboardName)
// Persist the generated resources in the metadata database.
err = streaming.PersistResources(ctx, targetProfile, sourceProfile, conv, migrationJobId, p.DataShardId)
if err != nil {
fmt.Printf("Error storing generated resources in SMT metadata store for dataShardId %s; the migration job will still continue as intended, error: %v\n", p.DataShardId, err)
}
// Persistence failures are non-fatal, so do not propagate err as a task failure.
return task.TaskResult[*profiles.DataShard]{Result: p, Err: nil}
}
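// Fan out shard processing across up to 20 parallel workers.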
r := task.RunParallelTasksImpl[*profiles.DataShard, *profiles.DataShard]{}
_, err = r.RunParallelTasks(sourceProfile.Config.ShardConfigurationDataflow.DataShards, 20, asyncProcessShards, false)
if err != nil {
return nil, fmt.Errorf("unable to start minimal downtime migrations: %v", err)
}
// create monitoring aggregated dashboard for sharded migration
aggMonitoringResources := metrics.MonitoringMetricsResources{
MigrationProjectId: migrationProjectId,
SpannerProjectId: targetProfile.Conn.Sp.Project,
SpannerInstanceId: targetProfile.Conn.Sp.Instance,
SpannerDatabaseId: targetProfile.Conn.Sp.Dbname,
ShardToShardResourcesMap: conv.Audit.StreamingStats.ShardToShardResourcesMap,
MigrationRequestId: conv.Audit.MigrationRequestId,
}
aggRespDash, dashboardErr := aggMonitoringResources.CreateDataflowAggMonitoringDashboard(ctx)
if dashboardErr != nil {
logger.Log.Error(fmt.Sprintf("Creation of the aggregated monitoring dashboard failed, please create the dashboard manually\n error=%v\n", dashboardErr))
} else {
fmt.Printf("Aggregated Monitoring Dashboard: %+v\n", strings.Split(aggRespDash.Name, "/")[3])
conv.Audit.StreamingStats.AggMonitoringResources = internal.MonitoringResources{DashboardName: strings.Split(aggRespDash.Name, "/")[3]}
}
err = streaming.PersistAggregateMonitoringResources(ctx, targetProfile, sourceProfile, conv, migrationJobId)
if err != nil {
logger.Log.Info(fmt.Sprintf("Unable to store aggregated monitoring dashboard in metadata database\n error=%v\n", err))
} else {
logger.Log.Debug("Aggregate monitoring resources stored successfully.\n")
}
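// Data movement is handled by the launched Dataflow jobs, so an empty BatchWriter is returned.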
return &writer.BatchWriter{}, nil
}