func()

in go/adbc/driver/snowflake/connection.go [172:368]


// GetObjects returns a record reader describing the catalogs, schemas, tables
// and columns visible through this connection, down to the requested depth.
//
// depth selects which query template is executed and which SHOW helper
// queries are dispatched beforehand; the query IDs of those helper queries
// are passed to the template as named arguments. catalog, dbSchema, tableName
// and columnName are optional filter patterns forwarded to the template.
// tableType entries are compared case-insensitively against "TABLE" and
// "VIEW".
//
// Errors from the helper queries and from reading the query template are
// returned unchanged. A failure of the main query is converted with
// adbc.StatusIO, and failures while iterating or decoding rows are converted
// with adbc.StatusInvalidData. Any error from closing the result rows is
// joined onto the returned error.
func (c *connectionImpl) GetObjects(ctx context.Context, depth adbc.ObjectDepth, catalog, dbSchema, tableName, columnName *string, tableType []string) (rdr array.RecordReader, err error) {
	var (
		pkQueryID, fkQueryID, uniqueQueryID, terseDbQueryID string
		showSchemaQueryID, tableQueryID                     string
	)

	conn := c.cn
	var hasViews, hasTables bool
	for _, t := range tableType {
		if strings.EqualFold("VIEW", t) {
			hasViews = true
		} else if strings.EqualFold("TABLE", t) {
			hasTables = true
		}
	}

	// force empty result from SHOW TABLES if tableType list is not empty
	// and does not contain TABLE or VIEW in the list.
	// we need this because we should have non-null db_schema_tables when
	// depth is Tables, Columns or All.
	//
	// NOTE(review): this guard relies on the numeric ordering of the
	// adbc.ObjectDepth constants. If ObjectDepthAll/ObjectDepthColumns compare
	// lower than ObjectDepthTables, it only fires for ObjectDepthTables and
	// not for the other depths named above - confirm against the adbc package.
	var badTableType = "tabletypedoesnotexist"
	if len(tableType) > 0 && depth >= adbc.ObjectDepthTables && !hasViews && !hasTables {
		tableName = &badTableType
		tableType = []string{"TABLE"}
	}

	// Choose which SHOW variant lists tables: the generic objects listing
	// unless exactly one table type was requested and it is VIEW or TABLE.
	// The choice does not depend on depth, so it is computed once here and
	// shared by every branch below that lists tables.
	tableObjType := objObjects
	if len(tableType) == 1 {
		if strings.EqualFold("VIEW", tableType[0]) {
			tableObjType = objViews
		} else if strings.EqualFold("TABLE", tableType[0]) {
			tableObjType = objTables
		}
	}

	gQueryIDs, gQueryIDsCtx := errgroup.WithContext(ctx)
	queryFile := queryTemplateGetObjectsAll
	switch depth {
	case adbc.ObjectDepthCatalogs:
		queryFile = queryTemplateGetObjectsTerseCatalogs
		goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objDatabases,
			catalog, dbSchema, tableName, &terseDbQueryID)
	case adbc.ObjectDepthDBSchemas:
		queryFile = queryTemplateGetObjectsDbSchemas
		goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objSchemas,
			catalog, dbSchema, tableName, &showSchemaQueryID)
		goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objDatabases,
			catalog, dbSchema, tableName, &terseDbQueryID)
	case adbc.ObjectDepthTables:
		queryFile = queryTemplateGetObjectsTables
		goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objSchemas,
			catalog, dbSchema, tableName, &showSchemaQueryID)
		goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objDatabases,
			catalog, dbSchema, tableName, &terseDbQueryID)
		goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, tableObjType,
			catalog, dbSchema, tableName, &tableQueryID)
	default:
		// Narrow the constraint SHOW queries to the most specific scope for
		// which every enclosing filter is a concrete (non-wildcard) name.
		var suffix string
		switch {
		case catalog == nil || isWildcardStr(*catalog):
			suffix = " IN ACCOUNT"
		case dbSchema == nil || isWildcardStr(*dbSchema):
			suffix = " IN DATABASE " + quoteTblName(*catalog)
		case tableName == nil || isWildcardStr(*tableName):
			suffix = " IN SCHEMA " + quoteTblName(*catalog) + "." + quoteTblName(*dbSchema)
		default:
			suffix = " IN TABLE " + quoteTblName(*catalog) + "." + quoteTblName(*dbSchema) + "." + quoteTblName(*tableName)
		}

		// Detailed constraint info not available in information_schema
		// Need to dispatch SHOW queries and use conn.Raw to extract the queryID for reuse in GetObjects query
		gQueryIDs.Go(func() (err error) {
			pkQueryID, err = getQueryID(gQueryIDsCtx, "SHOW PRIMARY KEYS /* ADBC:getObjectsTables */"+suffix, conn)
			return err
		})

		gQueryIDs.Go(func() (err error) {
			fkQueryID, err = getQueryID(gQueryIDsCtx, "SHOW IMPORTED KEYS /* ADBC:getObjectsTables */"+suffix, conn)
			return err
		})

		gQueryIDs.Go(func() (err error) {
			uniqueQueryID, err = getQueryID(gQueryIDsCtx, "SHOW UNIQUE KEYS /* ADBC:getObjectsTables */"+suffix, conn)
			return err
		})

		goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objDatabases,
			catalog, dbSchema, tableName, &terseDbQueryID)
		goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, objSchemas,
			catalog, dbSchema, tableName, &showSchemaQueryID)
		goGetQueryID(gQueryIDsCtx, conn, gQueryIDs, tableObjType,
			catalog, dbSchema, tableName, &tableQueryID)
	}

	// Need constraint subqueries to complete before we can query GetObjects.
	// Wait before any other early return so the goroutines started above are
	// never left running (and writing to the query ID variables) after this
	// function has returned.
	if err := gQueryIDs.Wait(); err != nil {
		return nil, err
	}

	queryBytes, err := fs.ReadFile(queryTemplates, path.Join("queries", queryFile))
	if err != nil {
		return nil, err
	}

	args := []sql.NamedArg{
		// Optional filter patterns
		driverbase.PatternToNamedArg("CATALOG", catalog),
		driverbase.PatternToNamedArg("DB_SCHEMA", dbSchema),
		driverbase.PatternToNamedArg("TABLE", tableName),
		driverbase.PatternToNamedArg("COLUMN", columnName),

		// Query IDs of the SHOW helper queries dispatched above. IDs for
		// helpers that were not run at this depth remain empty strings.
		sql.Named("PK_QUERY_ID", pkQueryID),
		sql.Named("FK_QUERY_ID", fkQueryID),
		sql.Named("UNIQUE_QUERY_ID", uniqueQueryID),
		sql.Named("SHOW_DB_QUERY_ID", terseDbQueryID),
		sql.Named("SHOW_SCHEMA_QUERY_ID", showSchemaQueryID),
		sql.Named("SHOW_TABLE_QUERY_ID", tableQueryID),
	}

	// Convert to the driver-level argument form; ordinals are 1-based.
	nvargs := make([]driver.NamedValue, len(args))
	for i, arg := range args {
		nvargs[i] = driver.NamedValue{
			Name:    arg.Name,
			Ordinal: i + 1,
			Value:   arg.Value,
		}
	}

	query := string(queryBytes)
	rows, err := conn.QueryContext(ctx, query, nvargs)
	if err != nil {
		return nil, errToAdbcErr(adbc.StatusIO, err)
	}
	defer func() {
		err = errors.Join(err, rows.Close())
	}()

	catalogCh := make(chan driverbase.GetObjectsInfo, runtime.NumCPU())
	// errCh is unbuffered: a send on it blocks the producer below until the
	// error has been received, so the deferred close(catalogCh) cannot run
	// before the error has been handed over.
	errCh := make(chan error)

	go func() {
		defer close(catalogCh)
		dest := make([]driver.Value, len(rows.Columns()))
		for {
			if err := rows.Next(dest); err != nil {
				// io.EOF marks the normal end of the result set.
				if errors.Is(err, io.EOF) {
					return
				}
				errCh <- errToAdbcErr(adbc.StatusInvalidData, err)
				return
			}

			// Only the first column of each row is decoded.
			var getObjectsCatalog driverbase.GetObjectsInfo
			if err := getObjectsCatalog.Scan(dest[0]); err != nil {
				errCh <- errToAdbcErr(adbc.StatusInvalidData, err)
				return
			}

			// A few columns need additional processing outside of Snowflake
			for i, sch := range getObjectsCatalog.CatalogDbSchemas {
				for j, tab := range sch.DbSchemaTables {
					for k, col := range tab.TableColumns {
						field := c.toArrowField(col)
						xdbcDataType := internal.ToXdbcDataType(field.Type)

						// Write through the indices: the range variables
						// are copies, so assigning to col would be lost.
						if field.Type != nil {
							getObjectsCatalog.CatalogDbSchemas[i].DbSchemaTables[j].TableColumns[k].XdbcDataType = driverbase.Nullable(int16(field.Type.ID()))
						}
						getObjectsCatalog.CatalogDbSchemas[i].DbSchemaTables[j].TableColumns[k].XdbcSqlDataType = driverbase.Nullable(int16(xdbcDataType))
					}
				}
			}

			catalogCh <- getObjectsCatalog
		}
	}()

	return driverbase.BuildGetObjectsRecordReader(c.Alloc, catalogCh, errCh)
}