conversion/snapshot_migration.go
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package conversion

import (
	"fmt"

	sp "cloud.google.com/go/spanner"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/profiles"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/sources/common"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/writer"
)

// SnapshotMigrationInterface abstracts the snapshot (bulk copy) phase of a
// migration so that callers can swap in alternative implementations, for
// example in unit tests.
type SnapshotMigrationInterface interface {
	performSnapshotMigration(config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, infoSchema common.InfoSchema, additionalAttributes internal.AdditionalDataAttributes, infoSchemaI common.InfoSchemaInterface, populateDataConv PopulateDataConvInterface) *writer.BatchWriter
	snapshotMigrationHandler(sourceProfile profiles.SourceProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, infoSchema common.InfoSchema) (*writer.BatchWriter, error)
}
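
// snapshotMigrationStub is an illustrative sketch of a minimal test double
// satisfying SnapshotMigrationInterface. It is not part of the production
// flow; it assumes a test only needs canned return values when stubbing out
// snapshot migration.
type snapshotMigrationStub struct {
	batchWriter *writer.BatchWriter
	err         error
}

func (s *snapshotMigrationStub) performSnapshotMigration(config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, infoSchema common.InfoSchema, additionalAttributes internal.AdditionalDataAttributes, infoSchemaI common.InfoSchemaInterface, populateDataConv PopulateDataConvInterface) *writer.BatchWriter {
	return s.batchWriter
}

func (s *snapshotMigrationStub) snapshotMigrationHandler(sourceProfile profiles.SourceProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, infoSchema common.InfoSchema) (*writer.BatchWriter, error) {
	return s.batchWriter, s.err
}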

// SnapshotMigrationImpl is the production implementation of
// SnapshotMigrationInterface used by the conversion flow.
type SnapshotMigrationImpl struct{}

// performSnapshotMigration collects row statistics from the source, sets up
// progress reporting (skipped for dry runs), converts and writes the source
// data to Spanner, and returns the BatchWriter after flushing pending writes.
func (sm *SnapshotMigrationImpl) performSnapshotMigration(config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, infoSchema common.InfoSchema, additionalAttributes internal.AdditionalDataAttributes, infoSchemaI common.InfoSchemaInterface, populateDataConv PopulateDataConvInterface) *writer.BatchWriter {
	infoSchemaI.SetRowStats(conv, infoSchema)
	totalRows := conv.Rows()
	if !conv.Audit.DryRun {
		conv.Audit.Progress = *internal.NewProgress(totalRows, "Writing data to Spanner", internal.Verbose(), false, int(internal.DataWriteInProgress))
	}
	batchWriter := populateDataConv.populateDataConv(conv, config, client)
	infoSchemaI.ProcessData(conv, infoSchema, additionalAttributes)
	batchWriter.Flush()
	return batchWriter
}

// snapshotMigrationHandler decides, per source driver, whether the Spanner
// migration tool itself performs the snapshot migration or defers it.
func (sm *SnapshotMigrationImpl) snapshotMigrationHandler(sourceProfile profiles.SourceProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, infoSchema common.InfoSchema) (*writer.BatchWriter, error) {
	switch sourceProfile.Driver {
	// Skip snapshot migration via the Spanner migration tool for MySQL, Oracle
	// and PostgreSQL, since the Dataflow job will handle this from the
	// backfilled data.
	case constants.MYSQL, constants.ORACLE, constants.POSTGRES:
		return &writer.BatchWriter{}, nil
	case constants.DYNAMODB:
		return sm.performSnapshotMigration(config, conv, client, infoSchema, internal.AdditionalDataAttributes{ShardId: ""}, &common.InfoSchemaImpl{}, &PopulateDataConvImpl{}), nil
	default:
		return &writer.BatchWriter{}, fmt.Errorf("streaming migration not supported for driver %s", sourceProfile.Driver)
	}
}
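
// runSnapshotMigrationExample is an illustrative sketch of how the handler
// might be driven. The function name and the way its arguments are obtained
// are hypothetical; in the real tool they come from the parsed source profile
// and an established Spanner client.
func runSnapshotMigrationExample(sourceProfile profiles.SourceProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, infoSchema common.InfoSchema) error {
	sm := &SnapshotMigrationImpl{}
	bw, err := sm.snapshotMigrationHandler(sourceProfile, config, conv, client, infoSchema)
	if err != nil {
		return fmt.Errorf("snapshot migration failed: %w", err)
	}
	// For MySQL, Oracle and PostgreSQL bw is an empty placeholder, because the
	// Dataflow job performs the bulk load from the backfilled data; for
	// DynamoDB bw is the writer that has already flushed the snapshot data.
	_ = bw
	return nil
}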