func (sads *DataFromSourceImpl) dataFromDatabase()

in conversion/conversion_from_source.go [228:348]


func (sads *DataFromSourceImpl) dataFromDatabase(ctx context.Context, migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, getInfo GetInfoInterface, dataFromDb DataFromDatabaseInterface, snapshotMigration SnapshotMigrationInterface) (*writer.BatchWriter, error) {
	// Handle data migration for sharded migrations differently.
	// Sharded migrations are identified via the config= flag; if that flag is not
	// present, carry on with the existing code path in the default case.
	switch sourceProfile.Ty {
	case profiles.SourceProfileTypeConfig:
		// There are three cases to cover here: bulk migrations, Dataflow migrations,
		// and (later) DMS. Each is handled in its own branch within the sharded code
		// path, selected via configType, which can be "bulk", "dataflow" or "dms".
		switch sourceProfile.Config.ConfigType {
		case constants.BULK_MIGRATION:
			return dataFromDb.dataFromDatabaseForBulkMigration(migrationProjectId, sourceProfile, targetProfile, config, conv, client, getInfo, snapshotMigration)
		case constants.DATAFLOW_MIGRATION:
			return dataFromDb.dataFromDatabaseForDataflowMigration(migrationProjectId, targetProfile, ctx, sourceProfile, conv, &common.InfoSchemaImpl{})
		case constants.DMS_MIGRATION:
			return dataFromDb.dataFromDatabaseForDMSMigration()
		default:
			return nil, fmt.Errorf("configType should be one of 'bulk', 'dataflow' or 'dms'")
		}
	default:
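		// Non-sharded migration: fetch the info schema for the single source
		// database, via the Cloud SQL connector when applicable.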
		var infoSchema common.InfoSchema
		var err error
		if sourceProfile.Ty == profiles.SourceProfileTypeCloudSQL {
			infoSchema, err = getInfo.GetInfoSchemaFromCloudSQL(migrationProjectId, sourceProfile, targetProfile)
			if err != nil {
				return nil, err
			}
		} else {
			infoSchema, err = getInfo.GetInfoSchema(migrationProjectId, sourceProfile, targetProfile)
			if err != nil {
				return nil, err
			}
		}
		var streamInfo map[string]interface{}
		// Minimal downtime migration for a single shard.
		if sourceProfile.Conn.Streaming {
			// Generate a job ID for this migration.
			migrationJobId := conv.Audit.MigrationRequestId
			logger.Log.Info(fmt.Sprintf("Creating a migration job with id: %v. This jobId can be used in future commands (such as cleanup) to refer to this job.\n", migrationJobId))
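			// Start change data capture on the source so that writes made while the
			// snapshot runs are captured for later replay.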
			streamInfo, err = infoSchema.StartChangeDataCapture(ctx, conv)
			if err != nil {
				return nil, err
			}
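			// Run the snapshot (bulk) phase of the minimal downtime migration.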
			bw, err := snapshotMigration.snapshotMigrationHandler(sourceProfile, config, conv, client, infoSchema)
			if err != nil {
				return nil, err
			}
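			// Launch the streaming phase: a Dataflow job that applies the captured
			// change events to Spanner.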
			dfOutput, err := infoSchema.StartStreamingMigration(ctx, migrationProjectId, client, conv, streamInfo)
			if err != nil {
				return nil, err
			}
			dfJobId := dfOutput.JobID
			gcloudCmd := dfOutput.GCloudCmd
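			// streamInfo was populated by StartChangeDataCapture; if the assertion
			// fails, streamingCfg is left as its zero value.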
			streamingCfg, _ := streamInfo["streamingCfg"].(streaming.StreamingCfg)
			// Fetch and store the GCS bucket associated with the datastream
			dsClient := GetDatastreamClient(ctx)
			gcsBucket, gcsDestPrefix, fetchGcsErr := streaming.FetchTargetBucketAndPath(ctx, dsClient, migrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, "data")
			if fetchGcsErr != nil {
				logger.Log.Info("Could not fetch GCS Bucket, hence Monitoring Dashboard will not contain Metrics for the gcs bucket\n")
				logger.Log.Debug("Error", zap.Error(fetchGcsErr))
			}

			// Try to apply lifecycle rule to Datastream destination bucket.
			gcsConfig := streamingCfg.GcsCfg
			sc, err := storageclient.NewStorageClientImpl(ctx)
			if err != nil {
				return nil, err
			}
			sa := storageaccessor.StorageAccessorImpl{}
			if gcsConfig.TtlInDaysSet {
				err = sa.ApplyBucketLifecycleDeleteRule(ctx, sc, storageaccessor.StorageBucketMetadata{
					BucketName:    gcsBucket,
					Ttl:           gcsConfig.TtlInDays,
					MatchesPrefix: []string{gcsDestPrefix},
				})
				if err != nil {
					logger.Log.Warn(fmt.Sprintf("\nWARNING: could not update Datastream destination GCS bucket with lifecycle rule, error: %v\n", err))
					logger.Log.Warn("Please apply the lifecycle rule manually. Continuing...\n")
				}
			}

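			// Collect the IDs of every resource involved in this migration so the
			// monitoring dashboard can chart metrics for each of them.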
			monitoringResources := metrics.MonitoringMetricsResources{
				MigrationProjectId:   migrationProjectId,
				DataflowJobId:        dfOutput.JobID,
				DatastreamId:         streamingCfg.DatastreamCfg.StreamId,
				JobMetadataGcsBucket: gcsBucket,
				PubsubSubscriptionId: streamingCfg.PubsubCfg.SubscriptionId,
				SpannerProjectId:     targetProfile.Conn.Sp.Project,
				SpannerInstanceId:    targetProfile.Conn.Sp.Instance,
				SpannerDatabaseId:    targetProfile.Conn.Sp.Dbname,
				ShardId:              "",
				MigrationRequestId:   conv.Audit.MigrationRequestId,
			}
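			// Cloud Monitoring dashboard names have the form
			// projects/{project}/dashboards/{id}; index 3 below extracts the id.
			// Dashboard creation failure is non-fatal and only logged.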
			respDash, dashboardErr := monitoringResources.CreateDataflowShardMonitoringDashboard(ctx)
			var dashboardName string
			if dashboardErr != nil {
				dashboardName = ""
				logger.Log.Info("Creation of the monitoring dashboard failed, please create the dashboard manually")
				logger.Log.Debug("Error", zap.Error(dashboardErr))
			} else {
				dashboardName = strings.Split(respDash.Name, "/")[3]
				fmt.Printf("Monitoring Dashboard: %s\n", dashboardName)
			}
			// Store the generated resources locally in conv; this is used as the
			// source of truth for persistence and the UI (should change to persisted values).
			streaming.StoreGeneratedResources(conv, streamingCfg, dfJobId, gcloudCmd, migrationProjectId, "", internal.GcsResources{BucketName: gcsBucket}, dashboardName)
			// Persist job and shard level data in the metadata DB.
			err = streaming.PersistJobDetails(ctx, targetProfile, sourceProfile, conv, migrationJobId, false)
			if err != nil {
				logger.Log.Info(fmt.Sprintf("Error storing job details in SMT metadata store...the migration job will still continue as intended. %v", err))
			} else {
				// Only attempt persisting shard level data if the job level data was persisted.
				err = streaming.PersistResources(ctx, targetProfile, sourceProfile, conv, migrationJobId, constants.DEFAULT_SHARD_ID)
				if err != nil {
					logger.Log.Info(fmt.Sprintf("Error storing details for migration job: %s, data shard: %s in SMT metadata store...the migration job will still continue as intended. err = %v\n", migrationJobId, constants.DEFAULT_SHARD_ID, err))
				}
			}
			return bw, nil
		}
		// Bulk migration for a single shard.
		return snapshotMigration.performSnapshotMigration(config, conv, client, infoSchema, internal.AdditionalDataAttributes{ShardId: ""}, &common.InfoSchemaImpl{}, &PopulateDataConvImpl{}), nil
	}
}
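
A minimal caller sketch (not part of the original file), showing how the injected interfaces might be wired up. It assumes GetInfoImpl, DataFromDatabaseImpl and SnapshotMigrationImpl are the package's concrete implementations of GetInfoInterface, DataFromDatabaseInterface and SnapshotMigrationInterface; those names, and runDataFromDatabase itself, are illustrative assumptions.

// Hypothetical caller in the same package (illustrative only): wires up the
// injected dependencies and runs the data migration end to end.
func runDataFromDatabase(ctx context.Context, migrationProjectId string,
	sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile,
	cfg writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client) (*writer.BatchWriter, error) {

	sads := &DataFromSourceImpl{}
	// Assumed concrete implementations; swap in fakes for testing.
	bw, err := sads.dataFromDatabase(ctx, migrationProjectId, sourceProfile, targetProfile,
		cfg, conv, client, &GetInfoImpl{}, &DataFromDatabaseImpl{}, &SnapshotMigrationImpl{})
	if err != nil {
		return nil, fmt.Errorf("data migration failed: %w", err)
	}
	return bw, nil
}

Because every collaborator is passed in as an interface, a unit test can exercise each configType branch with stub implementations and never touch a real database, Datastream, or Dataflow.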