in horovod/spark/common/util.py [0:0]
def get_simple_meta_from_parquet(store, label_columns, feature_columns, sample_weight_col,
                                 dataset_idx=None):
    """Return (train_rows, val_rows, metadata, avg_row_size) for the train/validation Parquet
    datasets in the store, using cached metadata files when available and rebuilding (and
    re-caching) them from the Parquet data otherwise."""
    train_data_path = store.get_train_data_path(dataset_idx)
    validation_data_path = store.get_val_data_path(dataset_idx)

    if not store.exists(train_data_path):
        raise ValueError("{} path does not exist in the store".format(train_data_path))

    train_data_meta_path = store.get_data_metadata_path(train_data_path)
    val_data_meta_path = store.get_data_metadata_path(validation_data_path)
    fs = store.get_filesystem()

    schema_cols = feature_columns + label_columns
    if sample_weight_col:
        schema_cols.append(sample_weight_col)

    def make_metadata_dictionary(_train_data_schema):
        _metadata = {}
        for col in schema_cols:
            col_schema = _train_data_schema.field_by_name(col)
            col_info = {
                'spark_data_type': pyarrow_to_spark_data_type(col_schema.type),
                'is_sparse_vector_only': False,
                'shape': None,  # Only used by SparseVector columns
                'intermediate_format': constants.NOCHANGE,
                'max_size': None  # Only used by SparseVector columns
            }
            _metadata[col] = col_info

        _avg_row_size = train_data_total_byte_size / train_rows
        return _metadata, _avg_row_size

    # Try to read the dataset metadata from the cached metadata files in the store. If anything
    # goes wrong, ignore the cache and rebuild the metadata from the data itself.
    try:
        if store.exists(train_data_meta_path):
            train_data_schema, train_rows, train_data_total_byte_size = \
                _load_metadata_from_fs(fs, train_data_meta_path)
            metadata, avg_row_size = make_metadata_dictionary(train_data_schema)

            val_rows = 0
            if store.exists(validation_data_path) and store.exists(val_data_meta_path):
                val_data_schema, val_rows, val_data_total_byte_size = \
                    _load_metadata_from_fs(fs, val_data_meta_path)

            return train_rows, val_rows, metadata, avg_row_size
    except Exception as ex:
        print(ex)

    train_data = store.get_parquet_dataset(train_data_path)
    train_data_schema = train_data.schema.to_arrow_schema()
    train_rows, train_data_total_byte_size = _get_dataset_info(train_data, 'training',
                                                               train_data_path)
    # Write train metadata to filesystem
    _save_meta_to_fs(fs, train_data_meta_path, train_data_schema, train_rows,
                     train_data_total_byte_size)

    val_rows = 0
    if store.exists(validation_data_path):
        val_data = store.get_parquet_dataset(validation_data_path)
        val_data_schema = val_data.schema.to_arrow_schema()
        val_rows, val_data_total_byte_size = _get_dataset_info(val_data, 'validation',
                                                               validation_data_path)
        # Write validation metadata to filesystem
        _save_meta_to_fs(fs, val_data_meta_path, val_data_schema, val_rows,
                         val_data_total_byte_size)

    metadata, avg_row_size = make_metadata_dictionary(train_data_schema)
    return train_rows, val_rows, metadata, avg_row_size
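
A minimal usage sketch, not part of the source file: it assumes a Store whose train/validation paths already contain Parquet data written by horovod.spark's data preparation, and the store path and column names below are hypothetical placeholders.

# Sketch only: '/tmp/horovod_example' and the column names are illustrative.
from horovod.spark.common.store import Store
from horovod.spark.common.util import get_simple_meta_from_parquet

store = Store.create('/tmp/horovod_example')

train_rows, val_rows, metadata, avg_row_size = get_simple_meta_from_parquet(
    store,
    label_columns=['label'],
    feature_columns=['features'],
    sample_weight_col=None)

# metadata maps each schema column to its Spark data type, intermediate format,
# and SparseVector shape/max_size placeholders; avg_row_size is bytes per training row.
print(train_rows, val_rows, avg_row_size)
print(metadata['features']['spark_data_type'])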