in petastorm/utils.py [0:0]
def add_to_dataset_metadata(dataset, key, value):
    """
    Stores a single key/value entry in the ``_common_metadata`` file of a parquet dataset.

    The schema is taken from the first source that is available, in this order: an existing
    ``_common_metadata`` file, an existing ``_metadata`` file, or the first piece of the dataset.
    The schema, together with the updated key/value metadata, is then (re)written to
    ``_common_metadata``.

    :param dataset: (ParquetDataset) parquet dataset; ``dataset.paths`` must be a single string path
    :param key: (str) key of metadata entry
    :param value: (str) value of metadata
    :raises ValueError: if ``dataset.paths`` is not a string
    """
    if not isinstance(dataset.paths, str):
        raise ValueError('Expected dataset.paths to be a single path, not a list of paths')

    fs = dataset.fs
    dataset_root = dataset.paths.rstrip('/')
    legacy_path = dataset_root + '/_metadata'
    common_path = dataset_root + '/_common_metadata'
    crc_path = dataset_root + '/._common_metadata.crc'

    # Prefer an already existing _common_metadata file. If only _metadata exists, its content is
    # used as the starting point and ends up in _common_metadata (backwards compatibility).
    # The generator is lazy, so the second existence check only runs when the first one fails.
    source_path = next(
        (candidate for candidate in (common_path, legacy_path) if fs.exists(candidate)),
        None,
    )

    if source_path is None:
        # Neither metadata file exists: take the schema from the first parquet piece of the dataset.
        arrow_metadata = compat_get_metadata(dataset.pieces[0], fs.open)
    else:
        with fs.open(source_path) as source_file:
            arrow_metadata = pyarrow.parquet.read_metadata(source_file)

    base_schema = arrow_metadata.schema.to_arrow_schema()

    # The schema may carry no metadata at all (None); start from an empty mapping in that case.
    entries = base_schema.metadata
    if not entries:
        entries = {}
    entries[key] = value
    updated_schema = compat_with_metadata(base_schema, entries)

    with fs.open(common_path, 'wb') as target_file:
        pyarrow.parquet.write_metadata(updated_schema, target_file)

    # _common_metadata has just been rewritten, but the filesystem implementation used by pyarrow
    # does not refresh the matching .crc file. Remove a stale checksum so that it can not disagree
    # with the new file content. Only done for a LocalFileSystem.
    if not isinstance(fs, LocalFileSystem) or not fs.exists(crc_path):
        return

    try:
        fs.rm(crc_path)
    except NotImplementedError:
        # The filesystem object does not implement rm; fall back to the os module.
        os.remove(crc_path)