func()

in go/adbc/driver/snowflake/connection.go [413:590]


// nestedSQLLiteralEscaper escapes a value that is embedded in a single-quoted
// SQL literal which is itself part of the single-quoted statement text built
// inside the scripting block. The value is therefore unescaped twice: once
// when the block assigns the statement string and once when that statement is
// run through EXECUTE IMMEDIATE. Each level needs backslashes and quotes
// escaped, so `\` becomes four backslashes and `'` becomes `\\\'`.
var nestedSQLLiteralEscaper = strings.NewReplacer(`\`, `\\\\`, `'`, `\\\'`)

// escapeNestedSQLLiteral returns value escaped for use between the `\'`
// delimiters of the filter conditions built by getObjectsTables. Values
// without quotes or backslashes are returned unchanged.
func escapeNestedSQLLiteral(value string) string {
	return nestedSQLLiteralEscaper.Replace(value)
}

// getObjectsTables queries table metadata, and for column-level depths also
// column metadata, grouped by catalog and schema.
//
// For ObjectDepthCatalogs and ObjectDepthDBSchemas it returns a nil map and no
// error. Otherwise it runs a scripting block that unions
// INFORMATION_SCHEMA.TABLES over the databases listed in
// INFORMATION_SCHEMA.DATABASES and records each table's name and type. The
// catalog, dbSchema and tableName arguments are applied as ILIKE patterns when
// non-nil and non-empty; tableType, when non-empty, restricts TABLE_TYPE.
//
// For ObjectDepthAll and ObjectDepthColumns a second query over
// INFORMATION_SCHEMA.COLUMNS (additionally filtered by columnName) is issued
// and an Arrow schema is attached to every table found by the first query.
// Columns of tables that the first query did not return are ignored.
//
// Query, scan and iteration errors are converted with errToAdbcErr using
// adbc.StatusIO.
func (c *cnxn) getObjectsTables(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string, tableName *string, columnName *string, tableType []string) (result internal.SchemaToTableInfo, err error) {
	if depth == adbc.ObjectDepthCatalogs || depth == adbc.ObjectDepthDBSchemas {
		return nil, nil
	}

	result = make(internal.SchemaToTableInfo)
	includeSchema := depth == adbc.ObjectDepthAll || depth == adbc.ObjectDepthColumns

	// Filters shared by the table query and the column query. The quotes are
	// written as \' because the conditions end up inside a string literal of
	// the scripting block.
	var conditions []string
	if catalog != nil && *catalog != "" {
		conditions = append(conditions, ` TABLE_CATALOG ILIKE \'`+escapeNestedSQLLiteral(*catalog)+`\'`)
	}
	if dbSchema != nil && *dbSchema != "" {
		conditions = append(conditions, ` TABLE_SCHEMA ILIKE \'`+escapeNestedSQLLiteral(*dbSchema)+`\'`)
	}
	if tableName != nil && *tableName != "" {
		conditions = append(conditions, ` TABLE_NAME ILIKE \'`+escapeNestedSQLLiteral(*tableName)+`\'`)
	}

	const queryPrefix = `DECLARE
		c1 CURSOR FOR SELECT DATABASE_NAME FROM INFORMATION_SCHEMA.DATABASES;
		res RESULTSET;
		counter INTEGER DEFAULT 0;
		statement VARCHAR DEFAULT '';
	BEGIN
		FOR rec IN c1 DO
			LET sharelist RESULTSET := (EXECUTE IMMEDIATE 'SHOW SHARES LIKE \'%' || rec.database_name || '%\'');
			LET cnt RESULTSET := (SELECT COUNT(*) FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())));
			LET cnt_cur CURSOR for cnt;
			LET share_cnt INTEGER DEFAULT 0;
			OPEN cnt_cur;
			FETCH cnt_cur INTO share_cnt;
			CLOSE cnt_cur;

			IF (share_cnt > 0) THEN
				LET c2 CURSOR for sharelist;
				LET created_on TIMESTAMP;
				LET kind VARCHAR DEFAULT '';
				LET share_name VARCHAR DEFAULT '';
				LET dbname VARCHAR DEFAULT '';
				OPEN c2;
				FETCH c2 INTO created_on, kind, share_name, dbname;
				CLOSE c2;
				IF (dbname = '') THEN
					CONTINUE;
				END IF;
			END IF;
			IF (counter > 0) THEN
				statement := statement || ' UNION ALL ';
			END IF;
			`

	const noSchema = `statement := statement || ' SELECT table_catalog, table_schema, table_name, table_type FROM ' || rec.database_name || '.INFORMATION_SCHEMA.TABLES';
			counter := counter + 1;
		END FOR;
		`

	const getSchema = `statement := statement ||
		' SELECT
				table_catalog, table_schema, table_name, column_name,
				ordinal_position, is_nullable::boolean, data_type, numeric_precision,
				numeric_precision_radix, numeric_scale, is_identity::boolean,
				identity_generation, identity_increment, comment
		FROM ' || rec.database_name || '.INFORMATION_SCHEMA.COLUMNS';

		  counter := counter + 1;
		END FOR;
	  `

	const querySuffix = `
		res := (EXECUTE IMMEDIATE :statement);
		RETURN TABLE (res);
	END;`

	// First populate the tables and table types.
	tblConditions := conditions
	if len(tableType) > 0 {
		quotedTypes := make([]string, len(tableType))
		for i, t := range tableType {
			quotedTypes[i] = escapeNestedSQLLiteral(t)
		}
		// Cap the slice so this append cannot share a backing array with
		// later appends to conditions.
		tblConditions = append(conditions[:len(conditions):len(conditions)],
			` TABLE_TYPE IN (\'`+strings.Join(quotedTypes, `\',\'`)+`\')`)
	}

	tblFilter := strings.Join(tblConditions, " AND ")
	if tblFilter != "" {
		tblFilter = `statement := 'SELECT * FROM (' || statement || ') WHERE ` + tblFilter + `';`
	}

	tblRows, err := c.sqldb.QueryContext(ctx, queryPrefix+noSchema+tblFilter+querySuffix)
	if err != nil {
		return result, errToAdbcErr(adbc.StatusIO, err)
	}
	defer tblRows.Close()

	var tblCat, tblSchema, tblName string
	var tblType sql.NullString
	for tblRows.Next() {
		if err = tblRows.Scan(&tblCat, &tblSchema, &tblName, &tblType); err != nil {
			return result, errToAdbcErr(adbc.StatusIO, err)
		}

		key := internal.CatalogAndSchema{Catalog: tblCat, Schema: tblSchema}
		result[key] = append(result[key], internal.TableInfo{
			Name: tblName, TableType: tblType.String})
	}
	// Next returning false may mean an error rather than end of data.
	if err = tblRows.Err(); err != nil {
		return result, errToAdbcErr(adbc.StatusIO, err)
	}

	if !includeSchema {
		return result, nil
	}

	// The table schemas were requested as well, so make another fetch for the
	// columns and column info.
	colConditions := conditions
	if columnName != nil && *columnName != "" {
		colConditions = append(conditions[:len(conditions):len(conditions)],
			` column_name ILIKE \'`+escapeNestedSQLLiteral(*columnName)+`\'`)
	}
	colFilter := strings.Join(colConditions, " AND ")
	if colFilter != "" {
		colFilter = " WHERE " + colFilter
	}
	// The ORDER BY keeps all columns of one table contiguous, which the
	// grouping loop below depends on.
	colFilter = `statement := 'SELECT * FROM (' || statement || ')` + colFilter +
		` ORDER BY table_catalog, table_schema, table_name, ordinal_position';`

	var colRows *sql.Rows
	colRows, err = c.sqldb.QueryContext(ctx, queryPrefix+getSchema+colFilter+querySuffix)
	if err != nil {
		return result, errToAdbcErr(adbc.StatusIO, err)
	}
	defer colRows.Close()

	var (
		colName, dataType                           string
		identGen, identIncrement, comment           sql.NullString
		ordinalPos                                  int
		numericPrec, numericPrecRadix, numericScale sql.NullInt16
		isNullable, isIdent                         bool

		prevKey      internal.CatalogAndSchema
		prevTable    string
		curTableInfo *internal.TableInfo
		fieldList    = make([]arrow.Field, 0)
	)

	for colRows.Next() {
		// order here matches the order of the columns requested in the query
		err = colRows.Scan(&tblCat, &tblSchema, &tblName, &colName,
			&ordinalPos, &isNullable, &dataType, &numericPrec,
			&numericPrecRadix, &numericScale, &isIdent, &identGen,
			&identIncrement, &comment)
		if err != nil {
			return result, errToAdbcErr(adbc.StatusIO, err)
		}

		key := internal.CatalogAndSchema{Catalog: tblCat, Schema: tblSchema}
		if prevKey != key || prevTable != tblName {
			// A new table starts: finish the previous one, then look up the
			// new one among the tables found by the first query.
			if curTableInfo != nil && len(fieldList) > 0 {
				curTableInfo.Schema = arrow.NewSchema(fieldList, nil)
			}
			fieldList = fieldList[:0]

			// Reset so that a failed lookup cannot leave the previous
			// table's entry selected and receive this table's columns.
			curTableInfo = nil
			info := result[key]
			for i := range info {
				if info[i].Name == tblName {
					curTableInfo = &info[i]
					break
				}
			}
			prevKey, prevTable = key, tblName
		}

		if curTableInfo == nil {
			// The column query is not filtered by table type, so it can
			// return tables that are absent from result; skip their columns.
			continue
		}

		fieldList = append(fieldList, toField(colName, isNullable, dataType, numericPrec, numericPrecRadix, numericScale, isIdent, identGen, identIncrement, comment, ordinalPos))
	}
	if err = colRows.Err(); err != nil {
		return result, errToAdbcErr(adbc.StatusIO, err)
	}

	if curTableInfo != nil && len(fieldList) > 0 {
		curTableInfo.Schema = arrow.NewSchema(fieldList, nil)
	}
	return result, nil
}