tools/bqman/golang/bqhandler/bqhandler.go
/*
Copyright 2020 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bqhandler
import (
"context"
"encoding/json"
"fmt"
"log"
"cloud.google.com/go/bigquery"
util "github.com/GoogleCloudPlatform/bqman/util"
bqv2 "google.golang.org/api/bigquery/v2"
"google.golang.org/api/iterator"
)
// BigQueryHandler is used to interface with BigQuery
// via the API
type BigQueryHandler struct {
Ctx context.Context
Client *bigquery.Client
ProjectID string
}
// BqColumn is used to hold the BigQuery column name
type BqColumn struct {
Name string
}
// BqTable is used to hold the BigQuery table metadata
type BqTable struct {
TableID string
Columns []BqColumn
TableMetadata *bigquery.TableMetadata
}
// BqDataset is used to hold BigQuery dataset metadata
type BqDataset struct {
Name string
DatasetID string
Tables []BqTable
DatasetMetadata *bigquery.DatasetMetadata
}
// BqSchema is used to forward- and reverse-engineer BigQuery
// JSON schema files
type BqSchema struct {
Description string `json:"description"`
Name string `json:"name"`
Type string `json:"type"`
Mode string `json:"mode,omitempty"`
Fields []BqSchema `json:"fields,omitempty"`
}
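// A BigQuery JSON schema file of the kind BqSchema models is an array of
// field objects, with nested RECORD fields recursing through "fields".
// A minimal hand-written illustration (not a file shipped with this repo):
//
//	[
//	  {"description": "Customer id", "name": "id", "type": "INTEGER", "mode": "REQUIRED"},
//	  {"description": "Postal addresses", "name": "address", "type": "RECORD", "mode": "REPEATED",
//	   "fields": [{"description": "City", "name": "city", "type": "STRING"}]}
//	]
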
// NewBigQueryHandler returns a pointer to a new instance of
// BigQueryHandler
func NewBigQueryHandler(ctx context.Context, projectID string) *BigQueryHandler {
var err error
bqHandler := new(BigQueryHandler)
bqHandler.Ctx = ctx
bqHandler.Client, err = bigquery.NewClient(ctx, projectID)
bqHandler.ProjectID = projectID
util.CheckError(err, "Unable to create BigQuery service!")
return bqHandler
}
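// A minimal usage sketch; "my-project" is a placeholder, and credentials
// are assumed to come from Application Default Credentials:
//
//	ctx := context.Background()
//	bh := NewBigQueryHandler(ctx, "my-project")
//	defer bh.Client.Close()
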
// GetBigQueryDatasets returns an array of BqDataset objects
// using the datasetID provided, or an error
func (bh BigQueryHandler) GetBigQueryDatasets(datasetID string) ([]BqDataset, error) {
	log.Printf("GetBigQueryDatasets(%s) executing...", bh.ProjectID)
	it := bh.Client.Datasets(bh.Ctx)
	it.ProjectID = bh.ProjectID
	datasets := make([]BqDataset, 0)
	for {
		dataset, err := it.Next()
		// iterator.Done is the normal end-of-iteration sentinel, not a
		// failure; the original treated it as fatal on an empty project.
		if err == iterator.Done {
			log.Printf("GetBigQueryDatasets().iterator.Done")
			break
		}
		util.CheckError(err, "GetBigQueryDatasets() failed!")
		log.Printf("datasetID: %s; dataset.DatasetID: %s", datasetID, dataset.DatasetID)
		// An empty datasetID matches every dataset in the project.
		if datasetID == "" || dataset.DatasetID == datasetID {
			bqDataset := new(BqDataset)
			bqDataset.DatasetID = dataset.DatasetID
			bqDataset.DatasetMetadata, err = dataset.Metadata(bh.Ctx)
			util.CheckError(err, "GetBigQueryDatasets().Metadata error")
			datasets = append(datasets, *bqDataset)
		}
	}
	log.Printf("GetBigQueryDatasets() completed.")
	return datasets, nil
}
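// For example, an empty datasetID returns every dataset in the project,
// while a concrete ID (here the placeholder "my_dataset") filters to one:
//
//	all, _ := bh.GetBigQueryDatasets("")
//	one, _ := bh.GetBigQueryDatasets("my_dataset")
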
// GetBigQueryTables returns an array of BqTable objects
// for a given dataset or an error
func (bh BigQueryHandler) GetBigQueryTables(datasetID string) ([]BqTable, error) {
	log.Printf("GetBigQueryTables(%s, %s) executing...", bh.ProjectID, datasetID)
	tables := make([]BqTable, 0)
	it := bh.Client.Dataset(datasetID).Tables(bh.Ctx)
	for {
		table, err := it.Next()
		// iterator.Done marks the end of the listing; the original could
		// return it as an error to the caller for an empty dataset.
		if err == iterator.Done {
			log.Printf("GetBigQueryTables().iterator.Done")
			break
		}
		util.CheckError(err, "GetBigQueryTables() failed!")
		log.Printf("GetBigQueryTables().table.TableID: %s", table.TableID)
		bqTable := new(BqTable)
		bqTable.TableID = table.TableID
		bqTable.TableMetadata, err = table.Metadata(bh.Ctx)
		util.CheckError(err, "GetBigQueryTables().Metadata() failed")
		bqTable.Columns, err = bh.GetBigQueryColumns(bh.Ctx, table)
		util.CheckError(err, "GetBigQueryTables().GetBigQueryColumns() failed")
		tables = append(tables, *bqTable)
	}
	log.Printf("GetBigQueryTables() completed.")
	return tables, nil
}
// GetBigQueryColumns returns a list of BqColumn objects
// for a given BigQuery table or an error
func (bh BigQueryHandler) GetBigQueryColumns(ctx context.Context, table *bigquery.Table) ([]BqColumn, error) {
log.Printf("GetBigQueryColumns(%s) executing...", table.TableID)
metadata, err := table.Metadata(ctx)
util.CheckError(err, "GetBigQueryColumns().table.Metadata() failed!")
bqColumns := make([]BqColumn, 0)
for _, fs := range metadata.Schema {
bqColumns = append(bqColumns, BqColumn{Name: fs.Name})
}
log.Printf("GetBigQueryColumns() completed.")
return bqColumns, nil
}
// CheckDatasetExists is used to determine the existence
// of a BigQuery dataset
func (bh *BigQueryHandler) CheckDatasetExists(datasetID string) (bool, error) {
log.Printf("CheckDatasetExists(%s) executing", datasetID)
_, err := bh.Client.Dataset(datasetID).Metadata(bh.Ctx)
if err != nil {
//log.Printf("CheckDatasetExists() error: %s", err)
return false, err
}
log.Printf("%s: Dataset exists", datasetID)
log.Printf("CheckDatasetExists(%s) completed", datasetID)
return true, nil
}
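// A typical caller treats any error as "absent or unreachable" and creates
// the dataset on demand (a sketch; "my_dataset" is a placeholder):
//
//	if ok, _ := bh.CheckDatasetExists("my_dataset"); !ok {
//		err := bh.Client.Dataset("my_dataset").Create(bh.Ctx, &bigquery.DatasetMetadata{})
//		// handle err
//	}
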
// FieldSchemaToBQ converts a FieldSchema (BigQuery API V1)
// to TableFieldSchema (BigQuery API V2)
// Please refer to
// https://github.com/googleapis/google-cloud-go/blob/096c584342beb49d9653a670d7813b9dbaca72b8/bigquery/schema.go#L75
func (bh *BigQueryHandler) FieldSchemaToBQ(fs *bigquery.FieldSchema) *bqv2.TableFieldSchema {
tfs := &bqv2.TableFieldSchema{
Description: fs.Description,
Name: fs.Name,
Type: string(fs.Type),
//PolicyTags: fs.PolicyTags.toBQ(),
}
if fs.Repeated {
tfs.Mode = "REPEATED"
} else if fs.Required {
tfs.Mode = "REQUIRED"
} // else leave as default, which is interpreted as NULLABLE.
for _, f := range fs.Schema {
//tfs.Fields = append(tfs.Fields, f.toBQ())
tfs.Fields = append(tfs.Fields, bh.FieldSchemaToBQ(f))
}
return tfs
}
// SchemaToBQ converts BigQuery.Schema object (API V1)
// to TableSchema object (API V2)
func (bh *BigQueryHandler) SchemaToBQ(s bigquery.Schema) *bqv2.TableSchema {
var fields []*bqv2.TableFieldSchema
for _, f := range s {
//fields = append(fields, f.toBQ())
fields = append(fields, bh.FieldSchemaToBQ(f))
}
return &bqv2.TableSchema{Fields: fields}
}
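// For instance, a client-library schema literal converts as follows
// (a sketch; the field names are illustrative only):
//
//	s := bigquery.Schema{
//		{Name: "id", Type: bigquery.IntegerFieldType, Required: true},
//		{Name: "tags", Type: bigquery.StringFieldType, Repeated: true},
//	}
//	ts := bh.SchemaToBQ(s)
//	// ts.Fields[0].Mode == "REQUIRED"; ts.Fields[1].Mode == "REPEATED"
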
// PatchBigQueryTable is used to modify column descriptions
func (bh *BigQueryHandler) PatchBigQueryTable(projectID, datasetID string, bqSchema bigquery.Schema, tableRef *bigquery.Table) error {
log.Printf("PatchBigQueryTable() executing")
meta, err := tableRef.Metadata(bh.Ctx)
util.CheckError(err, "PatchBigQueryTable metadata get failed")
fmt.Printf("PatchBigQueryTable(%s).meta: %v\n", tableRef.TableID, meta)
for _, fs := range meta.Schema {
fmt.Printf("meta.Schema: %s: %s\n", fs.Name, fs.Description)
}
for _, fs := range bqSchema {
fmt.Printf("bqSchema: %s: %s\n", fs.Name, fs.Description)
}
bqv2Service, err := bqv2.NewService(bh.Ctx)
util.CheckError(err, "PatchBigQueryTable().NewService() failed")
bqv2TablesService := bqv2.NewTablesService(bqv2Service)
tablesGetCall := bqv2TablesService.Get(projectID, datasetID, tableRef.TableID)
	bqv2Table, err := tablesGetCall.Do()
	// Check the Get error before touching bqv2Table; the original assigned
	// the schema first, which panics on a nil table when Get fails.
	util.CheckError(err, "PatchBigQueryTable().TablesGetCall().Do() failed")
	bqv2Table.Schema = bh.SchemaToBQ(bqSchema)
bqv2TablesPatchCall := bqv2TablesService.Patch(projectID, datasetID, tableRef.TableID, bqv2Table)
_, err = bqv2TablesPatchCall.Do()
util.CheckError(err, "PatchBigQueryTable().bqv2TablesPatchCall().Do() failed")
log.Printf("PatchBigQueryTable() completed")
return nil
}
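// A usage sketch (all identifiers are placeholders): read the desired
// column descriptions from a JSON schema file, then patch the live table:
//
//	schema, _ := bigquery.SchemaFromJSON(jsonBytes)
//	tableRef := bh.Client.Dataset("my_dataset").Table("my_table")
//	err := bh.PatchBigQueryTable("my-project", "my_dataset", schema, tableRef)
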
// ConvertToJSON converts an array of BqSchema structs to a JSON byte array
func (bh *BigQueryHandler) ConvertToJSON(records []BqSchema) []byte {
log.Printf("ConvertToJSON() executing")
bytes, err := json.Marshal(records)
	if err != nil {
		log.Fatalf("ConvertToJSON(): json.Marshal() failed: %v", err)
	}
log.Printf("ConvertToJSON() completed")
return bytes
}
// LoadDataFromGCS is used to restore BigQuery tables from
// sharded CSV files in Google Cloud Storage generated
// via a backup. Please refer to
// https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv
func (bh *BigQueryHandler) LoadDataFromGCS(table, datasetID, gcsPath string, schema bigquery.Schema) {
log.Printf("LoadDataFromGCS() executing")
gcsURI := fmt.Sprintf("%s/%s/%s-*.csv", gcsPath, table, table)
log.Printf("gcsURI: %s", gcsURI)
gcsRef := bigquery.NewGCSReference(gcsURI)
gcsRef.Schema = schema
loader := bh.Client.Dataset(datasetID).Table(table).LoaderFrom(gcsRef)
loader.WriteDisposition = bigquery.WriteEmpty
job, err := loader.Run(bh.Ctx)
util.CheckError(err, "LoadDataFromGCS().loader.Run() failed")
status, err := job.Wait(bh.Ctx)
util.CheckError(err, "LoadDataFromGCS().job.Wait() failed")
util.CheckError(status.Err(), "job completed with error")
log.Printf("LoadDataFromGCS() completed")
}
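// With table "orders" and gcsPath "gs://my-bucket/backup" (placeholders),
// the wildcard URI built above picks up every shard written by a backup:
//
//	bh.LoadDataFromGCS("orders", "my_dataset", "gs://my-bucket/backup", schema)
//	// loads gs://my-bucket/backup/orders/orders-*.csv
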
// ShowMissingColumns is a convenience method to display
// schema differences
func (bh *BigQueryHandler) ShowMissingColumns(prefix, tableID, datasetID string, missingColumns []bigquery.FieldSchema) {
	for _, missingCol := range missingColumns {
		log.Printf("Column [%s] is missing in table [%s.%s]", missingCol.Name, datasetID, tableID)
		log.Printf("%s(%s).Schema: %v", prefix, tableID, missingCol)
	}
}
// FindMissingColumnsInSchema is used to identify schema differences
// between BigQuery tables
func (bh *BigQueryHandler) FindMissingColumnsInSchema(tableID, datasetID string, schema bigquery.Schema) (*bigquery.Table, []bigquery.FieldSchema) {
log.Printf("FindMissingColumnsInSchema() executing")
// Store current (live) table schema in currentSchemaMap
tableRef := bh.Client.Dataset(datasetID).Table(tableID)
bqColumns, err := bh.GetBigQueryColumns(bh.Ctx, tableRef)
util.CheckError(err, "AddColumnsToTable().GetBigQueryColumns() failed")
currentSchemaMap := make(map[string]BqColumn, 0)
for _, bqColumn := range bqColumns {
currentSchemaMap[bqColumn.Name] = bqColumn
}
missingColumns := make([]bigquery.FieldSchema, 0)
for i := 0; i < len(schema); i++ {
if _, exists := currentSchemaMap[schema[i].Name]; !exists {
missingColumns = append(missingColumns, *schema[i])
}
}
log.Printf("FindMissingColumnsInSchema() completed")
return tableRef, missingColumns
}
// AddColumnsToTable is used to append NULLABLE columns at the end
// of a table's schema without rewriting existing data
func (bh *BigQueryHandler) AddColumnsToTable(datasetID string, tableRef *bigquery.Table, missingColumns []bigquery.FieldSchema) error {
log.Printf("AddColumnsToTable() executing")
bh.ShowMissingColumns("AddColumnsToTable", tableRef.TableID, datasetID, missingColumns)
meta, err := tableRef.Metadata(bh.Ctx)
if err != nil {
return fmt.Errorf("AddColumnsToTable metadata get failed: %v", err)
}
newSchemaPtr := make([]*bigquery.FieldSchema, 0)
for i := 0; i < len(missingColumns); i++ {
newSchemaPtr = append(newSchemaPtr, &missingColumns[i])
}
newSchema := append(meta.Schema, newSchemaPtr...)
update := bigquery.TableMetadataToUpdate{Schema: newSchema}
if _, err = tableRef.Update(bh.Ctx, update, meta.ETag); err != nil {
return fmt.Errorf("AddColumnsToTable update failed: %v", err)
}
log.Printf("AddColumnsToTable() completed")
return nil
}
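// FindMissingColumnsInSchema and AddColumnsToTable are designed to be
// chained (a sketch; desiredSchema and the identifiers are placeholders):
//
//	tableRef, missing := bh.FindMissingColumnsInSchema("my_table", "my_dataset", desiredSchema)
//	if len(missing) > 0 {
//		err := bh.AddColumnsToTable("my_dataset", tableRef, missing)
//		// handle err
//	}
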
// JsonifyBigquery converts the fields of a BigQuery TableSchema to a
// JSON byte array
func (bh *BigQueryHandler) JsonifyBigquery(tableSchema *bqv2.TableSchema) []byte {
log.Printf("JsonifyBigquery() executing.")
b, err := json.Marshal(tableSchema.Fields)
util.CheckError(err, "json.Marshal(c) failed!")
log.Printf("JsonifyBigquery() completed")
return b
}