in conversion/conversion_from_source.go [228:348]
// dataFromDatabase migrates data from the source described by sourceProfile
// and returns the BatchWriter produced by the snapshot (bulk) copy.
//
// Dispatch:
//   - Config based source profiles (sharded migrations, identified via the
//     config= flag) are routed on sourceProfile.Config.ConfigType to the bulk,
//     dataflow or DMS handler of dataFromDb. Any other config type is an error.
//   - Every other profile is handled as a single shard. The info schema is
//     obtained from getInfo (the CloudSQL variant for CloudSQL profiles). When
//     sourceProfile.Conn.Streaming is false only a snapshot migration is
//     performed; otherwise the minimal downtime flow below is run.
//
// Minimal downtime flow: change data capture is started, the snapshot
// migration is run and then the streaming migration is started; a failure in
// any of these, or in creating the storage client, is returned to the caller.
// The steps that follow (looking up the Datastream GCS bucket, applying the
// bucket lifecycle rule, creating the monitoring dashboard and persisting
// job/shard details) only log on failure and never fail the migration.
func (sads *DataFromSourceImpl) dataFromDatabase(ctx context.Context, migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, getInfo GetInfoInterface, dataFromDb DataFromDatabaseInterface, snapshotMigration SnapshotMigrationInterface) (*writer.BatchWriter, error) {
	// Sharded migrations are identified via the config= flag and are handled
	// separately from the single shard code path further below.
	if sourceProfile.Ty == profiles.SourceProfileTypeConfig {
		// The configType can be "bulk", "dataflow" or "dms".
		switch sourceProfile.Config.ConfigType {
		case constants.BULK_MIGRATION:
			return dataFromDb.dataFromDatabaseForBulkMigration(migrationProjectId, sourceProfile, targetProfile, config, conv, client, getInfo, snapshotMigration)
		case constants.DATAFLOW_MIGRATION:
			return dataFromDb.dataFromDatabaseForDataflowMigration(migrationProjectId, targetProfile, ctx, sourceProfile, conv, &common.InfoSchemaImpl{})
		case constants.DMS_MIGRATION:
			return dataFromDb.dataFromDatabaseForDMSMigration()
		default:
			return nil, fmt.Errorf("configType should be one of 'bulk', 'dataflow' or 'dms'")
		}
	}

	var infoSchema common.InfoSchema
	var err error
	if sourceProfile.Ty == profiles.SourceProfileTypeCloudSQL {
		infoSchema, err = getInfo.GetInfoSchemaFromCloudSQL(migrationProjectId, sourceProfile, targetProfile)
	} else {
		infoSchema, err = getInfo.GetInfoSchema(migrationProjectId, sourceProfile, targetProfile)
	}
	if err != nil {
		return nil, err
	}

	// Bulk migration for a single shard.
	if !sourceProfile.Conn.Streaming {
		return snapshotMigration.performSnapshotMigration(config, conv, client, infoSchema, internal.AdditionalDataAttributes{ShardId: ""}, &common.InfoSchemaImpl{}, &PopulateDataConvImpl{}), nil
	}

	// Minimal downtime migration for a single shard.
	// The migration request id doubles as the job id.
	migrationJobId := conv.Audit.MigrationRequestId
	logger.Log.Info(fmt.Sprintf("Creating a migration job with id: %v. This jobId can be used in future commmands (such as cleanup) to refer to this job.\n", migrationJobId))

	var streamInfo map[string]interface{}
	streamInfo, err = infoSchema.StartChangeDataCapture(ctx, conv)
	if err != nil {
		return nil, err
	}
	bw, err := snapshotMigration.snapshotMigrationHandler(sourceProfile, config, conv, client, infoSchema)
	if err != nil {
		return nil, err
	}
	dfOutput, err := infoSchema.StartStreamingMigration(ctx, migrationProjectId, client, conv, streamInfo)
	if err != nil {
		return nil, err
	}

	// The streaming resources already exist at this point, so a missing or
	// mistyped config is reported but does not abort the migration; the zero
	// value config is used for the remaining best-effort steps.
	streamingCfg, ok := streamInfo["streamingCfg"].(streaming.StreamingCfg)
	if !ok {
		logger.Log.Warn("streamingCfg is missing from the change data capture info or has an unexpected type, continuing with an empty streaming config")
	}

	// Fetch and store the GCS bucket associated with the datastream.
	dsClient := GetDatastreamClient(ctx)
	gcsBucket, gcsDestPrefix, fetchGcsErr := streaming.FetchTargetBucketAndPath(ctx, dsClient, migrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, "data")
	if fetchGcsErr != nil {
		logger.Log.Info("Could not fetch GCS Bucket, hence Monitoring Dashboard will not contain Metrics for the gcs bucket\n")
		logger.Log.Debug("Error", zap.Error(fetchGcsErr))
	}

	// Try to apply lifecycle rule to Datastream destination bucket.
	gcsConfig := streamingCfg.GcsCfg
	sc, err := storageclient.NewStorageClientImpl(ctx)
	if err != nil {
		return nil, err
	}
	sa := storageaccessor.StorageAccessorImpl{}
	if gcsConfig.TtlInDaysSet {
		err = sa.ApplyBucketLifecycleDeleteRule(ctx, sc, storageaccessor.StorageBucketMetadata{
			BucketName:    gcsBucket,
			Ttl:           gcsConfig.TtlInDays,
			MatchesPrefix: []string{gcsDestPrefix},
		})
		if err != nil {
			logger.Log.Warn(fmt.Sprintf("\nWARNING: could not update Datastream destination GCS bucket with lifecycle rule, error: %v\n", err))
			logger.Log.Warn("Please apply the lifecycle rule manually. Continuing...\n")
		}
	}

	monitoringResources := metrics.MonitoringMetricsResources{
		MigrationProjectId:   migrationProjectId,
		DataflowJobId:        dfOutput.JobID,
		DatastreamId:         streamingCfg.DatastreamCfg.StreamId,
		JobMetadataGcsBucket: gcsBucket,
		PubsubSubscriptionId: streamingCfg.PubsubCfg.SubscriptionId,
		SpannerProjectId:     targetProfile.Conn.Sp.Project,
		SpannerInstanceId:    targetProfile.Conn.Sp.Instance,
		SpannerDatabaseId:    targetProfile.Conn.Sp.Dbname,
		ShardId:              "",
		MigrationRequestId:   conv.Audit.MigrationRequestId,
	}

	// dashboardName stays empty unless the dashboard was created and its id
	// could be extracted from the returned resource name.
	var dashboardName string
	respDash, dashboardErr := monitoringResources.CreateDataflowShardMonitoringDashboard(ctx)
	if dashboardErr != nil {
		logger.Log.Info("Creation of the monitoring dashboard failed, please create the dashboard manually")
		logger.Log.Debug("Error", zap.Error(dashboardErr))
	} else if nameParts := strings.Split(respDash.Name, "/"); len(nameParts) > 3 {
		// The fourth "/"-separated segment of the name is used as the
		// dashboard identifier.
		dashboardName = nameParts[3]
		fmt.Printf("Monitoring Dashboard: %+v\n", dashboardName)
	} else {
		// Guard against a shorter name instead of panicking on the index:
		// the dashboard is optional and must not take down the migration.
		logger.Log.Warn(fmt.Sprintf("Monitoring dashboard name %q is not in the expected format, the dashboard will not be recorded with the generated resources", respDash.Name))
	}

	// Store the generated resources locally in conv, this is used as source of
	// truth for persistence and the UI (should change to persisted values).
	streaming.StoreGeneratedResources(conv, streamingCfg, dfOutput.JobID, dfOutput.GCloudCmd, migrationProjectId, "", internal.GcsResources{BucketName: gcsBucket}, dashboardName)

	// Persist job and shard level data in the metadata db.
	err = streaming.PersistJobDetails(ctx, targetProfile, sourceProfile, conv, migrationJobId, false)
	if err != nil {
		logger.Log.Info(fmt.Sprintf("Error storing job details in SMT metadata store...the migration job will still continue as intended. %v", err))
		return bw, nil
	}
	// Only attempt persisting shard level data if the job level data is persisted.
	err = streaming.PersistResources(ctx, targetProfile, sourceProfile, conv, migrationJobId, constants.DEFAULT_SHARD_ID)
	if err != nil {
		logger.Log.Info(fmt.Sprintf("Error storing details for migration job: %s, data shard: %s in SMT metadata store...the migration job will still continue as intended. err = %v\n", migrationJobId, constants.DEFAULT_SHARD_ID, err))
	}
	return bw, nil
}