def metadata_log_entries()

in pyiceberg/table/inspect.py [0:0]


    def metadata_log_entries(self) -> "pa.Table":
        import pyarrow as pa

        from pyiceberg.table.snapshots import MetadataLogEntry

        table_schema = pa.schema(
            [
                pa.field("timestamp", pa.timestamp(unit="ms"), nullable=False),
                pa.field("file", pa.string(), nullable=False),
                pa.field("latest_snapshot_id", pa.int64(), nullable=True),
                pa.field("latest_schema_id", pa.int32(), nullable=True),
                pa.field("latest_sequence_number", pa.int64(), nullable=True),
            ]
        )

        def metadata_log_entry_to_row(metadata_entry: MetadataLogEntry) -> Dict[str, Any]:
            latest_snapshot = self.tbl.snapshot_as_of_timestamp(metadata_entry.timestamp_ms)
            return {
                "timestamp": metadata_entry.timestamp_ms,
                "file": metadata_entry.metadata_file,
                "latest_snapshot_id": latest_snapshot.snapshot_id if latest_snapshot else None,
                "latest_schema_id": latest_snapshot.schema_id if latest_snapshot else None,
                "latest_sequence_number": latest_snapshot.sequence_number if latest_snapshot else None,
            }

        # similar to MetadataLogEntriesTable in Java
        # https://github.com/apache/iceberg/blob/8a70fe0ff5f241aec8856f8091c77fdce35ad256/core/src/main/java/org/apache/iceberg/MetadataLogEntriesTable.java#L62-L66
        metadata_log_entries = self.tbl.metadata.metadata_log + [
            MetadataLogEntry(metadata_file=self.tbl.metadata_location, timestamp_ms=self.tbl.metadata.last_updated_ms)
        ]

        return pa.Table.from_pylist(
            [metadata_log_entry_to_row(entry) for entry in metadata_log_entries],
            schema=table_schema,
        )