in csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs [359:588]
/// <summary>
/// Builds the ADBC <c>GetObjects</c> result by issuing the HiveServer2 metadata calls
/// (GetCatalogs, GetSchemas, GetTables, GetColumns) that the requested <paramref name="depth"/>
/// needs, and assembling their rows into a catalog → schema → table → column hierarchy.
/// </summary>
/// <param name="depth">How deep to descend; each metadata call is only issued when the depth requires it.</param>
/// <param name="catalogPattern">
/// Catalog filter. Catalog names returned by GetCatalogs are matched client-side (case-insensitively)
/// against the regex produced by <c>PatternToRegEx</c>; the pattern is also sent as the catalog name of
/// the schema, table and column requests.
/// </param>
/// <param name="dbSchemaPattern">Schema filter, sent as the schema name of the schema, table and column requests.</param>
/// <param name="tableNamePattern">Table filter, sent as the table name of the table and column requests.</param>
/// <param name="tableTypes">
/// NOTE(review): not used by this implementation — neither the GetTables request nor the client-side
/// processing filters on it. Confirm whether callers rely on unfiltered results before applying it.
/// </param>
/// <param name="columnNamePattern">Column filter, sent with the GetColumns request when non-empty.</param>
/// <returns>A stream over a single batch shaped by <c>StandardSchemas.GetObjectsSchema</c>.</returns>
/// <exception cref="InvalidOperationException">There is no open session.</exception>
/// <exception cref="TimeoutException">The metadata calls were cancelled, e.g. because the query timeout elapsed.</exception>
/// <exception cref="HiveServer2Exception">Any other failure, including an error status returned by the server.</exception>
public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string? catalogPattern, string? dbSchemaPattern, string? tableNamePattern, IReadOnlyList<string>? tableTypes, string? columnNamePattern)
{
    if (SessionHandle == null)
    {
        throw new InvalidOperationException("Invalid session");
    }

    // catalog -> schema -> table name -> table/column details
    Dictionary<string, Dictionary<string, Dictionary<string, TableInfo>>> catalogMap = new Dictionary<string, Dictionary<string, Dictionary<string, TableInfo>>>();
    CancellationToken cancellationToken = ApacheUtility.GetCancellationToken(QueryTimeoutSeconds, ApacheUtility.TimeUnit.Seconds);
    try
    {
        if (GetObjectsPatternsRequireLowerCase)
        {
            // Invariant culture: these are identifier patterns, and culture-sensitive lowering
            // (e.g. the Turkish dotted/dotless 'i') would corrupt them.
            catalogPattern = catalogPattern?.ToLowerInvariant();
            dbSchemaPattern = dbSchemaPattern?.ToLowerInvariant();
            tableNamePattern = tableNamePattern?.ToLowerInvariant();
            columnNamePattern = columnNamePattern?.ToLowerInvariant();
        }

        if (depth == GetObjectsDepth.All || depth >= GetObjectsDepth.Catalogs)
        {
            TGetCatalogsReq getCatalogsReq = new TGetCatalogsReq(SessionHandle);
            if (AreResultsAvailableDirectly)
            {
                SetDirectResults(getCatalogsReq);
            }
            TGetCatalogsResp getCatalogsResp = Client.GetCatalogs(getCatalogsReq, cancellationToken).Result;
            if (getCatalogsResp.Status.StatusCode == TStatusCode.ERROR_STATUS)
            {
                throw new Exception(getCatalogsResp.Status.ErrorMessage);
            }
            var catalogsMetadata = GetResultSetMetadataAsync(getCatalogsResp, cancellationToken).Result;
            IReadOnlyDictionary<string, int> columnMap = GetColumnIndexMap(catalogsMetadata.Schema.Columns);
            // GetCatalogs takes no pattern, so the catalog filter is applied client-side.
            string catalogRegexp = PatternToRegEx(catalogPattern);
            TRowSet rowSet = GetRowSetAsync(getCatalogsResp, cancellationToken).Result;
            IReadOnlyList<string> list = rowSet.Columns[columnMap[TableCat]].StringVal.Values;
            for (int i = 0; i < list.Count; i++)
            {
                string catalog = list[i];
                if (Regex.IsMatch(catalog, catalogRegexp, RegexOptions.IgnoreCase))
                {
                    catalogMap.Add(catalog, new Dictionary<string, Dictionary<string, TableInfo>>());
                }
            }
            // Handle the case where server does not support 'catalog' in the namespace.
            if (list.Count == 0 && string.IsNullOrEmpty(catalogPattern))
            {
                catalogMap.Add(string.Empty, []);
            }
        }

        if (depth == GetObjectsDepth.All || depth >= GetObjectsDepth.DbSchemas)
        {
            TGetSchemasReq getSchemasReq = new TGetSchemasReq(SessionHandle);
            getSchemasReq.CatalogName = catalogPattern;
            getSchemasReq.SchemaName = dbSchemaPattern;
            if (AreResultsAvailableDirectly)
            {
                SetDirectResults(getSchemasReq);
            }
            TGetSchemasResp getSchemasResp = Client.GetSchemas(getSchemasReq, cancellationToken).Result;
            if (getSchemasResp.Status.StatusCode == TStatusCode.ERROR_STATUS)
            {
                throw new Exception(getSchemasResp.Status.ErrorMessage);
            }
            TGetResultSetMetadataResp schemaMetadata = GetResultSetMetadataAsync(getSchemasResp, cancellationToken).Result;
            IReadOnlyDictionary<string, int> columnMap = GetColumnIndexMap(schemaMetadata.Schema.Columns);
            TRowSet rowSet = GetRowSetAsync(getSchemasResp, cancellationToken).Result;
            IReadOnlyList<string> catalogList = rowSet.Columns[columnMap[TableCatalog]].StringVal.Values;
            IReadOnlyList<string> schemaList = rowSet.Columns[columnMap[TableSchem]].StringVal.Values;
            for (int i = 0; i < catalogList.Count; i++)
            {
                // For systems that don't support 'catalog' in the namespace. Without this, a null
                // catalog would make the dictionary lookup below throw on a null key.
                string catalog = catalogList[i] ?? string.Empty;
                string schemaDb = schemaList[i];
                // It seems Spark sometimes returns empty string for catalog on some schema (temporary tables).
                // Schemas whose catalog was not kept by the catalog pass are dropped.
                catalogMap.GetValueOrDefault(catalog)?.Add(schemaDb, new Dictionary<string, TableInfo>());
            }
        }

        if (depth == GetObjectsDepth.All || depth >= GetObjectsDepth.Tables)
        {
            TGetTablesReq getTablesReq = new TGetTablesReq(SessionHandle);
            getTablesReq.CatalogName = catalogPattern;
            getTablesReq.SchemaName = dbSchemaPattern;
            getTablesReq.TableName = tableNamePattern;
            if (AreResultsAvailableDirectly)
            {
                SetDirectResults(getTablesReq);
            }
            TGetTablesResp getTablesResp = Client.GetTables(getTablesReq, cancellationToken).Result;
            if (getTablesResp.Status.StatusCode == TStatusCode.ERROR_STATUS)
            {
                throw new Exception(getTablesResp.Status.ErrorMessage);
            }
            TGetResultSetMetadataResp tableMetadata = GetResultSetMetadataAsync(getTablesResp, cancellationToken).Result;
            IReadOnlyDictionary<string, int> columnMap = GetColumnIndexMap(tableMetadata.Schema.Columns);
            TRowSet rowSet = GetRowSetAsync(getTablesResp, cancellationToken).Result;
            IReadOnlyList<string> catalogList = rowSet.Columns[columnMap[TableCat]].StringVal.Values;
            IReadOnlyList<string> schemaList = rowSet.Columns[columnMap[TableSchem]].StringVal.Values;
            IReadOnlyList<string> tableList = rowSet.Columns[columnMap[TableName]].StringVal.Values;
            IReadOnlyList<string> tableTypeList = rowSet.Columns[columnMap[TableType]].StringVal.Values;
            for (int i = 0; i < catalogList.Count; i++)
            {
                // For systems that don't support 'catalog' in the namespace (same mapping as the schema pass).
                string catalog = catalogList[i] ?? string.Empty;
                string schemaDb = schemaList[i];
                string tableName = tableList[i];
                string tableType = tableTypeList[i];
                TableInfo tableInfo = new(tableType);
                // Tables whose catalog/schema were not kept by the earlier passes are dropped.
                catalogMap.GetValueOrDefault(catalog)?.GetValueOrDefault(schemaDb)?.Add(tableName, tableInfo);
            }
        }

        if (depth == GetObjectsDepth.All)
        {
            TGetColumnsReq columnsReq = new TGetColumnsReq(SessionHandle);
            columnsReq.CatalogName = catalogPattern;
            columnsReq.SchemaName = dbSchemaPattern;
            columnsReq.TableName = tableNamePattern;
            if (AreResultsAvailableDirectly)
            {
                SetDirectResults(columnsReq);
            }
            if (!string.IsNullOrEmpty(columnNamePattern))
            {
                columnsReq.ColumnName = columnNamePattern;
            }
            var columnsResponse = Client.GetColumns(columnsReq, cancellationToken).Result;
            if (columnsResponse.Status.StatusCode == TStatusCode.ERROR_STATUS)
            {
                throw new Exception(columnsResponse.Status.ErrorMessage);
            }
            TGetResultSetMetadataResp columnsMetadata = GetResultSetMetadataAsync(columnsResponse, cancellationToken).Result;
            IReadOnlyDictionary<string, int> columnMap = GetColumnIndexMap(columnsMetadata.Schema.Columns);
            TRowSet rowSet = GetRowSetAsync(columnsResponse, cancellationToken).Result;
            ColumnsMetadataColumnNames columnNames = GetColumnsMetadataColumnNames();
            IReadOnlyList<string> catalogList = rowSet.Columns[columnMap[columnNames.TableCatalog]].StringVal.Values;
            IReadOnlyList<string> schemaList = rowSet.Columns[columnMap[columnNames.TableSchema]].StringVal.Values;
            IReadOnlyList<string> tableList = rowSet.Columns[columnMap[columnNames.TableName]].StringVal.Values;
            IReadOnlyList<string> columnNameList = rowSet.Columns[columnMap[columnNames.ColumnName]].StringVal.Values;
            ReadOnlySpan<int> columnTypeList = rowSet.Columns[columnMap[columnNames.DataType]].I32Val.Values.Values;
            IReadOnlyList<string> typeNameList = rowSet.Columns[columnMap[columnNames.TypeName]].StringVal.Values;
            ReadOnlySpan<int> nullableList = rowSet.Columns[columnMap[columnNames.Nullable]].I32Val.Values.Values;
            IReadOnlyList<string> columnDefaultList = rowSet.Columns[columnMap[columnNames.ColumnDef]].StringVal.Values;
            ReadOnlySpan<int> ordinalPosList = rowSet.Columns[columnMap[columnNames.OrdinalPosition]].I32Val.Values.Values;
            IReadOnlyList<string> isNullableList = rowSet.Columns[columnMap[columnNames.IsNullable]].StringVal.Values;
            IReadOnlyList<string> isAutoIncrementList = rowSet.Columns[columnMap[columnNames.IsAutoIncrement]].StringVal.Values;
            ReadOnlySpan<int> columnSizeList = rowSet.Columns[columnMap[columnNames.ColumnSize]].I32Val.Values.Values;
            ReadOnlySpan<int> decimalDigitsList = rowSet.Columns[columnMap[columnNames.DecimalDigits]].I32Val.Values.Values;
            for (int i = 0; i < catalogList.Count; i++)
            {
                // For systems that don't support 'catalog' in the namespace
                string catalog = catalogList[i] ?? string.Empty;
                string schemaDb = schemaList[i];
                string tableName = tableList[i];
                string columnName = columnNameList[i];
                short colType = (short)columnTypeList[i];
                string typeName = typeNameList[i];
                short nullable = (short)nullableList[i];
                string? isAutoIncrementString = isAutoIncrementList[i];
                bool isAutoIncrement = (!string.IsNullOrEmpty(isAutoIncrementString) && (isAutoIncrementString.Equals("YES", StringComparison.InvariantCultureIgnoreCase) || isAutoIncrementString.Equals("TRUE", StringComparison.InvariantCultureIgnoreCase)));
                string isNullable = isNullableList[i] ?? "YES";
                string columnDefault = columnDefaultList[i] ?? "";
                // Spark/Databricks reports ordinal index zero-indexed, instead of one-indexed
                int ordinalPos = ordinalPosList[i] + PositionRequiredOffset;
                int columnSize = columnSizeList[i];
                int decimalDigits = decimalDigitsList[i];

                // Columns of tables not kept by the earlier passes have no TableInfo and are not recorded.
                TableInfo? tableInfo = catalogMap.GetValueOrDefault(catalog)?.GetValueOrDefault(schemaDb)?.GetValueOrDefault(tableName);
                if (tableInfo != null)
                {
                    tableInfo.ColumnName.Add(columnName);
                    tableInfo.ColType.Add(colType);
                    tableInfo.Nullable.Add(nullable);
                    tableInfo.IsAutoIncrement.Add(isAutoIncrement);
                    tableInfo.IsNullable.Add(isNullable);
                    tableInfo.ColumnDefault.Add(columnDefault);
                    tableInfo.OrdinalPosition.Add(ordinalPos);
                }
                // Called even when tableInfo is null, exactly as before; the helper receives the nullable value.
                SetPrecisionScaleAndTypeName(colType, typeName, tableInfo, columnSize, decimalDigits);
            }
        }

        // Flatten the hierarchy into the standard GetObjects Arrow layout.
        StringArray.Builder catalogNameBuilder = new StringArray.Builder();
        List<IArrowArray?> catalogDbSchemasValues = new List<IArrowArray?>();
        foreach (KeyValuePair<string, Dictionary<string, Dictionary<string, TableInfo>>> catalogEntry in catalogMap)
        {
            catalogNameBuilder.Append(catalogEntry.Key);
            if (depth == GetObjectsDepth.Catalogs)
            {
                // Catalog-only depth: the db-schema list for each catalog is null.
                catalogDbSchemasValues.Add(null);
            }
            else
            {
                catalogDbSchemasValues.Add(GetDbSchemas(
                    depth, catalogEntry.Value));
            }
        }
        Schema schema = StandardSchemas.GetObjectsSchema;
        IReadOnlyList<IArrowArray> dataArrays = schema.Validate(
            new List<IArrowArray>
            {
                catalogNameBuilder.Build(),
                catalogDbSchemasValues.BuildListArrayForType(new StructType(StandardSchemas.DbSchemaSchema)),
            });
        return new HiveInfoArrowStream(schema, dataArrays);
    }
    catch (Exception ex) when (ExceptionHelper.IsOperationCanceledOrCancellationRequested(ex, cancellationToken))
    {
        throw new TimeoutException("The metadata query execution timed out. Consider increasing the query timeout value.", ex);
    }
    catch (Exception ex) when (ex is not HiveServer2Exception)
    {
        throw new HiveServer2Exception($"An unexpected error occurred while running metadata query. '{ex.Message}'", ex);
    }
}