public override IArrowArrayStream GetObjects()

in csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs [359:588]


        /// <summary>
        /// Builds a hierarchical catalog / schema / table / column listing by issuing the
        /// GetCatalogs, GetSchemas, GetTables and GetColumns metadata requests against the
        /// current session and assembling the results into the standard GetObjects schema.
        /// </summary>
        /// <param name="depth">
        /// How deep to descend. Catalogs are queried when <paramref name="depth"/> is
        /// <c>All</c> or compares <c>&gt;= Catalogs</c>; schemas when it is <c>All</c> or
        /// <c>&gt;= DbSchemas</c>; tables when it is <c>All</c> or <c>&gt;= Tables</c>;
        /// columns only when it is <c>All</c>.
        /// </param>
        /// <param name="catalogPattern">
        /// Catalog pattern. Sent to the server on the schema/table/column requests and also
        /// converted with <c>PatternToRegEx</c> to filter the catalog list case-insensitively.
        /// </param>
        /// <param name="dbSchemaPattern">Schema pattern forwarded to the server requests.</param>
        /// <param name="tableNamePattern">Table-name pattern forwarded to the server requests.</param>
        /// <param name="tableTypes">
        /// NOTE(review): this argument is not read anywhere in this method, so no table-type
        /// filtering is applied here — confirm whether that is intended.
        /// </param>
        /// <param name="columnNamePattern">
        /// Column-name pattern; only set on the columns request when it is non-empty.
        /// </param>
        /// <returns>
        /// A <c>HiveInfoArrowStream</c> over <c>StandardSchemas.GetObjectsSchema</c> holding one
        /// row per matched catalog. The per-catalog schema list is null when
        /// <paramref name="depth"/> is <c>Catalogs</c>.
        /// </returns>
        /// <exception cref="InvalidOperationException">The session handle is null.</exception>
        /// <exception cref="TimeoutException">The operation was cancelled by the query timeout token.</exception>
        /// <exception cref="HiveServer2Exception">
        /// Any other failure (including a server ERROR_STATUS response) wrapped with the original as inner exception.
        /// </exception>
        public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string? catalogPattern, string? dbSchemaPattern, string? tableNamePattern, IReadOnlyList<string>? tableTypes, string? columnNamePattern)
        {
            if (SessionHandle == null)
            {
                throw new InvalidOperationException("Invalid session");
            }

            // catalog name -> schema name -> table name -> table details
            Dictionary<string, Dictionary<string, Dictionary<string, TableInfo>>> catalogMap = new Dictionary<string, Dictionary<string, Dictionary<string, TableInfo>>>();
            CancellationToken cancellationToken = ApacheUtility.GetCancellationToken(QueryTimeoutSeconds, ApacheUtility.TimeUnit.Seconds);
            try
            {
                if (GetObjectsPatternsRequireLowerCase)
                {
                    // Identifier patterns are not linguistic text: lower-case them with the
                    // invariant culture so the result does not depend on the current culture
                    // (e.g. Turkish dotless 'ı').
                    catalogPattern = catalogPattern?.ToLowerInvariant();
                    dbSchemaPattern = dbSchemaPattern?.ToLowerInvariant();
                    tableNamePattern = tableNamePattern?.ToLowerInvariant();
                    columnNamePattern = columnNamePattern?.ToLowerInvariant();
                }

                // ---- Catalogs ----
                if (depth == GetObjectsDepth.All || depth >= GetObjectsDepth.Catalogs)
                {
                    TGetCatalogsReq getCatalogsReq = new TGetCatalogsReq(SessionHandle);
                    if (AreResultsAvailableDirectly)
                    {
                        SetDirectResults(getCatalogsReq);
                    }

                    TGetCatalogsResp getCatalogsResp = Client.GetCatalogs(getCatalogsReq, cancellationToken).Result;

                    if (getCatalogsResp.Status.StatusCode == TStatusCode.ERROR_STATUS)
                    {
                        // Wrapped into HiveServer2Exception by the catch block below.
                        throw new Exception(getCatalogsResp.Status.ErrorMessage);
                    }
                    var catalogsMetadata = GetResultSetMetadataAsync(getCatalogsResp, cancellationToken).Result;
                    IReadOnlyDictionary<string, int> columnMap = GetColumnIndexMap(catalogsMetadata.Schema.Columns);

                    string catalogRegexp = PatternToRegEx(catalogPattern);
                    TRowSet rowSet = GetRowSetAsync(getCatalogsResp, cancellationToken).Result;
                    IReadOnlyList<string> list = rowSet.Columns[columnMap[TableCat]].StringVal.Values;
                    for (int i = 0; i < list.Count; i++)
                    {
                        string catalog = list[i];

                        // The catalog request carries no pattern, so filter client-side.
                        if (Regex.IsMatch(catalog, catalogRegexp, RegexOptions.IgnoreCase))
                        {
                            catalogMap.Add(catalog, new Dictionary<string, Dictionary<string, TableInfo>>());
                        }
                    }
                    // Handle the case where server does not support 'catalog' in the namespace.
                    if (list.Count == 0 && string.IsNullOrEmpty(catalogPattern))
                    {
                        catalogMap.Add(string.Empty, []);
                    }
                }

                // ---- Schemas ----
                if (depth == GetObjectsDepth.All || depth >= GetObjectsDepth.DbSchemas)
                {
                    TGetSchemasReq getSchemasReq = new TGetSchemasReq(SessionHandle);
                    getSchemasReq.CatalogName = catalogPattern;
                    getSchemasReq.SchemaName = dbSchemaPattern;
                    if (AreResultsAvailableDirectly)
                    {
                        SetDirectResults(getSchemasReq);
                    }

                    TGetSchemasResp getSchemasResp = Client.GetSchemas(getSchemasReq, cancellationToken).Result;
                    if (getSchemasResp.Status.StatusCode == TStatusCode.ERROR_STATUS)
                    {
                        throw new Exception(getSchemasResp.Status.ErrorMessage);
                    }

                    TGetResultSetMetadataResp schemaMetadata = GetResultSetMetadataAsync(getSchemasResp, cancellationToken).Result;
                    IReadOnlyDictionary<string, int> columnMap = GetColumnIndexMap(schemaMetadata.Schema.Columns);
                    TRowSet rowSet = GetRowSetAsync(getSchemasResp, cancellationToken).Result;

                    IReadOnlyList<string> catalogList = rowSet.Columns[columnMap[TableCatalog]].StringVal.Values;
                    IReadOnlyList<string> schemaList = rowSet.Columns[columnMap[TableSchem]].StringVal.Values;

                    for (int i = 0; i < catalogList.Count; i++)
                    {
                        // For systems that don't support 'catalog' in the namespace: a null catalog
                        // maps to the empty-string entry (and a null dictionary key would throw).
                        string catalog = catalogList[i] ?? string.Empty;
                        string schemaDb = schemaList[i];
                        // It seems Spark sometimes returns empty string for catalog on some schema (temporary tables).
                        // Rows whose catalog was not matched above are skipped.
                        catalogMap.GetValueOrDefault(catalog)?.Add(schemaDb, new Dictionary<string, TableInfo>());
                    }
                }

                // ---- Tables ----
                if (depth == GetObjectsDepth.All || depth >= GetObjectsDepth.Tables)
                {
                    TGetTablesReq getTablesReq = new TGetTablesReq(SessionHandle);
                    getTablesReq.CatalogName = catalogPattern;
                    getTablesReq.SchemaName = dbSchemaPattern;
                    getTablesReq.TableName = tableNamePattern;
                    if (AreResultsAvailableDirectly)
                    {
                        SetDirectResults(getTablesReq);
                    }

                    TGetTablesResp getTablesResp = Client.GetTables(getTablesReq, cancellationToken).Result;
                    if (getTablesResp.Status.StatusCode == TStatusCode.ERROR_STATUS)
                    {
                        throw new Exception(getTablesResp.Status.ErrorMessage);
                    }

                    TGetResultSetMetadataResp tableMetadata = GetResultSetMetadataAsync(getTablesResp, cancellationToken).Result;
                    IReadOnlyDictionary<string, int> columnMap = GetColumnIndexMap(tableMetadata.Schema.Columns);
                    TRowSet rowSet = GetRowSetAsync(getTablesResp, cancellationToken).Result;

                    IReadOnlyList<string> catalogList = rowSet.Columns[columnMap[TableCat]].StringVal.Values;
                    IReadOnlyList<string> schemaList = rowSet.Columns[columnMap[TableSchem]].StringVal.Values;
                    IReadOnlyList<string> tableList = rowSet.Columns[columnMap[TableName]].StringVal.Values;
                    IReadOnlyList<string> tableTypeList = rowSet.Columns[columnMap[TableType]].StringVal.Values;

                    for (int i = 0; i < catalogList.Count; i++)
                    {
                        // For systems that don't support 'catalog' in the namespace
                        string catalog = catalogList[i] ?? string.Empty;
                        string schemaDb = schemaList[i];
                        string tableName = tableList[i];
                        string tableType = tableTypeList[i];
                        TableInfo tableInfo = new(tableType);
                        // Tables whose catalog/schema was not collected above are skipped.
                        catalogMap.GetValueOrDefault(catalog)?.GetValueOrDefault(schemaDb)?.Add(tableName, tableInfo);
                    }
                }

                // ---- Columns ----
                if (depth == GetObjectsDepth.All)
                {
                    TGetColumnsReq columnsReq = new TGetColumnsReq(SessionHandle);
                    columnsReq.CatalogName = catalogPattern;
                    columnsReq.SchemaName = dbSchemaPattern;
                    columnsReq.TableName = tableNamePattern;
                    if (AreResultsAvailableDirectly)
                    {
                        SetDirectResults(columnsReq);
                    }

                    if (!string.IsNullOrEmpty(columnNamePattern))
                    {
                        columnsReq.ColumnName = columnNamePattern;
                    }

                    var columnsResponse = Client.GetColumns(columnsReq, cancellationToken).Result;
                    if (columnsResponse.Status.StatusCode == TStatusCode.ERROR_STATUS)
                    {
                        throw new Exception(columnsResponse.Status.ErrorMessage);
                    }

                    TGetResultSetMetadataResp columnsMetadata = GetResultSetMetadataAsync(columnsResponse, cancellationToken).Result;
                    IReadOnlyDictionary<string, int> columnMap = GetColumnIndexMap(columnsMetadata.Schema.Columns);
                    TRowSet rowSet = GetRowSetAsync(columnsResponse, cancellationToken).Result;

                    // Result-set column names come from GetColumnsMetadataColumnNames().
                    ColumnsMetadataColumnNames columnNames = GetColumnsMetadataColumnNames();
                    IReadOnlyList<string> catalogList = rowSet.Columns[columnMap[columnNames.TableCatalog]].StringVal.Values;
                    IReadOnlyList<string> schemaList = rowSet.Columns[columnMap[columnNames.TableSchema]].StringVal.Values;
                    IReadOnlyList<string> tableList = rowSet.Columns[columnMap[columnNames.TableName]].StringVal.Values;
                    IReadOnlyList<string> columnNameList = rowSet.Columns[columnMap[columnNames.ColumnName]].StringVal.Values;
                    ReadOnlySpan<int> columnTypeList = rowSet.Columns[columnMap[columnNames.DataType]].I32Val.Values.Values;
                    IReadOnlyList<string> typeNameList = rowSet.Columns[columnMap[columnNames.TypeName]].StringVal.Values;
                    ReadOnlySpan<int> nullableList = rowSet.Columns[columnMap[columnNames.Nullable]].I32Val.Values.Values;
                    IReadOnlyList<string> columnDefaultList = rowSet.Columns[columnMap[columnNames.ColumnDef]].StringVal.Values;
                    ReadOnlySpan<int> ordinalPosList = rowSet.Columns[columnMap[columnNames.OrdinalPosition]].I32Val.Values.Values;
                    IReadOnlyList<string> isNullableList = rowSet.Columns[columnMap[columnNames.IsNullable]].StringVal.Values;
                    IReadOnlyList<string> isAutoIncrementList = rowSet.Columns[columnMap[columnNames.IsAutoIncrement]].StringVal.Values;
                    ReadOnlySpan<int> columnSizeList = rowSet.Columns[columnMap[columnNames.ColumnSize]].I32Val.Values.Values;
                    ReadOnlySpan<int> decimalDigitsList = rowSet.Columns[columnMap[columnNames.DecimalDigits]].I32Val.Values.Values;

                    for (int i = 0; i < catalogList.Count; i++)
                    {
                        // For systems that don't support 'catalog' in the namespace
                        string catalog = catalogList[i] ?? string.Empty;
                        string schemaDb = schemaList[i];
                        string tableName = tableList[i];
                        string columnName = columnNameList[i];
                        short colType = (short)columnTypeList[i];
                        string typeName = typeNameList[i];
                        short nullable = (short)nullableList[i];
                        string? isAutoIncrementString = isAutoIncrementList[i];
                        // "YES" or "TRUE" (any casing) counts as auto-increment; null/empty does not.
                        bool isAutoIncrement = (!string.IsNullOrEmpty(isAutoIncrementString) && (isAutoIncrementString.Equals("YES", StringComparison.InvariantCultureIgnoreCase) || isAutoIncrementString.Equals("TRUE", StringComparison.InvariantCultureIgnoreCase)));
                        string isNullable = isNullableList[i] ?? "YES";
                        string columnDefault = columnDefaultList[i] ?? "";
                        // Spark/Databricks reports ordinal index zero-indexed, instead of one-indexed
                        int ordinalPos = ordinalPosList[i] + PositionRequiredOffset;
                        int columnSize = columnSizeList[i];
                        int decimalDigits = decimalDigitsList[i];
                        // tableInfo is null when the column's table was not collected above;
                        // the null-conditional calls then do nothing for this row.
                        TableInfo? tableInfo = catalogMap.GetValueOrDefault(catalog)?.GetValueOrDefault(schemaDb)?.GetValueOrDefault(tableName);
                        tableInfo?.ColumnName.Add(columnName);
                        tableInfo?.ColType.Add(colType);
                        tableInfo?.Nullable.Add(nullable);
                        tableInfo?.IsAutoIncrement.Add(isAutoIncrement);
                        tableInfo?.IsNullable.Add(isNullable);
                        tableInfo?.ColumnDefault.Add(columnDefault);
                        tableInfo?.OrdinalPosition.Add(ordinalPos);
                        SetPrecisionScaleAndTypeName(colType, typeName, tableInfo, columnSize, decimalDigits);
                    }
                }

                // ---- Assemble the Arrow result: one row per catalog ----
                StringArray.Builder catalogNameBuilder = new StringArray.Builder();
                List<IArrowArray?> catalogDbSchemasValues = new List<IArrowArray?>();

                foreach (KeyValuePair<string, Dictionary<string, Dictionary<string, TableInfo>>> catalogEntry in catalogMap)
                {
                    catalogNameBuilder.Append(catalogEntry.Key);

                    if (depth == GetObjectsDepth.Catalogs)
                    {
                        // Catalog-only request: no nested schema list.
                        catalogDbSchemasValues.Add(null);
                    }
                    else
                    {
                        catalogDbSchemasValues.Add(GetDbSchemas(
                                    depth, catalogEntry.Value));
                    }
                }

                Schema schema = StandardSchemas.GetObjectsSchema;
                IReadOnlyList<IArrowArray> dataArrays = schema.Validate(
                    new List<IArrowArray>
                    {
                    catalogNameBuilder.Build(),
                    catalogDbSchemasValues.BuildListArrayForType(new StructType(StandardSchemas.DbSchemaSchema)),
                    });

                return new HiveInfoArrowStream(schema, dataArrays);
            }
            catch (Exception ex) when (ExceptionHelper.IsOperationCanceledOrCancellationRequested(ex, cancellationToken))
            {
                throw new TimeoutException("The metadata query execution timed out. Consider increasing the query timeout value.", ex);
            }
            catch (Exception ex) when (ex is not HiveServer2Exception)
            {
                throw new HiveServer2Exception($"An unexpected error occurred while running metadata query. '{ex.Message}'", ex);
            }
        }