def add_to_dataset_metadata()

in petastorm/utils.py [0:0]


def add_to_dataset_metadata(dataset, key, value):
    """
    Adds a key and value to the ``_common_metadata`` file of a parquet dataset.

    The schema that gets extended is read from, in order of preference:

    1. the existing ``_common_metadata`` file,
    2. the legacy ``_metadata`` file,
    3. the footer of the first parquet piece in the dataset.

    The schema, with the new entry added to its key/value metadata, is always written back to
    ``_common_metadata``. An existing entry with the same key is overwritten.

    :param dataset: (ParquetDataset) parquet dataset rooted at a single path
    :param key:     (str or bytes) key of metadata entry. A str key is UTF-8 encoded so that it replaces,
                    rather than duplicates, an existing bytes key with the same text.
    :param value:   (str or bytes) value of metadata. A str value is UTF-8 encoded.
    :raises ValueError: if ``dataset.paths`` is not a single path, or if the dataset has neither a metadata
                        file nor any parquet pieces to read a schema from
    """
    if not isinstance(dataset.paths, str):
        raise ValueError('Expected dataset.paths to be a single path, not a list of paths')

    base_path = dataset.paths.rstrip('/')
    metadata_file_path = base_path + '/_metadata'
    common_metadata_file_path = base_path + '/_common_metadata'
    common_metadata_file_crc_path = base_path + '/._common_metadata.crc'

    # Prefer an existing _common_metadata file. If only the legacy _metadata file exists, read that one.
    # The result is always written to _common_metadata below, so this copies _metadata's schema into
    # _common_metadata for backwards compatibility.
    if dataset.fs.exists(common_metadata_file_path):
        source_path = common_metadata_file_path
    elif dataset.fs.exists(metadata_file_path):
        source_path = metadata_file_path
    else:
        source_path = None

    if source_path is not None:
        with dataset.fs.open(source_path) as f:
            arrow_metadata = pyarrow.parquet.read_metadata(f)
    else:
        # No metadata file yet: take the schema from one of the dataset's parquet files.
        if not dataset.pieces:
            raise ValueError('Cannot add metadata to dataset at {}: it has no _common_metadata/_metadata '
                             'file and no parquet pieces to read a schema from'.format(dataset.paths))
        arrow_metadata = compat_get_metadata(dataset.pieces[0], dataset.fs.open)

    base_schema = arrow_metadata.schema.to_arrow_schema()

    # The schema's metadata keys are bytes. Encode str inputs so that updating an existing key replaces
    # its entry instead of adding a second entry that differs only in type.
    if isinstance(key, str):
        key = key.encode('utf-8')
    if isinstance(value, str):
        value = value.encode('utf-8')

    # base_schema.metadata may be None (a schema without key/value metadata). Copy it explicitly so that
    # the dict returned by pyarrow is never mutated in place.
    metadata_dict = dict(base_schema.metadata or {})
    metadata_dict[key] = value
    schema = compat_with_metadata(base_schema, metadata_dict)

    with dataset.fs.open(common_metadata_file_path, 'wb') as metadata_file:
        pyarrow.parquet.write_metadata(schema, metadata_file)

    # We have just modified _common_metadata file, but the filesystem implementation used by pyarrow does not
    # update the .crc value. We better delete the .crc to make sure there is no mismatch between _common_metadata
    # content and the checksum.
    if isinstance(dataset.fs, LocalFileSystem) and dataset.fs.exists(common_metadata_file_crc_path):
        try:
            dataset.fs.rm(common_metadata_file_crc_path)
        except NotImplementedError:
            # Some filesystem wrappers do not implement rm(); fall back to removing the local file directly.
            os.remove(common_metadata_file_crc_path)