in tools/mc2bq/pkg/export/export.go [140:189]
func Export(params *Params) error {
normalizeParams(params)
// The operation never times out, the user can just kill the tool.
ctx := context.Background()
path := mcutil.ProjectAndLocation{Project: params.ProjectID, Location: params.Region}
bq, err := bigquery.NewClient(ctx, params.TargetProjectID, buildClientOptions(params)...)
if err != nil {
return fmt.Errorf("create bigquery client: %w", err)
}
defer bq.Close()
fmt.Println(messages.ExportCreatingDataset{DatasetID: params.DatasetID})
dataset := bq.Dataset(params.DatasetID)
err = dataset.Create(ctx, &bigquery.DatasetMetadata{
Name: params.DatasetID,
})
err = gapiutil.IgnoreErrorWithCode(err, http.StatusConflict)
if err != nil {
return fmt.Errorf("create dataset: %w", err)
}
mc, err := MCFactory(ctx, params)
if err != nil {
return err
}
grp, ctx := errgroup.WithContext(ctx)
assetCount, err := mc.AssetCount(ctx, path)
if err != nil {
return fmt.Errorf("fetch asset count: %w", err)
}
assetSource := mc.AssetSource(ctx, path)
groupSource := mc.GroupSource(ctx, path)
preferenceSetSource := mc.PreferenceSetSource(ctx, path)
grp.Go(newExportTask(ctx, dataset, params, groupSource, "groups", 0))
grp.Go(newExportTask(ctx, dataset, params, assetSource, "assets", uint64(assetCount)))
grp.Go(newExportTask(ctx, dataset, params, preferenceSetSource, "preference_sets", 0))
err = grp.Wait()
if err != nil {
return err
}
fmt.Println(messages.ExportComplete{
BytesTransferred: assetSource.BytesRead() + groupSource.BytesRead() + preferenceSetSource.BytesRead(),
})
return nil
}