in go/adbc/driver/snowflake/connection.go [172:368]
func (c *connectionImpl) GetObjects(ctx context.Context, depth adbc.ObjectDepth, catalog, dbSchema, tableName, columnName *string, tableType []string) (rdr array.RecordReader, err error) {
var (
pkQueryID, fkQueryID, uniqueQueryID, terseDbQueryID string
showSchemaQueryID, tableQueryID string
)
conn := c.cn
var hasViews, hasTables bool
for _, t := range tableType {
if strings.EqualFold("VIEW", t) {
hasViews = true
} else if strings.EqualFold("TABLE", t) {
hasTables = true
}
}
// force empty result from SHOW TABLES if tableType list is not empty
// and does not contain TABLE or VIEW in the list.
// we need this because we should have non-null db_schema_tables when
// depth is Tables, Columns or All.
var badTableType = "tabletypedoesnotexist"
if len(tableType) > 0 && depth >= adbc.ObjectDepthTables && !hasViews && !hasTables {
tableName = &badTableType
tableType = []string{"TABLE"}
}
gQueryIDs, gQueryIDsCtx := errgroup.WithContext(ctx)
queryFile := queryTemplateGetObjectsAll
switch depth {
case adbc.ObjectDepthCatalogs:
queryFile = queryTemplateGetObjectsTerseCatalogs
goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objDatabases,
catalog, dbSchema, tableName, &terseDbQueryID)
case adbc.ObjectDepthDBSchemas:
queryFile = queryTemplateGetObjectsDbSchemas
goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objSchemas,
catalog, dbSchema, tableName, &showSchemaQueryID)
goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objDatabases,
catalog, dbSchema, tableName, &terseDbQueryID)
case adbc.ObjectDepthTables:
queryFile = queryTemplateGetObjectsTables
goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objSchemas,
catalog, dbSchema, tableName, &showSchemaQueryID)
goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objDatabases,
catalog, dbSchema, tableName, &terseDbQueryID)
objType := objObjects
if len(tableType) == 1 {
if strings.EqualFold("VIEW", tableType[0]) {
objType = objViews
} else if strings.EqualFold("TABLE", tableType[0]) {
objType = objTables
}
}
goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objType,
catalog, dbSchema, tableName, &tableQueryID)
default:
var suffix string
if catalog == nil || isWildcardStr(*catalog) {
suffix = " IN ACCOUNT"
} else {
escapedCatalog := quoteTblName(*catalog)
if dbSchema == nil || isWildcardStr(*dbSchema) {
suffix = " IN DATABASE " + escapedCatalog
} else {
escapedSchema := quoteTblName(*dbSchema)
if tableName == nil || isWildcardStr(*tableName) {
suffix = " IN SCHEMA " + escapedCatalog + "." + escapedSchema
} else {
escapedTable := quoteTblName(*tableName)
suffix = " IN TABLE " + escapedCatalog + "." + escapedSchema + "." + escapedTable
}
}
}
// Detailed constraint info not available in information_schema
// Need to dispatch SHOW queries and use conn.Raw to extract the queryID for reuse in GetObjects query
gQueryIDs.Go(func() (err error) {
pkQueryID, err = getQueryID(gQueryIDsCtx, "SHOW PRIMARY KEYS /* ADBC:getObjectsTables */"+suffix, conn)
return err
})
gQueryIDs.Go(func() (err error) {
fkQueryID, err = getQueryID(gQueryIDsCtx, "SHOW IMPORTED KEYS /* ADBC:getObjectsTables */"+suffix, conn)
return err
})
gQueryIDs.Go(func() (err error) {
uniqueQueryID, err = getQueryID(gQueryIDsCtx, "SHOW UNIQUE KEYS /* ADBC:getObjectsTables */"+suffix, conn)
return err
})
goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objDatabases,
catalog, dbSchema, tableName, &terseDbQueryID)
goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objSchemas,
catalog, dbSchema, tableName, &showSchemaQueryID)
objType := objObjects
if len(tableType) == 1 {
if strings.EqualFold("VIEW", tableType[0]) {
objType = objViews
} else if strings.EqualFold("TABLE", tableType[0]) {
objType = objTables
}
}
goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objType,
catalog, dbSchema, tableName, &tableQueryID)
}
queryBytes, err := fs.ReadFile(queryTemplates, path.Join("queries", queryFile))
if err != nil {
return nil, err
}
// Need constraint subqueries to complete before we can query GetObjects
if err := gQueryIDs.Wait(); err != nil {
return nil, err
}
args := []sql.NamedArg{
// Optional filter patterns
driverbase.PatternToNamedArg("CATALOG", catalog),
driverbase.PatternToNamedArg("DB_SCHEMA", dbSchema),
driverbase.PatternToNamedArg("TABLE", tableName),
driverbase.PatternToNamedArg("COLUMN", columnName),
// QueryIDs for constraint data if depth is tables or deeper
// or if the depth is catalog and catalog is null
sql.Named("PK_QUERY_ID", pkQueryID),
sql.Named("FK_QUERY_ID", fkQueryID),
sql.Named("UNIQUE_QUERY_ID", uniqueQueryID),
sql.Named("SHOW_DB_QUERY_ID", terseDbQueryID),
sql.Named("SHOW_SCHEMA_QUERY_ID", showSchemaQueryID),
sql.Named("SHOW_TABLE_QUERY_ID", tableQueryID),
}
nvargs := make([]driver.NamedValue, len(args))
for i, arg := range args {
nvargs[i] = driver.NamedValue{
Name: arg.Name,
Ordinal: i + 1,
Value: arg.Value,
}
}
query := string(queryBytes)
rows, err := conn.QueryContext(ctx, query, nvargs)
if err != nil {
return nil, errToAdbcErr(adbc.StatusIO, err)
}
defer func() {
err = errors.Join(err, rows.Close())
}()
catalogCh := make(chan driverbase.GetObjectsInfo, runtime.NumCPU())
errCh := make(chan error)
go func() {
defer close(catalogCh)
dest := make([]driver.Value, len(rows.Columns()))
for {
if err := rows.Next(dest); err != nil {
if errors.Is(err, io.EOF) {
return
}
errCh <- errToAdbcErr(adbc.StatusInvalidData, err)
return
}
var getObjectsCatalog driverbase.GetObjectsInfo
if err := getObjectsCatalog.Scan(dest[0]); err != nil {
errCh <- errToAdbcErr(adbc.StatusInvalidData, err)
return
}
// A few columns need additional processing outside of Snowflake
for i, sch := range getObjectsCatalog.CatalogDbSchemas {
for j, tab := range sch.DbSchemaTables {
for k, col := range tab.TableColumns {
field := c.toArrowField(col)
xdbcDataType := internal.ToXdbcDataType(field.Type)
if field.Type != nil {
getObjectsCatalog.CatalogDbSchemas[i].DbSchemaTables[j].TableColumns[k].XdbcDataType = driverbase.Nullable(int16(field.Type.ID()))
}
getObjectsCatalog.CatalogDbSchemas[i].DbSchemaTables[j].TableColumns[k].XdbcSqlDataType = driverbase.Nullable(int16(xdbcDataType))
}
}
}
catalogCh <- getObjectsCatalog
}
}()
return driverbase.BuildGetObjectsRecordReader(c.Alloc, catalogCh, errCh)
}