def get_simple_meta_from_parquet()

in horovod/spark/common/util.py
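
Reads metadata for the train and (optional) validation Parquet datasets in the
given store: row counts, per-column metadata (Spark data type plus sparse-vector
bookkeeping fields), and the average row size in bytes. Cached metadata files are
used when present; otherwise the metadata is computed from the Parquet data and
written back to the store for reuse.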


def get_simple_meta_from_parquet(store, label_columns, feature_columns, sample_weight_col,
                                 dataset_idx=None):
    train_data_path = store.get_train_data_path(dataset_idx)
    validation_data_path = store.get_val_data_path(dataset_idx)

    if not store.exists(train_data_path):
        raise ValueError("{} path does not exist in the store".format(train_data_path))

    train_data_meta_path = store.get_data_metadata_path(train_data_path)
    val_data_meta_path = store.get_data_metadata_path(validation_data_path)
    fs = store.get_filesystem()

    schema_cols = feature_columns + label_columns
    if sample_weight_col:
        schema_cols.append(sample_weight_col)

    # Builds the per-column metadata dictionary. Note that this closure reads
    # train_rows and train_data_total_byte_size from the enclosing scope, so it
    # must only be called after those values have been assigned.
    def make_metadata_dictionary(_train_data_schema):
        _metadata = {}
        for col in schema_cols:
            col_schema = _train_data_schema.field_by_name(col)
            col_info = {
                'spark_data_type': pyarrow_to_spark_data_type(col_schema.type),
                'is_sparse_vector_only': False,
                'shape': None,  # Only used by SparseVector columns
                'intermediate_format': constants.NOCHANGE,
                'max_size': None  # Only used by SparseVector columns
            }
            _metadata[col] = col_info

        # Average bytes per row across the training dataset (assumes train_rows > 0).
        _avg_row_size = train_data_total_byte_size / train_rows
        return _metadata, _avg_row_size

    # First, try to read the dataset metadata from the cached metadata files in the
    # store. If anything goes wrong, ignore the cache and rebuild the metadata from
    # the data itself.
    try:
        if store.exists(train_data_meta_path):
            train_data_schema, train_rows, train_data_total_byte_size = \
                _load_metadata_from_fs(fs, train_data_meta_path)
            metadata, avg_row_size = make_metadata_dictionary(train_data_schema)

            val_rows = 0
            if store.exists(validation_data_path) and store.exists(val_data_meta_path):
                val_data_schema, val_rows, val_data_total_byte_size = \
                    _load_metadata_from_fs(fs, val_data_meta_path)

            return train_rows, val_rows, metadata, avg_row_size
    except Exception as ex:
        print(ex)
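        # Swallow the cache error and fall through to recompute the metadata below.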

    # Cache miss or unreadable cache: compute the metadata directly from the data.
    train_data = store.get_parquet_dataset(train_data_path)
    train_data_schema = train_data.schema.to_arrow_schema()
    train_rows, train_data_total_byte_size = _get_dataset_info(train_data, 'training',
                                                               train_data_path)

    # Write train metadata to filesystem
    _save_meta_to_fs(fs, train_data_meta_path, train_data_schema, train_rows,
                     train_data_total_byte_size)

    val_rows = 0
    if store.exists(validation_data_path):
        val_data = store.get_parquet_dataset(validation_data_path)
        val_data_schema = val_data.schema.to_arrow_schema()
        val_rows, val_data_total_byte_size = _get_dataset_info(val_data, 'validation',
                                                               validation_data_path)

        # Write validation metadata to filesystem
        _save_meta_to_fs(fs, val_data_meta_path, val_data_schema, val_rows,
                         val_data_total_byte_size)

    metadata, avg_row_size = make_metadata_dictionary(train_data_schema)
    return train_rows, val_rows, metadata, avg_row_size
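
For context, a minimal usage sketch, not a definitive recipe: it assumes the store
already holds prepared train/validation Parquet data under its standard paths, and
the prefix path and column names below are illustrative. LocalStore lives in
horovod.spark.common.store.

from horovod.spark.common.store import LocalStore
from horovod.spark.common.util import get_simple_meta_from_parquet

store = LocalStore('/tmp/horovod-data')  # hypothetical prefix path
train_rows, val_rows, metadata, avg_row_size = get_simple_meta_from_parquet(
    store,
    label_columns=['label'],        # hypothetical column names
    feature_columns=['features'],
    sample_weight_col=None)

# metadata maps each column to a dict with the keys 'spark_data_type',
# 'is_sparse_vector_only', 'shape', 'intermediate_format', and 'max_size'.
print(train_rows, val_rows, avg_row_size)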