in pyiceberg/table/inspect.py [0:0]
def metadata_log_entries(self) -> "pa.Table":
    """Return the table's metadata-file history as an Arrow table.

    One row is emitted for every entry of ``self.tbl.metadata.metadata_log``,
    followed by one extra row for the current metadata file
    (``self.tbl.metadata_location`` stamped with
    ``self.tbl.metadata.last_updated_ms``).

    For each row, ``self.tbl.snapshot_as_of_timestamp`` is queried with the
    entry's timestamp; the resulting snapshot's id, schema id and sequence
    number fill the three ``latest_*`` columns. When that lookup yields a
    falsy result, those three columns are null.

    Returns:
        A ``pa.Table`` with the columns ``timestamp``, ``file``,
        ``latest_snapshot_id``, ``latest_schema_id`` and
        ``latest_sequence_number``.
    """
    import pyarrow as pa

    from pyiceberg.table.snapshots import MetadataLogEntry

    # The current metadata file is not part of the stored log, so it is
    # appended here; this mirrors MetadataLogEntriesTable in Java:
    # https://github.com/apache/iceberg/blob/8a70fe0ff5f241aec8856f8091c77fdce35ad256/core/src/main/java/org/apache/iceberg/MetadataLogEntriesTable.java#L62-L66
    logged_entries = self.tbl.metadata.metadata_log
    current_entry = MetadataLogEntry(
        metadata_file=self.tbl.metadata_location,
        timestamp_ms=self.tbl.metadata.last_updated_ms,
    )
    all_entries = logged_entries + [current_entry]

    rows = []
    for entry in all_entries:
        snapshot = self.tbl.snapshot_as_of_timestamp(entry.timestamp_ms)
        if snapshot:
            snapshot_id = snapshot.snapshot_id
            schema_id = snapshot.schema_id
            sequence_number = snapshot.sequence_number
        else:
            # No snapshot found for this timestamp: leave the columns null.
            snapshot_id = schema_id = sequence_number = None
        rows.append(
            {
                "timestamp": entry.timestamp_ms,
                "file": entry.metadata_file,
                "latest_snapshot_id": snapshot_id,
                "latest_schema_id": schema_id,
                "latest_sequence_number": sequence_number,
            }
        )

    # Only the snapshot-derived columns are nullable.
    columns = [
        pa.field("timestamp", pa.timestamp(unit="ms"), nullable=False),
        pa.field("file", pa.string(), nullable=False),
        pa.field("latest_snapshot_id", pa.int64(), nullable=True),
        pa.field("latest_schema_id", pa.int32(), nullable=True),
        pa.field("latest_sequence_number", pa.int64(), nullable=True),
    ]
    return pa.Table.from_pylist(rows, schema=pa.schema(columns))