tools/bqman/golang/controller/trotter.go (383 lines of code) (raw):

/* Copyright 2020 Google LLC Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package controller import ( "context" "encoding/json" "fmt" "log" "os" "path/filepath" "strings" "time" bigquery "cloud.google.com/go/bigquery" "github.com/GoogleCloudPlatform/bqman/bqhandler" "github.com/GoogleCloudPlatform/bqman/configparser" "github.com/GoogleCloudPlatform/bqman/executionmode" "github.com/GoogleCloudPlatform/bqman/gcshandler" "github.com/GoogleCloudPlatform/bqman/projecthandler" "github.com/GoogleCloudPlatform/bqman/util" "google.golang.org/api/iterator" ) // FilterCriteria is used to hold BigQuery table and column filter criteria type FilterCriteria struct { ProjectID string DatasetID string BqTable string BqColumn string } // RuntimeParameters is used to hold values that are used during program // execution type RuntimeParameters struct { Timestamp string Mode executionmode.ExecutionMode CacheDirPath string RuntimePath string LogDirPath string SchemaDirPath string LogFile string JSONFile string LogFileHandle *os.File Ctx context.Context BqHandler *bqhandler.BigQueryHandler GcsHandler *gcshandler.CloudStorageHandler ConfigFile string CfgParser *configparser.ConfigParser Location string Quiet bool SpreadsheetID string SheetTitle string SheetRange string SQLServerName string SQLServerPort int SQLServerUser string SQLServerPassword string SQLServerDatabase string } // GcpAssets is used to hold BigQuery dataset info type GcpAssets struct { Projects []string Datasets []bqhandler.BqDataset } // Trotter is used to hold filter criteria and runtime parameters type Trotter struct { Assets *GcpAssets Criteria *FilterCriteria Parameters *RuntimeParameters } const ( gcpCallTimeout = 30 * time.Second ) // Validate is used to confirm that all pre-conditions are met func (t *Trotter) Validate() { log.Printf("Trotter.Validate() executing.") tm := time.Now() t.Parameters.Timestamp = fmt.Sprintf("%d%02d%02dT%02d%02d%02d", tm.Year(), tm.Month(), tm.Day(), tm.Hour(), tm.Minute(), tm.Second()) operation := fmt.Sprint(t.Parameters.Mode) t.Parameters.RuntimePath = fmt.Sprintf("%s/%s/%s/%s", t.Parameters.CacheDirPath, t.Criteria.ProjectID, t.Criteria.DatasetID, operation) t.Parameters.LogDirPath = fmt.Sprintf("%s/history/%s", t.Parameters.RuntimePath, t.Parameters.Timestamp) t.Parameters.SchemaDirPath = fmt.Sprintf("%s/current", t.Parameters.RuntimePath) dirStatus := make(map[string]bool) dirStatus[t.Parameters.LogDirPath] = util.CheckDir(t.Parameters.LogDirPath, util.CheckDirAndCreate) if t.Parameters.Mode == executionmode.PullMode { dirStatus[t.Parameters.SchemaDirPath] = util.CheckDir(t.Parameters.SchemaDirPath, util.CheckDirAndCreate) } missingDirectories := make([]string, 0) for k, v := range dirStatus { if !v { missingDirectories = append(missingDirectories, k) } } if len(missingDirectories) > 0 { util.ShowStringArray(missingDirectories, "The following directories , err = bqHandler.GetBigqueryTables(t.Parameters.Ctx, t.Criteria.ProjectID, dataset.DatasetID)don't exist!") os.Exit(2) } log.Printf("Trotter.Validate() completed") } func (t *Trotter) initLogging() { //log.Printf("Trotter.initLogging() starting.") t.Parameters.LogFile = fmt.Sprintf("%s/bqman-%s.log", t.Parameters.LogDirPath, t.Parameters.Timestamp) t.Parameters.JSONFile = fmt.Sprintf("%s/bqman-%s.json", t.Parameters.LogDirPath, t.Parameters.Timestamp) var err error t.Parameters.LogFileHandle, err = os.Create(t.Parameters.LogFile) util.CheckError(err, fmt.Sprintf("%s: Unable to initialise logfile!", t.Parameters.LogFile)) if t.Parameters.Quiet { log.SetOutput(t.Parameters.LogFileHandle) } //log.Printf("Trotter.initLogging() completed") } // NewPullTrotter constructs and initialises the Trotter object // used to generate BigQuery JSON schema files from a BigQuery dataset func NewPullTrotter(projectID, bqDataset, cacheDir, location string, quiet bool) *Trotter { ctx := context.Background() trotter := &Trotter{ Criteria: &FilterCriteria{ ProjectID: projectID, DatasetID: bqDataset, }, Parameters: &RuntimeParameters{ CacheDirPath: cacheDir, Ctx: ctx, BqHandler: bqhandler.NewBigQueryHandler(ctx, projectID), Location: location, Quiet: quiet, }, } trotter.Parameters.Mode = executionmode.PullMode trotter.Assets = new(GcpAssets) trotter.Validate() trotter.initLogging() return trotter } // NewImportSpreadsheetTrotter is used to construct and initialise // a Trotter object for generating BigQuery JSON schema files from // Google Sheets func NewImportSpreadsheetTrotter(projectID, bqDataset, cacheDir, location, spreadsheetID, sheetTitle, sheetRange string, quiet bool) *Trotter { trotter := NewPullTrotter(projectID, bqDataset, cacheDir, location, quiet) trotter.Parameters.Mode = executionmode.ImportSpreadsheetMode trotter.Parameters.SpreadsheetID = spreadsheetID trotter.Parameters.SheetTitle = sheetTitle trotter.Parameters.SheetRange = sheetRange trotter.Validate() trotter.initLogging() return trotter } // NewImportSQLServerTrotter is used to construct and initialise // a Trotter object for generating BigQuery JSON schema files // from a SQL server database func NewImportSQLServerTrotter(projectID, bqDataset, cacheDir, location, server, user, password, database string, port int, quiet bool) *Trotter { trotter := NewPullTrotter(projectID, bqDataset, cacheDir, location, quiet) trotter.Parameters.Mode = executionmode.ImportSqlserverMode trotter.Parameters.SQLServerName = server trotter.Parameters.SQLServerUser = user trotter.Parameters.SQLServerPassword = password trotter.Parameters.SQLServerDatabase = database trotter.Parameters.SQLServerPort = port trotter.Validate() trotter.initLogging() return trotter } // NewPushTrotter is used to construct and initialise a Trotter // object for creating a new dataset and tables in BigQuery // using BigQuery JSON schema files func NewPushTrotter(projectID, bqDataset, cacheDir, schemaDir, location, config string, quiet bool) *Trotter { trotter := NewPullTrotter(projectID, bqDataset, cacheDir, location, quiet) trotter.Parameters.Mode = executionmode.PushMode trotter.Parameters.SchemaDirPath = schemaDir trotter.Parameters.ConfigFile = config if len(config) > 0 { trotter.Parameters.CfgParser = configparser.NewConfigParser(config) } return trotter } // NewDeleteTrotter is used to construct and initialise a Trotter // object for deleting an empty BigQuery dataset func NewDeleteTrotter(projectID, bqDataset, cacheDir, location string, quiet bool) *Trotter { trotter := NewPullTrotter(projectID, bqDataset, cacheDir, location, quiet) trotter.Parameters.Mode = executionmode.DeleteMode return trotter } // NewDestroyTrotter is used to construct and initialise a Trotter object // for deleting a non-empty BigQuery dataset func NewDestroyTrotter(projectID, bqDataset, cacheDir, location string, quiet bool) *Trotter { trotter := NewPullTrotter(projectID, bqDataset, cacheDir, location, quiet) trotter.Parameters.Mode = executionmode.DestroyMode return trotter } // NewBackupTrotter is used to construct and initialise a Trotter object // for generating sharded CSV files in a Google Cloud Storage bucket // for each table within a dataset func NewBackupTrotter(projectID, bqDataset, cacheDir, gcsBucket, location string, quiet bool) *Trotter { trotter := NewPullTrotter(projectID, bqDataset, cacheDir, location, quiet) trotter.Parameters.Mode = executionmode.BackupMode trotter.Parameters.GcsHandler = gcshandler.NewCloudStorageHandler(trotter.Parameters.Ctx, gcsBucket) trotter.Parameters.GcsHandler.GcsPath = fmt.Sprintf("gs://%s/%s/%s/%s", trotter.Parameters.GcsHandler.GcsBucket, projectID, bqDataset, trotter.Parameters.Timestamp) return trotter } // NewRestoreTrotter is used to restore all tables within a BigQuery dataset // using sharded CSV files stored in a Google Cloud Storage bucket func NewRestoreTrotter(projectID, bqDataset, cacheDir, schemaDir, gcsPath, location string, quiet bool) *Trotter { trotter := NewPullTrotter(projectID, bqDataset, cacheDir, location, quiet) trotter.Parameters.Mode = executionmode.RestoreMode trotter.Parameters.SchemaDirPath = schemaDir gcsBucket := strings.Replace(gcsPath, "gs://", "", -1) gcsBucket = strings.Split(gcsBucket, "/")[0] trotter.Parameters.GcsHandler = gcshandler.NewCloudStorageHandler(trotter.Parameters.Ctx, gcsBucket) trotter.Parameters.GcsHandler.GcsPath = gcsPath return trotter } // NewUpdateTrotter is used to construct and initialise a Trotter // object for adding new NULLABLE columns at the end of a BigQuery // table func NewUpdateTrotter(projectID, bqDataset, cacheDir, schemaDir, location string, quiet bool) *Trotter { trotter := NewPullTrotter(projectID, bqDataset, cacheDir, location, quiet) trotter.Parameters.SchemaDirPath = schemaDir trotter.Parameters.Mode = executionmode.UpdateMode return trotter } // NewPatchTrotter is used to construct and initialise a Trotter object // for modifying the column description within BigQuery tables func NewPatchTrotter(projectID, bqDataset, cacheDir, schemaDir, location string, quiet bool) *Trotter { trotter := NewPullTrotter(projectID, bqDataset, cacheDir, location, quiet) trotter.Parameters.SchemaDirPath = schemaDir trotter.Parameters.Mode = executionmode.PatchMode return trotter } // SetProjects is used to fetch project details from GCP func (t *Trotter) SetProjects() { log.Printf("Trotter.SetProjects() executing...") var err error t.Assets.Projects, err = projecthandler.GetProjects(t.Parameters.Ctx, t.Criteria.ProjectID) util.CheckError(err, "SetProjects() failed!") log.Printf("Trotter.SetProjects() completed") } // SetDatasets is used to fetch dataset details from BigQuery func (t *Trotter) SetDatasets() { log.Printf("Trotter.SetDatasets(%s) executing...", t.Criteria.ProjectID) var err error bqHandler := t.Parameters.BqHandler t.Assets.Datasets, err = bqHandler.GetBigQueryDatasets(t.Criteria.DatasetID) if t.Parameters.Mode != executionmode.DeleteMode && t.Parameters.Mode != executionmode.DestroyMode { for i := 0; i < len(t.Assets.Datasets); i++ { t.Assets.Datasets[i].Tables, err = bqHandler.GetBigQueryTables(t.Assets.Datasets[i].DatasetID) } } if err != iterator.Done { util.CheckError(err, "SetDatasets() failed") } log.Printf("Trotter.SetDatasets() completed") } // WriteJSON is used to dump the Trotter parameters object // to a text file in JSON format func (t *Trotter) WriteJSON() { f, err := os.Create(t.Parameters.JSONFile) util.CheckError(err, "WriteJSON failed: ") b := t.Jsonify() nbytes, err := f.WriteString(string(b)) log.Printf("wrote %d bytes to: %s", nbytes, t.Parameters.JSONFile) f.Close() } // Jsonify is used to convert the Trotter object to a JSON byte array func (t *Trotter) Jsonify() []byte { log.Printf("Trotter.Jsonify() executing.") b, err := json.Marshal(t) util.CheckError(err, "json.Marshal(c) failed!") log.Printf("Trotter.Validate() completed") return b } // ShowFileLocations is a convenience method used to display // the location of the files generated during execution func (t *Trotter) ShowFileLocations() { if t.Parameters.Quiet { fmt.Printf("Log: %s\n", t.Parameters.LogFile) fmt.Printf("JSON: %s\n", t.Parameters.JSONFile) } } // DeleteDataset is used to delete an empty BigQuery dataset func (t *Trotter) DeleteDataset() { client := t.Parameters.BqHandler.Client log.Printf("Trotter.DeleteDataset() executing") err := client.Dataset(t.Criteria.DatasetID).Delete(t.Parameters.Ctx) util.CheckError(err, fmt.Sprintf("%s: Dataset deletion failed", t.Criteria.DatasetID)) log.Printf("Trotter.DeleteDataset() completed") } // DestroyDataset is used to delete a non-empty BigQuery dataset func (t *Trotter) DestroyDataset() { client := t.Parameters.BqHandler.Client log.Printf("Trotter.DestroyDataset() executing") err := client.Dataset(t.Criteria.DatasetID).DeleteWithContents(t.Parameters.Ctx) util.CheckError(err, fmt.Sprintf("%s Dataset deletion with contents failed", t.Criteria.DatasetID)) log.Printf("Trotter.DestroyDataset() completed") } // GenerateBigQueryJSON is used to generate BigQuery JSON // schema files using information contained in the // Trotter.Assets object func (t *Trotter) GenerateBigQueryJSON() { log.Printf("Trotter.GenerateBigQueryJSON() executing.") for dIdx := 0; dIdx < len(t.Assets.Datasets); dIdx++ { dataset := t.Assets.Datasets[dIdx] tables := t.Assets.Datasets[dIdx].Tables for tIdx := 0; tIdx < len(tables); tIdx++ { tableID := tables[tIdx].TableID schema := tables[tIdx].TableMetadata.Schema tableSchema := t.Parameters.BqHandler.SchemaToBQ(schema) currentTableJSONFile := fmt.Sprintf("%s/%s:%s.%s.schema", t.Parameters.LogDirPath, t.Criteria.ProjectID, dataset.DatasetID, tableID) previousTableJSONFile := fmt.Sprintf("%s/%s:%s.%s.schema", t.Parameters.SchemaDirPath, t.Criteria.ProjectID, dataset.DatasetID, tableID) log.Printf("tableJsonFile: %s\n", currentTableJSONFile) schemaBytes := t.Parameters.BqHandler.JsonifyBigquery(tableSchema) util.WriteByteArrayToFile(currentTableJSONFile, schemaBytes) if util.FileExists(previousTableJSONFile) { filesAreEqual := util.FilesAreEqual(previousTableJSONFile, currentTableJSONFile, schemaBytes) if !filesAreEqual { util.WriteByteArrayToFile(previousTableJSONFile, schemaBytes) } } else { util.WriteByteArrayToFile(previousTableJSONFile, schemaBytes) } } } log.Printf("Trotter.GenerateBigQueryJSON() completed") } // ProcessBigQueryTables is used to perform various BigQuery CI/CD // operations based on the ExecutionMode func (t *Trotter) ProcessBigQueryTables(mode executionmode.ExecutionMode) { log.Printf("ProcessBigQueryTables() executing") datasetID := t.Criteria.DatasetID bqHandler := t.Parameters.BqHandler client := bqHandler.Client ctx := bqHandler.Ctx files, err := util.FindFile(t.Parameters.SchemaDirPath, []string{".schema"}) util.CheckError(err, "ProcessBigQueryTables.util.FindFile() failed") datasetChecked := false for _, file := range files { bname := filepath.Base(file) parts := strings.Split(bname, ".") tableID := parts[1] log.Printf("Processing table %s\n", tableID) schemaLines, err := util.ReadFileToStringArray(file) util.CheckError(err, "ProcessBigQueryTables().ReadFile() failed") tableSchema := strings.Join(schemaLines[:], " ") if !datasetChecked { datasetExists, err := bqHandler.CheckDatasetExists(datasetID) if !datasetExists || err != nil { log.Printf("%s: Creating dataset...", datasetID) meta := &bigquery.DatasetMetadata{ Location: t.Parameters.Location, // See https://cloud.google.com/bigquery/docs/locations } if err := client.Dataset(datasetID).Create(ctx, meta); err != nil { util.CheckError(err, fmt.Sprintf("ProcessBigQueryTables().Create().Dataset(%s).Create() failed", datasetID)) } log.Printf("%s: Dataset created", datasetID) } datasetChecked = true } tableRef := client.Dataset(t.Criteria.DatasetID).Table(tableID) bigQueryTableSchema, err := bigquery.SchemaFromJSON([]byte(tableSchema)) util.CheckError(err, "bigquery.SchemaFromJSON() failed") metaData := &bigquery.TableMetadata{ Schema: bigQueryTableSchema, ExpirationTime: time.Now().AddDate(100, 0, 0), // Table will be automatically deleted in 100 years. } if t.Parameters.CfgParser != nil { cfgMap := t.Parameters.CfgParser.ConfigMap metaData.TimePartitioning = &bigquery.TimePartitioning{ //Type: TimePartitioningType(cfgMap[tableID].TimePartitioningPeriod), Expiration: 0, Field: cfgMap[tableID].TimePartitioningField, RequirePartitionFilter: false, } if len(cfgMap[tableID].ClusteringFields) > 0 { metaData.Clustering = &bigquery.Clustering{ Fields: cfgMap[tableID].ClusteringFields, } } } switch mode { case executionmode.PushMode: err = tableRef.Create(ctx, metaData) util.CheckErrorAndReturn(err, "ProcessBigQueryTables().Create() failed") case executionmode.RestoreMode: bqHandler.LoadDataFromGCS(tableID, t.Criteria.DatasetID, t.Parameters.GcsHandler.GcsPath, bigQueryTableSchema) case executionmode.UpdateMode: _, missingColumns := bqHandler.FindMissingColumnsInSchema(tableID, t.Criteria.DatasetID, bigQueryTableSchema) //t.ShowMissingColumns("ProcessBigQueryTables", tableID, missingColumns) err := bqHandler.AddColumnsToTable(t.Criteria.DatasetID, tableRef, missingColumns) util.CheckError(err, "AddColumnsToTable() failed") case executionmode.PatchMode: err := t.Parameters.BqHandler.PatchBigQueryTable(t.Criteria.ProjectID, t.Criteria.DatasetID, bigQueryTableSchema, tableRef) util.CheckError(err, "PatchBigQueryTable() failed") } } log.Printf("ProcessBigQueryTables() completed") } // ExportTableToGCS is used to generate sharded CSV files in GCS // for a given table func (t *Trotter) ExportTableToGCS(table, gcsURI string) { bucketName := t.Parameters.GcsHandler.GcsBucket log.Printf("ExportTableToGcs(%s) executing", bucketName) client := t.Parameters.GcsHandler.Client bucket := client.Bucket(bucketName) _, err := bucket.Attrs(t.Parameters.Ctx) util.CheckError(err, "ExportTableToGCS().Bucket() failed") log.Printf("Generating %s\n", gcsURI) gcsRef := bigquery.NewGCSReference(gcsURI) gcsRef.FieldDelimiter = "," extractor := t.Parameters.BqHandler.Client.DatasetInProject(t.Criteria.ProjectID, t.Criteria.DatasetID).Table(table).ExtractorTo(gcsRef) extractor.DisableHeader = true extractor.Location = t.Parameters.Location job, err := extractor.Run(t.Parameters.Ctx) util.CheckError(err, "ExportTableToGCS().extractor.Run() failed") status, err := job.Wait(t.Parameters.Ctx) util.CheckError(err, "ExportTableToGCS().job.Wait() failed") util.CheckError(status.Err(), "ExportTableToGCS().job.Wait() failed") log.Printf("ExportTableToGcs(%s) completed", bucketName) } // BackupDataset is used to iterate through all the tables // within a BigQuery dataset and generate sharded CSV backup // schema files in a Google Cloud Storage bucket. // Please refer to // https://cloud.google.com/bigquery/docs/managing-table-schemas#go func (t *Trotter) BackupDataset() { log.Printf("BackupDataset() executing") datasetID := t.Criteria.DatasetID bqHandler := t.Parameters.BqHandler tables, err := bqHandler.GetBigQueryTables(datasetID) util.CheckError(err, "BackupDataset().GetBigQueryTables() failed.") for _, bqTable := range tables { gcsURI := fmt.Sprintf("%s/%s/%s-*.csv", t.Parameters.GcsHandler.GcsPath, bqTable.TableID, bqTable.TableID) t.ExportTableToGCS(bqTable.TableID, gcsURI) //t.Parameters.GcsHandler.GetObjects(gcsPrefix, "/") } log.Printf("BackupDataset() completed") } // ShowCriteria is a convenience method used to display // filter criteria and runtime parameters func (t *Trotter) ShowCriteria() { log.Printf("showCriteria() executing") log.Printf("ProjectID: %s\n", t.Criteria.ProjectID) log.Printf("DatasetID: %s\n", t.Criteria.DatasetID) log.Printf("CacheDir: %s\n", t.Parameters.CacheDirPath) log.Printf("SchemaDir: %s\n", t.Parameters.SchemaDirPath) log.Printf("showCriteria() completed") }