func (dd *DataFromDatabaseImpl) dataFromDatabaseForDataflowMigration()

in conversion/data_from_database.go [61:247]


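// dataFromDatabaseForDataflowMigration drives a sharded, minimal downtime
// migration: it resolves the Spanner region, validates or creates the required
// cloud resources, launches a Datastream stream and a Dataflow job per data
// shard, sets up monitoring dashboards, and persists job metadata. Data is
// written by the Dataflow jobs, so the returned BatchWriter is empty.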
func (dd *DataFromDatabaseImpl) dataFromDatabaseForDataflowMigration(migrationProjectId string, targetProfile profiles.TargetProfile, ctx context.Context, sourceProfile profiles.SourceProfile, conv *internal.Conv, is common.InfoSchemaInterface) (*writer.BatchWriter, error) {
	// Fetch Spanner Region
	if conv.SpRegion == "" {
		spAcc, err := spanneraccessor.NewSpannerAccessorClientImpl(ctx)
		if err != nil {
			return nil, fmt.Errorf("unable to fetch Spanner Region for resource creation: %v", err)
		}
		spannerRegion, err := spAcc.GetSpannerLeaderLocation(ctx, "projects/"+targetProfile.Conn.Sp.Project+"/instances/"+targetProfile.Conn.Sp.Instance)
		if err != nil {
			return nil, fmt.Errorf("unable to fetch Spanner Region for resource creation: %v", err)
		}
		conv.SpRegion = spannerRegion
	}

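	// GCS client used by the resource validation/creation flow below.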
	storageClient, err := storageclient.NewStorageClientImpl(ctx)
	if err != nil {
		return nil, err
	}

	// Create Resources required for migration
	if conv.ResourceValidation {
		dsClient, err := datastreamclient.NewDatastreamClientImpl(ctx)
		if err != nil {
			return nil, err
		}
		createResources := NewValidateOrCreateResourcesImpl(&datastream_accessor.DatastreamAccessorImpl{}, dsClient, &storageaccessor.StorageAccessorImpl{}, storageClient)
		err = createResources.ValidateOrCreateResourcesForShardedMigration(ctx, migrationProjectId, targetProfile.Conn.Sp.Instance, false, conv.SpRegion, sourceProfile)
		if err != nil {
			return nil, fmt.Errorf("unable to create connection profiles: %v", err)
		}
	}

	// Set TmpDir from the bucket of the Datastream destination connection profile when it is not explicitly configured.
	for _, dataShard := range sourceProfile.Config.ShardConfigurationDataflow.DataShards {
		if dataShard.TmpDir == "" {
			bucket, rootPath, err := GetBucketFromDatastreamProfile(migrationProjectId, conv.SpRegion, dataShard.DstConnectionProfile.Name)
			if err != nil {
				return nil, fmt.Errorf("error while getting target bucket: %v", err)
			}
			dataShard.TmpDir = "gs://" + bucket + rootPath
		}
	}

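	// Propagate Dataflow tuning configs onto the individual shards.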
	updateShardsWithTuningConfigs(sourceProfile.Config.ShardConfigurationDataflow)
	// Use the migration request id as the migration job id.
	migrationJobId := conv.Audit.MigrationRequestId
	fmt.Printf("Creating a migration job with id: %v. This jobId can be used in future commands (such as cleanup) to refer to this job.\n", migrationJobId)
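	// Collects per-shard generated resources; used as the source of truth for persistence and the UI.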
	conv.Audit.StreamingStats.ShardToShardResourcesMap = make(map[string]internal.ShardResources)
	schemaDetails, err := is.GetIncludedSrcTablesFromConv(conv)
	if err != nil {
		fmt.Printf("unable to determine tableList from schema, falling back to full database\n")
		schemaDetails = map[string]internal.SchemaDetails{}
	}
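	// Persist top-level job details up front; failure is logged but does not stop the migration.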
	err = streaming.PersistJobDetails(ctx, targetProfile, sourceProfile, conv, migrationJobId, true)
	if err != nil {
		logger.Log.Info(fmt.Sprintf("Error storing job details in SMT metadata store...the migration job will still continue as intended. %v", err))
	}
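	// asyncProcessShards performs the full setup for a single data shard:
	// Pub/Sub resources, the Datastream stream, the Dataflow job, an optional
	// GCS lifecycle rule, and a per-shard monitoring dashboard. It runs in
	// parallel across shards via RunParallelTasks below.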
	asyncProcessShards := func(p *profiles.DataShard, mutex *sync.Mutex) task.TaskResult[*profiles.DataShard] {
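		// Map each logical database name to its logical shard id; passed to the Dataflow job below.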
		dbNameToShardIdMap := make(map[string]string)
		for _, l := range p.LogicalShards {
			dbNameToShardIdMap[l.DbName] = l.LogicalShardId
		}
		if p.DataShardId == "" {
			dataShardId, err := utils.GenerateName("smt-datashard")
			if err != nil {
				return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
			}
			dataShardId = strings.Replace(dataShardId, "_", "-", -1)
			p.DataShardId = dataShardId
			fmt.Printf("Data shard id generated: %v\n", p.DataShardId)
		}
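		// Build this shard's streaming config and verify it against the session's schema details.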
		streamingCfg := streaming.CreateStreamingConfig(*p)
		err := streaming.VerifyAndUpdateCfg(&streamingCfg, targetProfile.Conn.Sp.Dbname, schemaDetails)
		if err != nil {
			err = fmt.Errorf("failed to process shard: %s, there seems to be an error in the sharding configuration, error: %v", p.DataShardId, err)
			return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
		}
		fmt.Printf("Initiating migration for shard: %v\n", p.DataShardId)
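		// Create Pub/Sub resources for GCS notifications on both the regular and DLQ paths.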
		pubsubCfg, err := streaming.CreatePubsubResources(ctx, migrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, targetProfile.Conn.Sp.Dbname, constants.REGULAR_GCS)
		if err != nil {
			return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
		}
		streamingCfg.PubsubCfg = *pubsubCfg
		dlqPubsubCfg, err := streaming.CreatePubsubResources(ctx, migrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, targetProfile.Conn.Sp.Dbname, constants.DLQ_GCS)
		if err != nil {
			return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
		}
		streamingCfg.DlqPubsubCfg = *dlqPubsubCfg
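		// Start the Datastream stream that replicates this shard's logical databases into GCS.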
		err = streaming.LaunchStream(ctx, sourceProfile, p.LogicalShards, migrationProjectId, streamingCfg.DatastreamCfg)
		if err != nil {
			return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
		}
		streamingCfg.DataflowCfg.DbNameToShardIdMap = dbNameToShardIdMap
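		// Launch the Dataflow job that applies the Datastream output to Spanner.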
		dfOutput, err := streaming.StartDataflow(ctx, migrationProjectId, targetProfile, streamingCfg, conv)
		if err != nil {
			return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
		}
		// Store the generated resources locally in conv; this is the source of truth for persistence and the UI (should eventually switch to persisted values).

		// Fetch and store the GCS bucket associated with the datastream
		dsClient := GetDatastreamClient(ctx)
		gcsBucket, gcsDestPrefix, fetchGcsErr := streaming.FetchTargetBucketAndPath(ctx, dsClient, migrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, "data")
		if fetchGcsErr != nil {
			logger.Log.Info(fmt.Sprintf("Could not fetch the GCS bucket for shard %s; the monitoring dashboard will not contain metrics for the bucket", p.DataShardId))
			logger.Log.Debug("Error", zap.Error(fetchGcsErr))
		}

		// Try to apply lifecycle rule to Datastream destination bucket.
		gcsConfig := streamingCfg.GcsCfg
		sc, err := storageclient.NewStorageClientImpl(ctx)
		if err != nil {
			return task.TaskResult[*profiles.DataShard]{Result: p, Err: err}
		}
		sa := storageaccessor.StorageAccessorImpl{}
		if gcsConfig.TtlInDaysSet {
			err = sa.ApplyBucketLifecycleDeleteRule(ctx, sc, storageaccessor.StorageBucketMetadata{
				BucketName:    gcsBucket,
				Ttl:           gcsConfig.TtlInDays,
				MatchesPrefix: []string{gcsDestPrefix},
			})
			if err != nil {
				logger.Log.Warn(fmt.Sprintf("could not update the Datastream destination GCS bucket with a lifecycle rule, error: %v", err))
				logger.Log.Warn("Please apply the lifecycle rule manually. Continuing...")
			}
		}

		// create monitoring dashboard for a single shard
		monitoringResources := metrics.MonitoringMetricsResources{
			MigrationProjectId:   migrationProjectId,
			DataflowJobId:        dfOutput.JobID,
			DatastreamId:         streamingCfg.DatastreamCfg.StreamId,
			JobMetadataGcsBucket: gcsBucket,
			PubsubSubscriptionId: streamingCfg.PubsubCfg.SubscriptionId,
			SpannerProjectId:     targetProfile.Conn.Sp.Project,
			SpannerInstanceId:    targetProfile.Conn.Sp.Instance,
			SpannerDatabaseId:    targetProfile.Conn.Sp.Dbname,
			ShardId:              p.DataShardId,
			MigrationRequestId:   conv.Audit.MigrationRequestId,
		}
		respDash, dashboardErr := monitoringResources.CreateDataflowShardMonitoringDashboard(ctx)
		var dashboardName string
		if dashboardErr != nil {
			dashboardName = ""
			logger.Log.Info(fmt.Sprintf("Creation of the monitoring dashboard for shard %s failed, please create the dashboard manually\n", p.DataShardId))
			logger.Log.Debug("Error", zap.Error(dashboardErr))
		} else {
			dashboardName = strings.Split(respDash.Name, "/")[3]
			fmt.Printf("Monitoring Dashboard for shard %v: %+v\n", p.DataShardId, dashboardName)
		}
		streaming.StoreGeneratedResources(conv, streamingCfg, dfOutput.JobID, dfOutput.GCloudCmd, migrationProjectId, p.DataShardId, internal.GcsResources{BucketName: gcsBucket}, dashboardName)
		// Persist the generated resources in the metadata database.
		err = streaming.PersistResources(ctx, targetProfile, sourceProfile, conv, migrationJobId, p.DataShardId)
		if err != nil {
			fmt.Printf("Error storing generated resources in SMT metadata store for dataShardId: %s...the migration job will still continue as intended, error: %v\n", p.DataShardId, err)
		}
		// Persistence failures are non-fatal (the migration continues), so do not fail the shard task here.
		return task.TaskResult[*profiles.DataShard]{Result: p, Err: nil}
	}
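	// Fan the per-shard setup out across at most 20 concurrent workers.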
	r := task.RunParallelTasksImpl[*profiles.DataShard, *profiles.DataShard]{}
	_, err = r.RunParallelTasks(sourceProfile.Config.ShardConfigurationDataflow.DataShards, 20, asyncProcessShards, false)
	if err != nil {
		return nil, fmt.Errorf("unable to start minimal downtime migrations: %v", err)
	}

	// create monitoring aggregated dashboard for sharded migration
	aggMonitoringResources := metrics.MonitoringMetricsResources{
		MigrationProjectId:       migrationProjectId,
		SpannerProjectId:         targetProfile.Conn.Sp.Project,
		SpannerInstanceId:        targetProfile.Conn.Sp.Instance,
		SpannerDatabaseId:        targetProfile.Conn.Sp.Dbname,
		ShardToShardResourcesMap: conv.Audit.StreamingStats.ShardToShardResourcesMap,
		MigrationRequestId:       conv.Audit.MigrationRequestId,
	}
	aggRespDash, dashboardErr := aggMonitoringResources.CreateDataflowAggMonitoringDashboard(ctx)
	if dashboardErr != nil {
		logger.Log.Error(fmt.Sprintf("Creation of the aggregated monitoring dashboard failed, please create the dashboard manually\n error=%v\n", dashboardErr))
	} else {
		fmt.Printf("Aggregated Monitoring Dashboard: %+v\n", strings.Split(aggRespDash.Name, "/")[3])
		conv.Audit.StreamingStats.AggMonitoringResources = internal.MonitoringResources{DashboardName: strings.Split(aggRespDash.Name, "/")[3]}
	}
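	// Persist the aggregated dashboard details; as above, failure here is non-fatal.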
	err = streaming.PersistAggregateMonitoringResources(ctx, targetProfile, sourceProfile, conv, migrationJobId)
	if err != nil {
		logger.Log.Info(fmt.Sprintf("Unable to store aggregated monitoring dashboard in metadata database\n error=%v\n", err))
	} else {
		logger.Log.Debug("Aggregate monitoring resources stored successfully.\n")
	}
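	// Data is written by the Dataflow jobs, so there is nothing to batch-write here.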
	return &writer.BatchWriter{}, nil
}