def _to_columns()

in pyiceberg/catalog/glue.py [0:0]


def _to_columns(metadata: TableMetadata) -> List[ColumnTypeDef]:
    results: Dict[str, ColumnTypeDef] = {}

    def _append_to_results(field: NestedField, is_current: bool) -> None:
        if field.name in results:
            return

        results[field.name] = cast(
            ColumnTypeDef,
            {
                "Name": field.name,
                "Type": visit(field.field_type, _IcebergSchemaToGlueType()),
                "Parameters": {
                    ICEBERG_FIELD_ID: str(field.field_id),
                    ICEBERG_FIELD_OPTIONAL: str(field.optional).lower(),
                    ICEBERG_FIELD_CURRENT: str(is_current).lower(),
                },
            },
        )
        if field.doc:
            results[field.name]["Comment"] = field.doc

    if current_schema := metadata.schema_by_id(metadata.current_schema_id):
        for field in current_schema.columns:
            _append_to_results(field, True)

    for schema in metadata.schemas:
        if schema.schema_id == metadata.current_schema_id:
            continue
        for field in schema.columns:
            _append_to_results(field, False)

    return list(results.values())