func Export()

in tools/mc2bq/pkg/export/export.go [140:189]


// Export runs a full export as described by params.
//
// It performs the following steps in order:
//  1. passes params through normalizeParams;
//  2. creates a BigQuery client for params.TargetProjectID;
//  3. creates the dataset params.DatasetID, filtering out errors that carry
//     HTTP 409 Conflict (presumably "dataset already exists" — confirm
//     against gapiutil.IgnoreErrorWithCode);
//  4. obtains the source client from MCFactory and fetches the asset count
//     for the project/location taken from params.ProjectID and params.Region;
//  5. starts three export tasks ("groups", "assets", "preference_sets") in an
//     errgroup and waits for all of them;
//  6. prints a completion message with the total bytes read by the three
//     sources.
//
// The returned error is nil on success. Setup failures are wrapped with a
// short description of the failing step; an error from the export tasks is
// returned exactly as errgroup.Group.Wait reports it.
func Export(params *Params) error {
	normalizeParams(params)
	// The operation never times out, the user can just kill the tool.
	ctx := context.Background()

	path := mcutil.ProjectAndLocation{Project: params.ProjectID, Location: params.Region}
	bq, err := bigquery.NewClient(ctx, params.TargetProjectID, buildClientOptions(params)...)
	if err != nil {
		return fmt.Errorf("create bigquery client: %w", err)
	}
	defer bq.Close()

	fmt.Println(messages.ExportCreatingDataset{DatasetID: params.DatasetID})
	dataset := bq.Dataset(params.DatasetID)
	err = dataset.Create(ctx, &bigquery.DatasetMetadata{
		Name: params.DatasetID,
	})
	// A 409 Conflict from Create is deliberately not treated as a failure.
	if err = gapiutil.IgnoreErrorWithCode(err, http.StatusConflict); err != nil {
		return fmt.Errorf("create dataset: %w", err)
	}

	mc, err := MCFactory(ctx, params)
	if err != nil {
		// Wrapped like every other setup step so the failing stage is visible.
		return fmt.Errorf("create mc client: %w", err)
	}

	assetCount, err := mc.AssetCount(ctx, path)
	if err != nil {
		return fmt.Errorf("fetch asset count: %w", err)
	}

	// Create the group only after the last early return: the derived context
	// is cancelled by Wait (or by the first failing task), so every path that
	// creates it must also reach Wait.
	grp, ctx := errgroup.WithContext(ctx)

	// The sources share the group's context, so they observe its cancellation
	// once any task fails.
	assetSource := mc.AssetSource(ctx, path)
	groupSource := mc.GroupSource(ctx, path)
	preferenceSetSource := mc.PreferenceSetSource(ctx, path)

	// Only the asset task receives a known item count; the others pass 0.
	grp.Go(newExportTask(ctx, dataset, params, groupSource, "groups", 0))
	grp.Go(newExportTask(ctx, dataset, params, assetSource, "assets", uint64(assetCount)))
	grp.Go(newExportTask(ctx, dataset, params, preferenceSetSource, "preference_sets", 0))

	if err := grp.Wait(); err != nil {
		return err
	}

	fmt.Println(messages.ExportComplete{
		BytesTransferred: assetSource.BytesRead() + groupSource.BytesRead() + preferenceSetSource.BytesRead(),
	})

	return nil
}