sources/postgres/infoschema.go (556 lines of code) (raw):

// Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package postgres import ( "context" "database/sql" "fmt" "math/bits" "reflect" "sort" "strconv" "strings" "time" "cloud.google.com/go/civil" sp "cloud.google.com/go/spanner" _ "github.com/lib/pq" // we will use database/sql package instead of using this package directly "github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants" "github.com/GoogleCloudPlatform/spanner-migration-tool/internal" "github.com/GoogleCloudPlatform/spanner-migration-tool/profiles" "github.com/GoogleCloudPlatform/spanner-migration-tool/schema" "github.com/GoogleCloudPlatform/spanner-migration-tool/sources/common" "github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/ddl" "github.com/GoogleCloudPlatform/spanner-migration-tool/streaming" ) // InfoSchemaImpl postgres specific implementation for InfoSchema. type InfoSchemaImpl struct { Db *sql.DB MigrationProjectId string SourceProfile profiles.SourceProfile TargetProfile profiles.TargetProfile IsSchemaUnique *bool } func (isi InfoSchemaImpl) populateSchemaIsUnique(schemaAndNames []common.SchemaAndName) { schemaSet := make(map[string]struct{}) for _, table := range schemaAndNames { schemaSet[table.Schema] = struct{}{} } if len(schemaSet) == 1 { *isi.IsSchemaUnique = true } else { *isi.IsSchemaUnique = false } } // StartChangeDataCapture is used for automatic triggering of Datastream job when // performing a streaming migration. func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *internal.Conv) (map[string]interface{}, error) { mp := make(map[string]interface{}) var ( schemaDetails map[string]internal.SchemaDetails err error ) commonInfoSchema := common.InfoSchemaImpl{} schemaDetails, err = commonInfoSchema.GetIncludedSrcTablesFromConv(conv) if err != nil { err = fmt.Errorf("error fetching the tableList to setup datastream migration, defaulting to all tables: %v", err) } streamingCfg, err := streaming.ReadStreamingConfig(isi.SourceProfile.Conn.Pg.StreamingConfig, isi.TargetProfile.Conn.Sp.Dbname, schemaDetails) if err != nil { return nil, fmt.Errorf("error reading streaming config: %v", err) } pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.MigrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.TargetProfile.Conn.Sp.Dbname, constants.REGULAR_GCS) if err != nil { return nil, fmt.Errorf("error creating pubsub resources: %v", err) } streamingCfg.PubsubCfg = *pubsubCfg dlqPubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.MigrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.TargetProfile.Conn.Sp.Dbname, constants.DLQ_GCS) if err != nil { return nil, fmt.Errorf("error creating pubsub resources: %v", err) } streamingCfg.DlqPubsubCfg = *dlqPubsubCfg streamingCfg, err = streaming.StartDatastream(ctx, isi.MigrationProjectId, streamingCfg, isi.SourceProfile, isi.TargetProfile, schemaDetails) if err != nil { err = fmt.Errorf("error starting datastream: %v", err) return nil, err } mp["streamingCfg"] = streamingCfg return mp, err } // StartStreamingMigration is used for automatic triggering of Dataflow job when // performing a streaming migration. func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, migrationProjectId string, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) { streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg) dfOutput, err := streaming.StartDataflow(ctx, migrationProjectId, isi.TargetProfile, streamingCfg, conv) if err != nil { err = fmt.Errorf("error starting dataflow: %v", err) return internal.DataflowOutput{}, err } return dfOutput, nil } // GetToDdl function below implement the common.InfoSchema interface. func (isi InfoSchemaImpl) GetToDdl() common.ToDdl { return ToDdlImpl{} } // GetTableName returns table name. func (isi InfoSchemaImpl) GetTableName(schema string, tableName string) string { if *isi.IsSchemaUnique { // Drop schema name as prefix if only one schema is detected. return tableName } else if schema == "public" { return tableName } return fmt.Sprintf("%s.%s", schema, tableName) } // GetRowsFromTable returns a sql Rows object for a table. func (isi InfoSchemaImpl) GetRowsFromTable(conv *internal.Conv, tableId string) (interface{}, error) { // PostgreSQL schema and name can be arbitrary strings. // Ideally we would pass schema/name as a query parameter, // but PostgreSQL doesn't support this. So we quote it instead. isSchemaNamePrefixed := strings.HasPrefix(conv.SrcSchema[tableId].Name, conv.SrcSchema[tableId].Schema+".") var tableName string if isSchemaNamePrefixed { tableName = strings.TrimPrefix(conv.SrcSchema[tableId].Name, conv.SrcSchema[tableId].Schema+".") } else { tableName = conv.SrcSchema[tableId].Name } q := fmt.Sprintf(`SELECT * FROM "%s"."%s";`, conv.SrcSchema[tableId].Schema, tableName) rows, err := isi.Db.Query(q) if err != nil { return nil, err } return rows, err } // ProcessDataRows performs data conversion for source database // 'db'. For each table, we extract data using a "SELECT *" query, // convert the data to Spanner data (based on the source and Spanner // schemas), and write it to Spanner. If we can't get/process data // for a table, we skip that table and process the remaining tables. // // Note that the database/sql library has a somewhat complex model for // returning data from rows.Scan. Scalar values can be returned using // the native value used by the underlying driver (by passing // *interface{} to rows.Scan), or they can be converted to specific go // types. Array values are always returned as []byte, a string // encoding of the array values. This string encoding is // database/driver specific. For example, for PostgreSQL, array values // are returned in the form "{v1,v2,..,vn}", where each v1,v2,...,vn // is a PostgreSQL encoding of the respective array value. // // We choose to do all type conversions explicitly ourselves so that // we can generate more targeted error messages: hence we pass // *interface{} parameters to row.Scan. func (isi InfoSchemaImpl) ProcessData(conv *internal.Conv, tableId string, srcSchema schema.Table, colIds []string, spSchema ddl.CreateTable, additionalAttributes internal.AdditionalDataAttributes) error { srcTableName := conv.SrcSchema[tableId].Name rowsInterface, err := isi.GetRowsFromTable(conv, tableId) if err != nil { conv.Unexpected(fmt.Sprintf("Couldn't get data for table %s : err = %s", srcTableName, err)) return err } rows := rowsInterface.(*sql.Rows) defer rows.Close() srcCols, _ := rows.Columns() v, iv := buildVals(len(srcCols)) colNameIdMap := internal.GetSrcColNameIdMap(conv.SrcSchema[tableId]) for rows.Next() { err := rows.Scan(iv...) if err != nil { conv.Unexpected(fmt.Sprintf("Couldn't process sql data row: %s", err)) // Scan failed, so we don't have any data to add to bad rows. conv.StatsAddBadRow(srcTableName, conv.DataMode()) continue } newValues, err1 := common.PrepareValues(conv, tableId, colNameIdMap, colIds, srcCols, v) cvtCols, cvtVals, err2 := convertSQLRow(conv, tableId, colIds, srcSchema, spSchema, newValues) if err1 != nil || err2 != nil { conv.Unexpected(fmt.Sprintf("Couldn't process sql data row: %s", err)) conv.StatsAddBadRow(srcTableName, conv.DataMode()) conv.CollectBadRow(srcTableName, srcCols, valsToStrings(v)) continue } conv.WriteRow(srcTableName, conv.SpSchema[tableId].Name, cvtCols, cvtVals) } return nil } // ConvertSQLRow performs data conversion for a single row of data // returned from a 'SELECT *' query. ConvertSQLRow assumes that // srcCols, spCols and srcVals all have the same length. Note that // ConvertSQLRow returns cols as well as converted values. This is // because cols can change when we add a column (synthetic primary // key) or because we drop columns (handling of NULL values). func convertSQLRow(conv *internal.Conv, tableId string, colIds []string, srcSchema schema.Table, spSchema ddl.CreateTable, srcVals []interface{}) ([]string, []interface{}, error) { var vs []interface{} var cs []string for i, colId := range colIds { srcCd, ok1 := srcSchema.ColDefs[colId] spCd, ok2 := spSchema.ColDefs[colId] if !ok1 || !ok2 { return nil, nil, fmt.Errorf("data conversion: can't find schema for column id %s of table %s", colId, conv.SrcSchema[tableId].Name) } if srcVals[i] == nil { continue // Skip NULL values (nil is used by database/sql to represent NULL values). } var spVal interface{} var err error if spCd.T.IsArray { spVal, err = cvtSQLArray(conv, srcCd, spCd, srcVals[i]) } else { spVal, err = cvtSQLScalar(conv, srcCd, spCd, srcVals[i]) } if err != nil { // Skip entire row if we hit error. return nil, nil, fmt.Errorf("can't convert sql data for column id %s of table %s: %w", colIds, conv.SrcSchema[tableId].Name, err) } vs = append(vs, spVal) cs = append(cs, spCd.Name) } if aux, ok := conv.SyntheticPKeys[tableId]; ok { cs = append(cs, conv.SpSchema[tableId].ColDefs[aux.ColId].Name) vs = append(vs, fmt.Sprintf("%d", int64(bits.Reverse64(uint64(aux.Sequence))))) aux.Sequence++ conv.SyntheticPKeys[tableId] = aux } return cs, vs, nil } // GetRowCount with number of rows in each table. func (isi InfoSchemaImpl) GetRowCount(table common.SchemaAndName) (int64, error) { // PostgreSQL schema and name can be arbitrary strings. // Ideally we would pass schema/name as a query parameter, // but PostgreSQL doesn't support this. So we quote it instead. q := fmt.Sprintf(`SELECT COUNT(*) FROM "%s"."%s";`, table.Schema, table.Name) rows, err := isi.Db.Query(q) if err != nil { return 0, err } defer rows.Close() var count int64 if rows.Next() { err := rows.Scan(&count) return count, err } return 0, nil //Check if 0 is ok to return } // GetTables return list of tables in the selected database. // TODO: All of the queries to get tables and table data should be in // a single transaction to ensure we obtain a consistent snapshot of // schema information and table data (pg_dump does something // similar). func (isi InfoSchemaImpl) GetTables() ([]common.SchemaAndName, error) { ignored := make(map[string]bool) // Ignore all system tables: we just want to convert user tables. for _, s := range []string{"information_schema", "postgres", "pg_catalog", "pg_temp_1", "pg_toast", "pg_toast_temp_1"} { ignored[s] = true } q := "SELECT table_schema, table_name FROM information_schema.tables where table_type = 'BASE TABLE'" rows, err := isi.Db.Query(q) if err != nil { return nil, fmt.Errorf("couldn't get tables: %w", err) } defer rows.Close() var tableSchema, tableName string var tables []common.SchemaAndName for rows.Next() { rows.Scan(&tableSchema, &tableName) if !ignored[tableSchema] { tables = append(tables, common.SchemaAndName{Schema: tableSchema, Name: tableName}) } } isi.populateSchemaIsUnique(tables) return tables, nil } // GetColumns returns a list of Column objects and names func (isi InfoSchemaImpl) GetColumns(conv *internal.Conv, table common.SchemaAndName, constraints map[string][]string, primaryKeys []string) (map[string]schema.Column, []string, error) { q := `SELECT c.column_name, c.data_type, e.data_type, c.is_nullable, c.column_default, c.character_maximum_length, c.numeric_precision, c.numeric_scale FROM information_schema.COLUMNS c LEFT JOIN information_schema.element_types e ON ((c.table_catalog, c.table_schema, c.table_name, 'TABLE', c.dtd_identifier) = (e.object_catalog, e.object_schema, e.object_name, e.object_type, e.collection_type_identifier)) where table_schema = $1 and table_name = $2 ORDER BY c.ordinal_position;` cols, err := isi.Db.Query(q, table.Schema, table.Name) if err != nil { return nil, nil, fmt.Errorf("couldn't get schema for table %s.%s: %s", table.Schema, table.Name, err) } defer cols.Close() colDefs := make(map[string]schema.Column) var colIds []string var colName, dataType, isNullable string var colDefault, elementDataType sql.NullString var charMaxLen, numericPrecision, numericScale sql.NullInt64 for cols.Next() { err := cols.Scan(&colName, &dataType, &elementDataType, &isNullable, &colDefault, &charMaxLen, &numericPrecision, &numericScale) if err != nil { conv.Unexpected(fmt.Sprintf("Can't scan: %v", err)) continue } ignored := schema.Ignored{} for _, c := range constraints[colName] { // c can be UNIQUE, PRIMARY KEY, FOREIGN KEY, // or CHECK (based on msql, sql server, postgres docs). // We've already filtered out PRIMARY KEY. switch c { case "CHECK": ignored.Check = true case "FOREIGN KEY", "PRIMARY KEY", "UNIQUE": // Nothing to do here -- these are handled elsewhere. } } ignored.Default = colDefault.Valid colId := internal.GenerateColumnId() c := schema.Column{ Id: colId, Name: colName, Type: toType(dataType, elementDataType, charMaxLen, numericPrecision, numericScale), NotNull: common.ToNotNull(conv, isNullable), Ignored: ignored, } colDefs[colId] = c colIds = append(colIds, colId) } return colDefs, colIds, nil } // GetConstraints returns a list of primary keys and by-column map of // other constraints. Note: we need to preserve ordinal order of // columns in primary key constraints. // Note that foreign key constraints are handled in getForeignKeys. func (isi InfoSchemaImpl) GetConstraints(conv *internal.Conv, table common.SchemaAndName) ([]string, []schema.CheckConstraint, map[string][]string, error) { q := `SELECT k.COLUMN_NAME, t.CONSTRAINT_TYPE FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS t INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS k ON t.CONSTRAINT_NAME = k.CONSTRAINT_NAME AND t.CONSTRAINT_SCHEMA = k.CONSTRAINT_SCHEMA WHERE k.TABLE_SCHEMA = $1 AND k.TABLE_NAME = $2 ORDER BY k.ordinal_position;` rows, err := isi.Db.Query(q, table.Schema, table.Name) if err != nil { return nil, nil, nil, err } defer rows.Close() var primaryKeys []string var col, constraint string m := make(map[string][]string) for rows.Next() { err := rows.Scan(&col, &constraint) if err != nil { conv.Unexpected(fmt.Sprintf("Can't scan: %v", err)) continue } if col == "" || constraint == "" { conv.Unexpected(fmt.Sprintf("Got empty col or constraint")) continue } switch constraint { case "PRIMARY KEY": primaryKeys = append(primaryKeys, col) default: m[col] = append(m[col], constraint) } } return primaryKeys, nil, m, nil } // GetForeignKeys returns a list of all the foreign key constraints. func (isi InfoSchemaImpl) GetForeignKeys(conv *internal.Conv, table common.SchemaAndName) (foreignKeys []schema.ForeignKey, err error) { q := `SELECT rc.constraint_schema AS "TABLE_SCHEMA", ccu.table_name AS "REFERENCED_TABLE_NAME", kcu.column_name AS "COLUMN_NAME", ccu.column_name AS "REF_COLUMN_NAME", rc.constraint_name AS "CONSTRAINT_NAME", rc.delete_rule AS "ON_DELETE", rc.update_rule AS "ON_UPDATE" FROM INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS rc INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu ON rc.constraint_name = kcu.constraint_name AND rc.constraint_schema = kcu.constraint_schema INNER JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE ccu ON rc.constraint_name = ccu.constraint_name AND rc.constraint_schema = ccu.constraint_schema WHERE rc.constraint_schema = $1 AND kcu.table_name = $2;` rows, err := isi.Db.Query(q, table.Schema, table.Name) if err != nil { return nil, err } defer rows.Close() var refTable common.SchemaAndName var col, refCol, fKeyName, onDelete, onUpdate string fKeys := make(map[string]common.FkConstraint) var keyNames []string for rows.Next() { err := rows.Scan(&refTable.Schema, &refTable.Name, &col, &refCol, &fKeyName, &onDelete, &onUpdate) if err != nil { conv.Unexpected(fmt.Sprintf("Can't scan: %v", err)) continue } tableName := isi.GetTableName(refTable.Schema, refTable.Name) if _, found := fKeys[fKeyName]; found { fk := fKeys[fKeyName] fk.Cols = append(fk.Cols, col) fk.Refcols = append(fk.Refcols, refCol) fKeys[fKeyName] = fk fk.OnDelete = onDelete fk.OnUpdate = onUpdate continue } fKeys[fKeyName] = common.FkConstraint{Name: fKeyName, Table: tableName, Refcols: []string{refCol}, Cols: []string{col}, OnDelete: onDelete, OnUpdate: onUpdate} keyNames = append(keyNames, fKeyName) } sort.Strings(keyNames) for _, k := range keyNames { foreignKeys = append(foreignKeys, schema.ForeignKey{ Id: internal.GenerateForeignkeyId(), Name: fKeys[k].Name, ColumnNames: fKeys[k].Cols, ReferTableName: fKeys[k].Table, ReferColumnNames: fKeys[k].Refcols, OnDelete: fKeys[k].OnDelete, OnUpdate: fKeys[k].OnUpdate, }) } return foreignKeys, nil } // GetIndexes return a list of all indexes for the specified table. // Note: Extracting index definitions from PostgreSQL information schema tables is complex. // See https://stackoverflow.com/questions/6777456/list-all-index-names-column-names-and-its-table-name-of-a-postgresql-database/44460269#44460269 // for background. func (isi InfoSchemaImpl) GetIndexes(conv *internal.Conv, table common.SchemaAndName, colNameIdMap map[string]string) ([]schema.Index, error) { q := `SELECT irel.relname AS index_name, a.attname AS column_name, 1 + Array_position(i.indkey, a.attnum) AS column_position, i.indisunique AS is_unique, CASE o.OPTION & 1 WHEN 1 THEN 'DESC' ELSE 'ASC' END AS order FROM pg_index AS i JOIN pg_class AS trel ON trel.oid = i.indrelid JOIN pg_namespace AS tnsp ON trel.relnamespace = tnsp.oid JOIN pg_class AS irel ON irel.oid = i.indexrelid CROSS JOIN LATERAL UNNEST (i.indkey) WITH ordinality AS c (colnum, ordinality) LEFT JOIN LATERAL UNNEST (i.indoption) WITH ordinality AS o (OPTION, ordinality) ON c.ordinality = o.ordinality JOIN pg_attribute AS a ON trel.oid = a.attrelid AND a.attnum = c.colnum WHERE tnsp.nspname= $1 AND trel.relname= $2 AND i.indisprimary = false GROUP BY tnsp.nspname, trel.relname, irel.relname, a.attname, array_position(i.indkey, a.attnum), o.OPTION,i.indisunique ORDER BY irel.relname, array_position(i.indkey, a.attnum);` rows, err := isi.Db.Query(q, table.Schema, table.Name) if err != nil { return nil, err } defer rows.Close() var name, column, sequence, isUnique, collation string indexMap := make(map[string]schema.Index) var indexNames []string var indexes []schema.Index for rows.Next() { if err := rows.Scan(&name, &column, &sequence, &isUnique, &collation); err != nil { conv.Unexpected(fmt.Sprintf("Can't scan: %v", err)) continue } if _, found := indexMap[name]; !found { indexNames = append(indexNames, name) indexMap[name] = schema.Index{ Id: internal.GenerateIndexesId(), Name: name, Unique: (isUnique == "true")} } index := indexMap[name] index.Keys = append(index.Keys, schema.Key{ ColId: colNameIdMap[column], Desc: (collation == "DESC")}) indexMap[name] = index } for _, k := range indexNames { indexes = append(indexes, indexMap[k]) } return indexes, nil } func toType(dataType string, elementDataType sql.NullString, charLen sql.NullInt64, numericPrecision, numericScale sql.NullInt64) schema.Type { switch { case dataType == "ARRAY" && elementDataType.Valid: return schema.Type{Name: elementDataType.String, ArrayBounds: []int64{-1}} // TODO: handle error cases. // TODO: handle case of multiple array bounds. case charLen.Valid: return schema.Type{Name: dataType, Mods: []int64{charLen.Int64}} case numericPrecision.Valid && numericScale.Valid && numericScale.Int64 != 0: return schema.Type{Name: dataType, Mods: []int64{numericPrecision.Int64, numericScale.Int64}} case numericPrecision.Valid: return schema.Type{Name: dataType, Mods: []int64{numericPrecision.Int64}} default: return schema.Type{Name: dataType} } } func cvtSQLArray(conv *internal.Conv, srcCd schema.Column, spCd ddl.ColumnDef, val interface{}) (interface{}, error) { a, ok := val.([]byte) if !ok { return nil, fmt.Errorf("can't convert array values to []byte") } return convArray(spCd.T, srcCd.Type.Name, conv.Location, string(a)) } // cvtSQLScalar converts a values returned from a SQL query to a // Spanner value. In principle, we could just hand the values we get // from the driver over to Spanner and have the Spanner client handle // conversions and errors. However we handle the conversions // explicitly ourselves so that we can generate more targeted error // messages. Note that the caller is responsible for handling nil // values (used to represent NULL). We handle each of the remaining // cases of values returned by the database/sql library: // // bool // []byte // int64 // float32 // float64 // string // time.Time func cvtSQLScalar(conv *internal.Conv, srcCd schema.Column, spCd ddl.ColumnDef, val interface{}) (interface{}, error) { switch spCd.T.Name { case ddl.Bool: switch v := val.(type) { case bool: return v, nil case string: return convBool(v) } case ddl.Bytes: switch v := val.(type) { case []byte: return v, nil } case ddl.Date: // The PostgreSQL driver uses time.Time to represent // dates. Note that the database/sql library doesn't // document how dates are represented, so maybe this // isn't a driver issue, but a generic database/sql // issue. We explicitly convert from time.Time to // civil.Date (used by the Spanner client library). switch v := val.(type) { case string: return convDate(v) case time.Time: return civil.DateOf(v), nil } case ddl.Int64: switch v := val.(type) { case []byte: // Parse as int64. return convInt64(string(v)) case int64: return v, nil case float32: // Truncate. return int64(v), nil case float64: // Truncate. return int64(v), nil case string: // Parse as int64. return convInt64(v) } case ddl.Float32: switch v := val.(type) { case []byte: // Note: PostgreSQL uses []byte for numeric. return convFloat32(string(v)) case int64: return float32(v), nil case float32: return v, nil case float64: return float32(v), nil case string: return convFloat32(v) } case ddl.Float64: switch v := val.(type) { case []byte: // Note: PostgreSQL uses []byte for numeric. return convFloat64(string(v)) case int64: return float64(v), nil case float32: return float64(v), nil case float64: return v, nil case string: return convFloat64(v) } case ddl.Numeric: switch v := val.(type) { case []byte: // Note: PostgreSQL uses []byte for numeric. return convNumeric(conv, string(v)) } case ddl.String: switch v := val.(type) { case bool: return strconv.FormatBool(v), nil case []byte: return string(v), nil case int64: return strconv.FormatInt(v, 10), nil case float32: return strconv.FormatFloat(float64(v), 'g', -1, 32), nil case float64: return strconv.FormatFloat(v, 'g', -1, 64), nil case string: return v, nil case time.Time: return v.String(), nil } case ddl.Timestamp: switch v := val.(type) { case string: return convTimestamp(srcCd.Type.Name, conv.Location, v) case time.Time: return v, nil } case ddl.JSON: switch v := val.(type) { case string: return string(v), nil case []uint8: return string(v), nil } } return nil, fmt.Errorf("can't convert value of type %s to Spanner type %s", reflect.TypeOf(val), reflect.TypeOf(spCd.T)) } // buildVals contructs interface{} value containers to scan row // results into. Returns both the underlying containers (as a slice) // as well as an interface{} of pointers to containers to pass to // rows.Scan. func buildVals(n int) (v []interface{}, iv []interface{}) { v = make([]interface{}, n) for i := range v { iv = append(iv, &v[i]) } return v, iv } func valsToStrings(vals []interface{}) []string { toString := func(val interface{}) string { if val == nil { return "NULL" } switch v := val.(type) { case *interface{}: val = *v } return fmt.Sprintf("%v", val) } var s []string for _, v := range vals { s = append(s, toString(v)) } return s }