tools/bqman/golang/connector/sqlserverhandler.go (238 lines of code) (raw):
/*
Copyright 2020 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package connector
import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"net"
	"net/url"
	"strconv"
	"strings"

	bqhandler "github.com/GoogleCloudPlatform/bqman/bqhandler"
	util "github.com/GoogleCloudPlatform/bqman/util"
	// The go-mssqldb driver is used to connect to SQL Server
	_ "github.com/denisenkom/go-mssqldb"
)
// SQLServerHandler bundles an open SQL Server connection pool together with
// the parameters that were used to open it.
// See https://docs.microsoft.com/en-us/azure/azure-sql/database/connect-query-go
type SQLServerHandler struct {
	// Ctx is the context handed to every query issued by the handler's methods.
	Ctx context.Context
	// Db is the *sql.DB returned by sql.Open in NewSQLServerHandler.
	Db *sql.DB

	// Connection parameters, retained exactly as supplied by the caller.
	Server                   string
	Port                     int
	User, Password, Database string
}
// SQLServerColumnInfo holds the catalog metadata of a single table column,
// one value per row scanned by GetColumns. Every field is a sql.Null* type
// because the catalog query may yield NULLs.
type SQLServerColumnInfo struct {
	// Column name, its MS_Description extended property, and its position.
	ColumnName, ColumnDescription sql.NullString
	OrdinalPosition               sql.NullInt32

	// IS_NULLABLE value and the SQL Server type name (DATA_TYPE).
	IsNullable, Datatype sql.NullString

	// Numeric metadata; typically NULL for non-numeric types.
	NumericPrecision, NumericPrecisionRadix, NumericScale sql.NullInt32

	// 1 when the column is part of the table's primary key, otherwise 0.
	IsPrimaryKey sql.NullInt32
}
// SQLServerBigQueryColumnMap maps SQL Server type names, as reported in the
// DATA_TYPE column of INFORMATION_SCHEMA.COLUMNS, to BigQuery column types.
// Lookups are case-sensitive.
//
// The map is initialised when the package loads rather than inside
// NewSQLServerHandler. Previously it stayed nil until a handler was created,
// so converting column info before that silently produced empty types. Every
// NewSQLServerHandler call also reassigned this package variable, which races
// when handlers are created concurrently.
//
// NOTE(review): "Enumeration" and "LongName" are not SQL Server system types;
// presumably they are project-specific type names — confirm before removing.
var SQLServerBigQueryColumnMap = map[string]string{
	"bigint identity":  "INTEGER",
	"bigint":           "INTEGER",
	"int":              "INTEGER",
	"nvarchar":         "STRING",
	"varchar":          "STRING",
	"datetime":         "DATETIME",
	"datetime2":        "DATETIME",
	"bit":              "BOOLEAN",
	"timestamp":        "STRING",
	"sysname":          "STRING",
	"char":             "STRING",
	"float":            "FLOAT",
	"decimal":          "NUMERIC",
	"smallint":         "INTEGER",
	"money":            "NUMERIC",
	"date":             "DATE",
	"tinyint":          "INTEGER",
	"Enumeration":      "INTEGER",
	"LongName":         "STRING",
	"boolean":          "BOOLEAN",
	"double":           "FLOAT",
	"xml":              "STRING",
	"varbinary":        "STRING",
	"uniqueidentifier": "STRING",
	"nchar":            "STRING",
	"geography":        "GEOGRAPHY",
	"hierarchyid":      "STRING",
	"numeric":          "NUMERIC",
	"smallmoney":       "NUMERIC",
	"time":             "TIME",

	// System types that previously had no mapping and converted to "".
	"real":           "FLOAT",
	"smalldatetime":  "DATETIME",
	"datetimeoffset": "TIMESTAMP",
	"text":           "STRING",
	"ntext":          "STRING",
}

// sqlServerConnString builds a go-mssqldb DSN in the "sqlserver://" URL form.
// Credentials and parameters are escaped by net/url. In the older
// "key=value;" form a ';' or '=' inside a password or user name could corrupt
// the string or inject extra connection parameters. A server given as
// `host\instance` is split, and the named instance is placed in the URL path.
func sqlServerConnString(server, user, password, database string, port int) string {
	host, instance := server, ""
	if i := strings.IndexByte(server, '\\'); i >= 0 {
		host, instance = server[:i], server[i+1:]
	}
	dsn := &url.URL{
		Scheme:   "sqlserver",
		User:     url.UserPassword(user, password),
		Host:     net.JoinHostPort(host, strconv.Itoa(port)),
		Path:     instance,
		RawQuery: url.Values{"database": {database}}.Encode(),
	}
	return dsn.String()
}

// NewSQLServerHandler opens a connection pool to database on the given SQL
// Server and verifies it with a ping. It returns a handler that wraps the pool
// together with the supplied connection parameters. Errors from sql.Open and
// PingContext are passed to util.CheckError.
//
// server may be a bare host or `host\instance`; port is the TCP port.
func NewSQLServerHandler(server, user, password, database string, port int) *SQLServerHandler {
	log.Printf("NewSQLServerHandler() executing")
	connString := sqlServerConnString(server, user, password, database, port)
	db, err := sql.Open("sqlserver", connString)
	util.CheckError(err, "NewSQLServerHandler().sql.Open() failed!")
	ctx := context.Background()
	err = db.PingContext(ctx)
	util.CheckError(err, "NewSQLServerHandler().db.PingContext() failed!")
	handler := &SQLServerHandler{
		Ctx:      ctx,
		Db:       db,
		Server:   server,
		Port:     port,
		User:     user,
		Password: password,
		Database: database,
	}
	log.Printf("NewSQLServerHandler() completed")
	return handler
}
// GetTableSchemas returns the distinct table schemas listed in
// INFORMATION_SCHEMA.TABLES for tableCatalog, sorted by name. The query
// excludes the "dbo" schema.
//
// Errors from preparing, executing, scanning or iterating the query are passed
// to util.CheckError.
func (ssh *SQLServerHandler) GetTableSchemas(tableCatalog string) []string {
	log.Printf("GetTableSchemas() executing")
	tsql := `SELECT DISTINCT TABLE_SCHEMA
FROM INFORMATION_SCHEMA.TABLES t
WHERE TABLE_CATALOG = @TableCatalog
AND TABLE_SCHEMA <> 'dbo'
ORDER BY TABLE_SCHEMA`
	stmt, err := ssh.Db.PrepareContext(ssh.Ctx, tsql)
	util.CheckError(err, "GetTableSchemas().db.PrepareContext() failed")
	defer stmt.Close()
	rows, err := stmt.QueryContext(ssh.Ctx, sql.Named("TableCatalog", tableCatalog))
	util.CheckError(err, "GetTableSchemas().stmt.QueryContext() failed")
	defer rows.Close()
	tableSchemas := make([]string, 0)
	for rows.Next() {
		var tableSchema string
		err := rows.Scan(&tableSchema)
		util.CheckError(err, "GetTableSchemas().rows.Scan() failed")
		tableSchemas = append(tableSchemas, tableSchema)
	}
	// rows.Next returns false both at the end of the result set and on
	// failure. rows.Err tells the two apart, so an interrupted read is not
	// mistaken for a short result.
	util.CheckError(rows.Err(), "GetTableSchemas().rows.Err() failed")
	log.Printf("GetTableSchemas() completed")
	return tableSchemas
}
// GetTables returns the names of the base tables (TABLE_TYPE = 'BASE TABLE')
// in tableCatalog.tableSchema, sorted by name.
//
// Errors from preparing, executing, scanning or iterating the query are passed
// to util.CheckError.
func (ssh *SQLServerHandler) GetTables(tableCatalog, tableSchema string) []string {
	// Messages use this method's own name. They previously said "GetColumns()",
	// which made failures here indistinguishable from GetColumns failures.
	log.Printf("GetTables() executing")
	tsql := `SELECT TABLE_NAME
FROM INFORMATION_SCHEMA.TABLES t
WHERE TABLE_TYPE = 'BASE TABLE'
AND TABLE_CATALOG = @TableCatalog
AND TABLE_SCHEMA = @TableSchema
ORDER BY TABLE_NAME`
	stmt, err := ssh.Db.PrepareContext(ssh.Ctx, tsql)
	util.CheckError(err, "GetTables().db.PrepareContext() failed")
	defer stmt.Close()
	rows, err := stmt.QueryContext(ssh.Ctx,
		sql.Named("TableCatalog", tableCatalog),
		sql.Named("TableSchema", tableSchema))
	util.CheckError(err, "GetTables().stmt.QueryContext() failed")
	defer rows.Close()
	tables := make([]string, 0)
	for rows.Next() {
		// Named tableName so it no longer shadows the tableSchema parameter.
		var tableName string
		err := rows.Scan(&tableName)
		util.CheckError(err, "GetTables().rows.Scan() failed")
		tables = append(tables, tableName)
	}
	// Distinguish end-of-results from an iteration failure.
	util.CheckError(rows.Err(), "GetTables().rows.Err() failed")
	log.Printf("GetTables() completed")
	return tables
}
// GetColumns returns column metadata for tableCatalog.tableSchema.tableName,
// ordered by ordinal position. For each column it reports the name, the
// MS_Description extended property (if any), the nullability, the data type,
// numeric precision/radix/scale, and whether the column is part of the primary
// key.
//
// Errors from preparing, executing, scanning or iterating the query are passed
// to util.CheckError.
func (ssh *SQLServerHandler) GetColumns(tableCatalog, tableSchema, tableName string) []SQLServerColumnInfo {
	log.Printf("GetColumns() executing")
	// Notes on the query:
	//   - QUOTENAME brackets the schema and table names before OBJECT_ID
	//     resolves them. Without it, a name containing '.', spaces or brackets
	//     resolves to NULL, and the INNER JOIN then silently drops every
	//     column of that table.
	//   - Identifiers use the exact case of the catalog views (INFORMATION_SCHEMA
	//     upper case, sys.* lower case). This lets the query bind in databases
	//     with a case-sensitive collation.
	tsql := `
WITH primary_keys AS (
	SELECT cc.TABLE_CATALOG, cc.TABLE_SCHEMA, cc.TABLE_NAME, cc.COLUMN_NAME
	FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
	JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE cc
		ON cc.CONSTRAINT_CATALOG = tc.CONSTRAINT_CATALOG
		AND cc.CONSTRAINT_SCHEMA = tc.CONSTRAINT_SCHEMA
		AND cc.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
	WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY')
SELECT c.COLUMN_NAME,
	prop.value AS COL_DESC,
	c.ORDINAL_POSITION, c.IS_NULLABLE, c.DATA_TYPE, c.NUMERIC_PRECISION, c.NUMERIC_PRECISION_RADIX, c.NUMERIC_SCALE,
	(CASE WHEN pk.COLUMN_NAME IS NOT NULL THEN 1 ELSE 0 END) AS IS_PRIMARY_KEY
FROM INFORMATION_SCHEMA.COLUMNS c
LEFT JOIN primary_keys pk
	ON c.TABLE_CATALOG = pk.TABLE_CATALOG
	AND c.TABLE_SCHEMA = pk.TABLE_SCHEMA
	AND c.TABLE_NAME = pk.TABLE_NAME
	AND c.COLUMN_NAME = pk.COLUMN_NAME
INNER JOIN sys.columns AS sc
	ON sc.object_id = OBJECT_ID(QUOTENAME(c.TABLE_SCHEMA) + '.' + QUOTENAME(c.TABLE_NAME))
	AND sc.name = c.COLUMN_NAME
LEFT JOIN sys.extended_properties prop
	ON prop.major_id = sc.object_id
	AND prop.minor_id = sc.column_id
	AND prop.name = 'MS_Description'
	AND prop.class_desc = 'OBJECT_OR_COLUMN'
WHERE c.TABLE_CATALOG = @TableCatalog
AND c.TABLE_SCHEMA = @TableSchema
AND c.TABLE_NAME = @TableName
ORDER BY c.TABLE_CATALOG, c.TABLE_SCHEMA, c.TABLE_NAME, c.ORDINAL_POSITION, IS_PRIMARY_KEY`
	stmt, err := ssh.Db.PrepareContext(ssh.Ctx, tsql)
	util.CheckError(err, "GetColumns().db.PrepareContext() failed")
	defer stmt.Close()
	rows, err := stmt.QueryContext(ssh.Ctx,
		sql.Named("TableCatalog", tableCatalog),
		sql.Named("TableSchema", tableSchema),
		sql.Named("TableName", tableName))
	util.CheckError(err, "GetColumns().stmt.QueryContext() failed")
	defer rows.Close()
	columns := make([]SQLServerColumnInfo, 0)
	for rows.Next() {
		var columnInfo SQLServerColumnInfo
		err := rows.Scan(
			&columnInfo.ColumnName,
			&columnInfo.ColumnDescription,
			&columnInfo.OrdinalPosition,
			&columnInfo.IsNullable,
			&columnInfo.Datatype,
			&columnInfo.NumericPrecision,
			&columnInfo.NumericPrecisionRadix,
			&columnInfo.NumericScale,
			&columnInfo.IsPrimaryKey,
		)
		util.CheckError(err, "GetColumns().rows.Scan() failed")
		columns = append(columns, columnInfo)
	}
	// Distinguish end-of-results from an iteration failure.
	util.CheckError(rows.Err(), "GetColumns().rows.Err() failed")
	log.Printf("GetColumns() completed")
	return columns
}
// ReadDatabaseSchema prints the schema, name and type of every row in
// INFORMATION_SCHEMA.TABLES to stdout, followed by the number of rows printed.
// Errors from executing, scanning or iterating the query are passed to
// util.CheckError.
func (ssh *SQLServerHandler) ReadDatabaseSchema() {
	log.Printf("ReadDatabaseSchema() executing")
	// Named query rather than sql so it does not shadow the database/sql
	// package. Identifiers match the view's upper-case definition so the query
	// also binds under case-sensitive collations.
	query := "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE FROM INFORMATION_SCHEMA.TABLES"
	rows, err := ssh.Db.QueryContext(ssh.Ctx, query)
	// Check the error before deferring Close: rows is nil when QueryContext
	// fails, and calling Close on a nil *sql.Rows panics.
	util.CheckError(err, "ReadDatabaseSchema().db.QueryContext() failed")
	defer rows.Close()
	var count int
	for rows.Next() {
		var tableSchema, tableName, tableType string
		err := rows.Scan(&tableSchema, &tableName, &tableType)
		util.CheckError(err, "ReadDatabaseSchema().rows.Scan() failed")
		fmt.Printf("table_schema: %s; table_name: %s; table_type: %s\n", tableSchema, tableName, tableType)
		count++
	}
	// Distinguish end-of-results from an iteration failure.
	util.CheckError(rows.Err(), "ReadDatabaseSchema().rows.Err() failed")
	fmt.Printf("Record Count: %d\n", count)
	log.Printf("ReadDatabaseSchema() completed")
}
// ConvertColumnInfoToBqSchema converts SQL Server column metadata into
// BigQuery JSON schema entries, one per input column and in the same order.
//
// The BigQuery type is looked up in SQLServerBigQueryColumnMap by the column's
// DATA_TYPE. A type with no mapping yields an empty Type and logs a warning,
// so the gap is visible instead of silently producing an invalid schema. The
// mode is "Required" when IS_NULLABLE is "NO" or the column is part of the
// primary key; otherwise it is "Nullable".
func (ssh *SQLServerHandler) ConvertColumnInfoToBqSchema(columnInfos []SQLServerColumnInfo) []bqhandler.BqSchema {
	log.Printf("ConvertColumnInfoToBqSchema() executing")
	bqSchemas := make([]bqhandler.BqSchema, 0, len(columnInfos))
	for _, ci := range columnInfos {
		bqType, ok := SQLServerBigQueryColumnMap[ci.Datatype.String]
		if !ok {
			log.Printf("ConvertColumnInfoToBqSchema(): no BigQuery type mapped for SQL Server type %q (column %q)",
				ci.Datatype.String, ci.ColumnName.String)
		}
		mode := "Nullable"
		if ci.IsNullable.String == "NO" || ci.IsPrimaryKey.Int32 == 1 {
			mode = "Required"
		}
		bqSchemas = append(bqSchemas, bqhandler.BqSchema{
			Name:        ci.ColumnName.String,
			Description: ci.ColumnDescription.String,
			Type:        bqType,
			Mode:        mode,
		})
	}
	log.Printf("ConvertColumnInfoToBqSchema() completed")
	return bqSchemas
}
// ConvertToJSON marshals records, a BigQuery JSON schema, into a JSON array
// using json.Marshal. If marshalling fails, the program exits via log.Fatalf
// and the underlying error is reported.
func (ssh *SQLServerHandler) ConvertToJSON(records []bqhandler.BqSchema) []byte {
	log.Printf("ConvertToJSON() executing")
	data, err := json.Marshal(records)
	if err != nil {
		// Include the cause; the previous message gave no clue why it failed.
		log.Fatalf("ConvertToJSON(): json.Marshal() failed: %v", err)
	}
	log.Printf("ConvertToJSON() completed")
	return data
}