in go/adbc/driver/snowflake/connection.go [413:590]
// getObjectsTables collects table metadata (and, for column-level depths, the
// Arrow schema of each table) grouped by catalog and schema.
//
// For depth ObjectDepthCatalogs and ObjectDepthDBSchemas nothing is queried and
// a nil map is returned. Otherwise a Snowflake Scripting block is executed that
// iterates over INFORMATION_SCHEMA.DATABASES, skips databases whose matching
// share entry has an empty database name, and UNIONs the per-database
// INFORMATION_SCHEMA.TABLES views. When depth is ObjectDepthAll or
// ObjectDepthColumns a second block does the same over
// INFORMATION_SCHEMA.COLUMNS and the resulting columns are attached to the
// tables found by the first query.
//
// catalog, dbSchema, tableName and columnName are optional ILIKE patterns (nil
// or empty means "no filter"); tableType, when non-empty, restricts the table
// query to the listed TABLE_TYPE values. Query, scan and iteration failures are
// returned wrapped via errToAdbcErr with adbc.StatusIO, and the result is nil
// in that case.
func (c *cnxn) getObjectsTables(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string, tableName *string, columnName *string, tableType []string) (result internal.SchemaToTableInfo, err error) {
	if depth == adbc.ObjectDepthCatalogs || depth == adbc.ObjectDepthDBSchemas {
		return nil, nil
	}

	result = make(internal.SchemaToTableInfo)
	includeSchema := depth == adbc.ObjectDepthAll || depth == adbc.ObjectDepthColumns

	// Filter values are placed in a string literal of the dynamically built
	// statement, which is itself written inside a string literal of the
	// scripting block. Each level interprets backslash escapes, so the value
	// has to be escaped twice; the surrounding \' is the (once-escaped) quote
	// that delimits the inner literal.
	escaper := strings.NewReplacer(`\`, `\\`, `'`, `\'`)
	quote := func(s string) string {
		return `\'` + escaper.Replace(escaper.Replace(s)) + `\'`
	}

	conditions := make([]string, 0, 4)
	if catalog != nil && *catalog != "" {
		conditions = append(conditions, " TABLE_CATALOG ILIKE "+quote(*catalog))
	}
	if dbSchema != nil && *dbSchema != "" {
		conditions = append(conditions, " TABLE_SCHEMA ILIKE "+quote(*dbSchema))
	}
	if tableName != nil && *tableName != "" {
		conditions = append(conditions, " TABLE_NAME ILIKE "+quote(*tableName))
	}

	const queryPrefix = `DECLARE
c1 CURSOR FOR SELECT DATABASE_NAME FROM INFORMATION_SCHEMA.DATABASES;
res RESULTSET;
counter INTEGER DEFAULT 0;
statement VARCHAR DEFAULT '';
BEGIN
FOR rec IN c1 DO
LET sharelist RESULTSET := (EXECUTE IMMEDIATE 'SHOW SHARES LIKE \'%' || rec.database_name || '%\'');
LET cnt RESULTSET := (SELECT COUNT(*) FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())));
LET cnt_cur CURSOR for cnt;
LET share_cnt INTEGER DEFAULT 0;
OPEN cnt_cur;
FETCH cnt_cur INTO share_cnt;
CLOSE cnt_cur;
IF (share_cnt > 0) THEN
LET c2 CURSOR for sharelist;
LET created_on TIMESTAMP;
LET kind VARCHAR DEFAULT '';
LET share_name VARCHAR DEFAULT '';
LET dbname VARCHAR DEFAULT '';
OPEN c2;
FETCH c2 INTO created_on, kind, share_name, dbname;
CLOSE c2;
IF (dbname = '') THEN
CONTINUE;
END IF;
END IF;
IF (counter > 0) THEN
statement := statement || ' UNION ALL ';
END IF;
`
	const noSchema = `statement := statement || ' SELECT table_catalog, table_schema, table_name, table_type FROM ' || rec.database_name || '.INFORMATION_SCHEMA.TABLES';
counter := counter + 1;
END FOR;
`
	const getSchema = `statement := statement ||
' SELECT
table_catalog, table_schema, table_name, column_name,
ordinal_position, is_nullable::boolean, data_type, numeric_precision,
numeric_precision_radix, numeric_scale, is_identity::boolean,
identity_generation, identity_increment, comment
FROM ' || rec.database_name || '.INFORMATION_SCHEMA.COLUMNS';
counter := counter + 1;
END FOR;
`
	const querySuffix = `
res := (EXECUTE IMMEDIATE :statement);
RETURN TABLE (res);
END;`

	// First populate the tables and table types.
	tblConditions := conditions
	if len(tableType) > 0 {
		quoted := make([]string, len(tableType))
		for i, tt := range tableType {
			quoted[i] = quote(tt)
		}
		// The full slice expression forces append to copy, so the table-only
		// filter never shares storage with the column filter built later.
		tblConditions = append(conditions[:len(conditions):len(conditions)],
			" TABLE_TYPE IN ("+strings.Join(quoted, ",")+")")
	}
	cond := strings.Join(tblConditions, " AND ")
	if cond != "" {
		cond = `statement := 'SELECT * FROM (' || statement || ') WHERE ` + cond + `';`
	}

	rows, err := c.sqldb.QueryContext(ctx, queryPrefix+noSchema+cond+querySuffix)
	if err != nil {
		return nil, errToAdbcErr(adbc.StatusIO, err)
	}
	defer rows.Close()

	var tblCat, tblSchema, tblName string
	var tblType sql.NullString
	for rows.Next() {
		if err = rows.Scan(&tblCat, &tblSchema, &tblName, &tblType); err != nil {
			return nil, errToAdbcErr(adbc.StatusIO, err)
		}
		key := internal.CatalogAndSchema{Catalog: tblCat, Schema: tblSchema}
		result[key] = append(result[key], internal.TableInfo{
			Name: tblName, TableType: tblType.String})
	}
	// Next returning false may mean a failure rather than the end of the data.
	if err = rows.Err(); err != nil {
		return nil, errToAdbcErr(adbc.StatusIO, err)
	}

	if !includeSchema {
		return result, nil
	}

	// The table schemas were requested as well: make another fetch for the
	// columns and column info.
	if columnName != nil && *columnName != "" {
		conditions = append(conditions, " column_name ILIKE "+quote(*columnName))
	}
	cond = strings.Join(conditions, " AND ")
	if cond != "" {
		cond = " WHERE " + cond
	}
	cond = `statement := 'SELECT * FROM (' || statement || ')` + cond +
		` ORDER BY table_catalog, table_schema, table_name, ordinal_position';`

	colRows, err := c.sqldb.QueryContext(ctx, queryPrefix+getSchema+cond+querySuffix)
	if err != nil {
		return nil, errToAdbcErr(adbc.StatusIO, err)
	}
	defer colRows.Close()

	var (
		colName, dataType                           string
		identGen, identIncrement, comment           sql.NullString
		ordinalPos                                  int
		numericPrec, numericPrecRadix, numericScale sql.NullInt16
		isNullable, isIdent                         bool

		started      bool
		prevKey      internal.CatalogAndSchema
		prevTable    string
		curTableInfo *internal.TableInfo
		fieldList    []arrow.Field
	)
	for colRows.Next() {
		// Order here matches the order of the columns requested in the query.
		err = colRows.Scan(&tblCat, &tblSchema, &tblName, &colName,
			&ordinalPos, &isNullable, &dataType, &numericPrec,
			&numericPrecRadix, &numericScale, &isIdent, &identGen,
			&identIncrement, &comment)
		if err != nil {
			return nil, errToAdbcErr(adbc.StatusIO, err)
		}

		key := internal.CatalogAndSchema{Catalog: tblCat, Schema: tblSchema}
		if !started || key != prevKey || tblName != prevTable {
			// Rows are ordered by catalog/schema/table, so a change in any of
			// them means the previous table's column list is complete.
			if curTableInfo != nil && len(fieldList) > 0 {
				curTableInfo.Schema = arrow.NewSchema(fieldList, nil)
			}
			// Always start from a clean slate: the column query is not
			// restricted by table type, so it can yield tables that are absent
			// from result. Those must neither inherit the previous table's
			// pointer nor leak their columns into another table's schema.
			fieldList = fieldList[:0]
			curTableInfo = nil
			tables := result[key]
			for i := range tables {
				if tables[i].Name == tblName {
					curTableInfo = &tables[i]
					break
				}
			}
			started, prevKey, prevTable = true, key, tblName
		}
		if curTableInfo == nil {
			// Table was not returned by the table query; ignore its columns.
			continue
		}
		fieldList = append(fieldList, toField(colName, isNullable, dataType, numericPrec, numericPrecRadix, numericScale, isIdent, identGen, identIncrement, comment, ordinalPos))
	}
	if err = colRows.Err(); err != nil {
		return nil, errToAdbcErr(adbc.StatusIO, err)
	}
	if curTableInfo != nil && len(fieldList) > 0 {
		curTableInfo.Schema = arrow.NewSchema(fieldList, nil)
	}
	return result, nil
}