in pyiceberg/catalog/glue.py [0:0]
def _to_columns(metadata: TableMetadata) -> List[ColumnTypeDef]:
    """Build the Glue column definitions for the fields of every schema in ``metadata``.

    Each distinct field name yields exactly one column entry containing:

    * ``Name``: the field name,
    * ``Type``: the result of visiting the field type with
      ``_IcebergSchemaToGlueType``,
    * ``Parameters``: the field id, the lower-cased ``optional`` flag and a
      lower-cased flag telling whether the field came from the current schema,
    * ``Comment``: the field doc, only when it is truthy.

    Fields of the current schema are registered first, so when the same name
    also appears in another schema the current-schema definition is kept.

    Args:
        metadata: Table metadata providing ``schemas``, ``current_schema_id``
            and ``schema_by_id``.

    Returns:
        The column entries in first-seen order.
    """
    columns_by_name: Dict[str, ColumnTypeDef] = {}

    def _register(fields, from_current: bool) -> None:
        # Same "true"/"false" text for every field of one schema.
        current_flag = str(from_current).lower()
        for column in fields:
            # First definition of a name wins; later duplicates are ignored.
            if column.name not in columns_by_name:
                entry = cast(
                    ColumnTypeDef,
                    {
                        "Name": column.name,
                        "Type": visit(column.field_type, _IcebergSchemaToGlueType()),
                        "Parameters": {
                            ICEBERG_FIELD_ID: str(column.field_id),
                            ICEBERG_FIELD_OPTIONAL: str(column.optional).lower(),
                            ICEBERG_FIELD_CURRENT: current_flag,
                        },
                    },
                )
                if column.doc:
                    entry["Comment"] = column.doc
                columns_by_name[column.name] = entry

    # Current schema goes first so its fields take precedence.
    active_schema = metadata.schema_by_id(metadata.current_schema_id)
    if active_schema:
        _register(active_schema.columns, True)

    # Every other schema contributes only names not seen so far.
    for candidate in metadata.schemas:
        if candidate.schema_id != metadata.current_schema_id:
            _register(candidate.columns, False)

    return list(columns_by_name.values())