tools/mc2bq/pkg/export/export.go (239 lines of code) (raw):
// Copyright 2023 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package export provides the actual exporting logic for the bigquery_exporter
package export
import (
"context"
_ "embed"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"
"cloud.google.com/go/bigquery"
migrationcenter "cloud.google.com/go/migrationcenter/apiv1"
"golang.org/x/sync/errgroup"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/protobuf/reflect/protoreflect"
"github.com/GoogleCloudPlatform/migrationcenter-utils/tools/mc2bq/pkg/gapiutil"
"github.com/GoogleCloudPlatform/migrationcenter-utils/tools/mc2bq/pkg/mcutil"
"github.com/GoogleCloudPlatform/migrationcenter-utils/tools/mc2bq/pkg/messages"
exporterschema "github.com/GoogleCloudPlatform/migrationcenter-utils/tools/mc2bq/pkg/schema"
)
// errTableExists is returned by exportObjects when the destination table
// already exists and Params.Force is not set.
var errTableExists = messages.NewError(messages.ErrMsgExportTableExists)
// Params are the parameters for the Export function.
type Params struct {
	// ProjectID is the Migration Center project to export data from.
	ProjectID string
	// Region is the Migration Center location of the exported data.
	Region string
	// TargetProjectID is the BigQuery project to export into.
	// Defaults to ProjectID when empty (see normalizeParams).
	TargetProjectID string
	// Force drops and recreates tables that already exist instead of
	// failing with errTableExists.
	Force bool
	// DatasetID is the BigQuery dataset the tables are created in.
	DatasetID string
	// TablePrefix is prepended to every generated table name
	// (e.g. "<prefix>assets", "<prefix>groups").
	TablePrefix string
	// Schema describes how objects are serialized for BigQuery.
	// Defaults to the embedded schema when nil (see normalizeParams).
	Schema *exporterschema.ExporterSchema
	// MCOptions are additional client options applied only to the
	// Migration Center client.
	MCOptions []option.ClientOption
	// UserAgentSuffix, when non-empty, is appended to the default user
	// agent separated by an underscore.
	UserAgentSuffix string
}
// normalizeParams fills in defaults for optional Params fields: the target
// project falls back to the source project, and the schema falls back to the
// embedded one.
func normalizeParams(params *Params) {
	if len(params.TargetProjectID) == 0 {
		params.TargetProjectID = params.ProjectID
	}
	if params.Schema == nil {
		params.Schema = &exporterschema.EmbeddedSchema
	}
}
// buildClientOptions returns the client options shared by every Google API
// client this package creates — currently just the user agent, optionally
// extended with params.UserAgentSuffix.
func buildClientOptions(params *Params) []option.ClientOption {
	userAgent := messages.UserAgent
	if suffix := params.UserAgentSuffix; suffix != "" {
		userAgent = userAgent + "_" + suffix
	}
	return []option.ClientOption{option.WithUserAgent(userAgent)}
}
// buildMCClientOptions combines the common client options with the Migration
// Center specific options from params.
func buildMCClientOptions(params *Params) []option.ClientOption {
	opts := buildClientOptions(params)
	return append(opts, params.MCOptions...)
}
// newExportTask returns a function suitable for errgroup.Go that exports all
// objects from src into the BigQuery table params.TablePrefix+tableSuffix.
// While the export runs, a background goroutine prints a progress message
// every few seconds; objectCount is the expected total shown in the progress
// report (0 when the total is unknown).
func newExportTask(ctx context.Context, dataset *bigquery.Dataset, params *Params, src mcutil.ObjectSource, tableSuffix string, objectCount uint64) func() error {
	tblName := params.TablePrefix + tableSuffix
	return func() error {
		// done stops the progress goroutine; the deferred close covers the
		// error path, the explicit send below covers the success path.
		done := make(chan bool, 1)
		defer close(done)
		go func() {
			const interval = 5 * time.Second
			// Use a ticker rather than a one-shot timer: the previous
			// implementation only called timer.Reset after printing, so once
			// the "skip" branch ran (nothing read yet, or everything read)
			// the timer never fired again and progress reporting stopped.
			ticker := time.NewTicker(interval)
			defer ticker.Stop()
			for {
				select {
				case <-done:
					return
				case <-ticker.C:
					read := src.ObjectsRead()
					if read == objectCount || read == 0 {
						// Don't write progress if we haven't started or just finished
						continue
					}
					fmt.Println(messages.ExportTableInProgress{
						TableName:          tblName,
						RecordsTransferred: read,
						RecordCount:        objectCount,
						BytesTransferred:   src.BytesRead(),
					})
				}
			}
		}()
		err := exportObjects(ctx, dataset, params, src, tblName)
		if err != nil {
			return fmt.Errorf("export %s: %w", tableSuffix, err)
		}
		done <- true
		fmt.Println(messages.ExportTableComplete{
			TableName:        tblName,
			RecordCount:      src.ObjectsRead(),
			BytesTransferred: src.BytesRead(),
		})
		return nil
	}
}
// MCFactory creates an MC implementation according to params.
func MCFactory(ctx context.Context, params *Params) (mcutil.MC, error) {
	client, err := migrationcenter.NewClient(ctx, buildMCClientOptions(params)...)
	if err != nil {
		return nil, fmt.Errorf("create migration center client: %w", err)
	}
	mc := &MCv1{
		client: client,
		schema: params.Schema,
	}
	return mc, nil
}
// Export exports migration center data to BigQuery
func Export(params *Params) error {
	normalizeParams(params)
	// The operation never times out, the user can just kill the tool.
	ctx := context.Background()
	path := mcutil.ProjectAndLocation{Project: params.ProjectID, Location: params.Region}

	bq, err := bigquery.NewClient(ctx, params.TargetProjectID, buildClientOptions(params)...)
	if err != nil {
		return fmt.Errorf("create bigquery client: %w", err)
	}
	defer bq.Close()

	fmt.Println(messages.ExportCreatingDataset{DatasetID: params.DatasetID})
	dataset := bq.Dataset(params.DatasetID)
	// An already-existing dataset is fine; only surface other errors.
	createErr := dataset.Create(ctx, &bigquery.DatasetMetadata{Name: params.DatasetID})
	if err := gapiutil.IgnoreErrorWithCode(createErr, http.StatusConflict); err != nil {
		return fmt.Errorf("create dataset: %w", err)
	}

	mc, err := MCFactory(ctx, params)
	if err != nil {
		return err
	}

	grp, ctx := errgroup.WithContext(ctx)
	assetCount, err := mc.AssetCount(ctx, path)
	if err != nil {
		return fmt.Errorf("fetch asset count: %w", err)
	}

	// One export task per object kind, all running concurrently.
	assetSource := mc.AssetSource(ctx, path)
	groupSource := mc.GroupSource(ctx, path)
	preferenceSetSource := mc.PreferenceSetSource(ctx, path)
	grp.Go(newExportTask(ctx, dataset, params, groupSource, "groups", 0))
	grp.Go(newExportTask(ctx, dataset, params, assetSource, "assets", uint64(assetCount)))
	grp.Go(newExportTask(ctx, dataset, params, preferenceSetSource, "preference_sets", 0))
	if err := grp.Wait(); err != nil {
		return err
	}

	fmt.Println(messages.ExportComplete{
		BytesTransferred: assetSource.BytesRead() + groupSource.BytesRead() + preferenceSetSource.BytesRead(),
	})
	return nil
}
// iterable abstracts the google.golang.org/api/iterator-style iteration
// pattern: Next is called repeatedly until it returns iterator.Done.
type iterable[T any] interface {
	Next() (T, error)
}
// objectReader adapts an object iterator into an io.Reader that yields the
// serialized form of each object, while keeping running transfer counters.
type objectReader[T protoreflect.ProtoMessage] struct {
	// schema is the BigQuery schema the objects are serialized against.
	schema bigquery.Schema
	// it produces the objects to serialize.
	it iterable[T]
	// serializer converts one object into its serialized byte form.
	serializer func(obj T) ([]byte, error)
	// buf holds the not-yet-consumed remainder of the last serialized object.
	buf []byte
	// objectsRead counts objects pulled from it so far.
	objectsRead uint64
	// bytesRead counts serialized bytes handed out by Read so far.
	bytesRead uint64
}
// newObjectReader wraps it in an objectReader that serializes each object
// under the given root name using the provided BigQuery schema.
func newObjectReader[T protoreflect.ProtoMessage](it iterable[T], root string, schema bigquery.Schema) *objectReader[T] {
	r := &objectReader[T]{
		it:     it,
		schema: schema,
	}
	r.serializer = exporterschema.NewSerializer[T](root, schema)
	return r
}
// Schema returns the BigQuery schema the objects are serialized against.
func (r *objectReader[T]) Schema() bigquery.Schema {
	return r.schema
}
// BytesRead returns the total number of serialized bytes handed out by Read
// so far.
func (r *objectReader[T]) BytesRead() uint64 {
	return r.bytesRead
}
// ObjectsRead returns the number of objects pulled from the iterator so far.
func (r *objectReader[T]) ObjectsRead() uint64 {
	return r.objectsRead
}
// Read reads up to len(buf) bytes from the object stream into buf,
// serializing the next object from the iterator whenever the internal buffer
// is empty. It returns the number of bytes copied; when the iterator is
// exhausted it returns io.EOF. A zero-length buf always yields (0, nil).
func (r *objectReader[T]) Read(buf []byte) (int, error) {
	if len(buf) == 0 {
		return 0, nil
	}
	// Refill until we have bytes to hand out. Looping (instead of a single
	// refill) avoids returning (0, nil) when an object serializes to zero
	// bytes, which the io.Reader contract discourages.
	for len(r.buf) == 0 {
		obj, err := r.it.Next()
		if errors.Is(err, iterator.Done) {
			return 0, io.EOF
		}
		if err != nil {
			return 0, err
		}
		r.buf, err = r.serializer(obj)
		if err != nil {
			return 0, err
		}
		r.objectsRead++
	}
	n := copy(buf, r.buf)
	r.bytesRead += uint64(n)
	r.buf = r.buf[n:]
	return n, nil
}
// newMigrationCenterLoadSource adapts r into a bigquery.LoadSource.
// Creating a full blown bigquery.LoadSource requires a lot of low level big query operations.
// To save on time we create a ReaderSource and feed it the assets as a json stream.
func newMigrationCenterLoadSource[T protoreflect.ProtoMessage](r *objectReader[T]) bigquery.LoadSource {
	source := bigquery.NewReaderSource(r)
	source.SourceFormat = bigquery.JSON
	source.Schema = r.Schema()
	return source
}
// exportObjects loads everything produced by src into the table tableName in
// dataset. If the table already exists and params.Force is false it returns
// errTableExists; with Force set the existing table is deleted and rebuilt.
func exportObjects(ctx context.Context, dataset *bigquery.Dataset, params *Params, src bigquery.LoadSource, tableName string) error {
	tbl := dataset.Table(tableName)
	_, err := tbl.Metadata(ctx)
	switch {
	case err == nil && !params.Force:
		// Table exists and we were not asked to overwrite it.
		return errTableExists
	case err != nil && !gapiutil.IsErrorWithCode(err, http.StatusNotFound):
		return err
	}
	// Drop any pre-existing table; a missing table is not an error here.
	if err := gapiutil.IgnoreErrorWithCode(tbl.Delete(ctx), http.StatusNotFound); err != nil {
		return err
	}
	fmt.Println(messages.ExportingDataToTable{TableName: tableName})
	loader := tbl.LoaderFrom(src)
	loader.WriteDisposition = bigquery.WriteTruncate
	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}
	if statusErr := status.Err(); statusErr != nil {
		if len(status.Errors) == 0 {
			return statusErr
		}
		// Collapse the per-record job errors into a single error message.
		var sb strings.Builder
		sb.WriteString("encountered errors during export:")
		for _, jobErr := range status.Errors {
			fmt.Fprintf(&sb, "\n\t%v", jobErr)
		}
		return errors.New(sb.String())
	}
	return nil
}