in petastorm/unischema.py [0:0]
# `copy` and `numpy` are imported at the top of unischema.py; `Unischema` and
# `insert_explicit_nulls` are defined in the same module.
import copy

import numpy as np


def dict_to_spark_row(unischema, row_dict):
"""Converts a single row into a spark Row object.
Verifies that the data confirms with unischema definition types and encodes the data using the codec specified
by the unischema.
The parameters are keywords to allow use of functools.partial.
:param unischema: an instance of Unischema object
:param row_dict: a dictionary where the keys match name of fields in the unischema.
:return: a single pyspark.Row object
"""
    # Lazy loading pyspark to avoid creating a pyspark dependency on the data reading code path
    # (currently works only with make_batch_reader)
    import pyspark

    assert isinstance(unischema, Unischema)

    # Add null fields. Be careful not to mutate the input dictionary - that would be an unexpected side effect
    copy_row_dict = copy.copy(row_dict)
    insert_explicit_nulls(unischema, copy_row_dict)
    if set(copy_row_dict.keys()) != set(unischema.fields.keys()):
        raise ValueError('Dictionary fields \n{}\n do not match schema fields \n{}'.format(
            '\n'.join(sorted(copy_row_dict.keys())), '\n'.join(sorted(unischema.fields.keys()))))
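    # Encoding pass: every value is encoded with its field's codec if one is
    # configured; otherwise it passes through unchanged, except that numpy
    # scalars are converted to native Python types so pyspark can serialize them.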
    encoded_dict = {}
    for field_name, value in copy_row_dict.items():
        schema_field = unischema.fields[field_name]
        if value is None and not schema_field.nullable:
            raise ValueError('Field {} is not "nullable", but got passed a None value'.format(field_name))
        if schema_field.codec:
            encoded_dict[field_name] = schema_field.codec.encode(schema_field, value) if value is not None else None
        elif isinstance(value, np.generic):
            # tolist() converts a numpy scalar (e.g. np.int64) into the closest native Python type
            encoded_dict[field_name] = value.tolist()
        else:
            encoded_dict[field_name] = value
    field_list = list(unischema.fields.keys())
    # Generate a value list that matches the schema column order
    value_list = [encoded_dict[name] for name in field_list]
    # Create the Row from positional values, then attach the field names. The
    # names are set afterwards because constructing pyspark.Row from keyword
    # arguments would not preserve schema order (older pyspark versions sort
    # keyword-created Row fields alphabetically).
    row = pyspark.Row(*value_list)
    row.__fields__ = field_list
    return row
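
A minimal usage sketch: because dict_to_spark_row can be bound with functools.partial, it is typically used as a map function over row dictionaries when writing a dataset with Spark. The schema, row generator, and `spark` session below are illustrative assumptions, not part of unischema.py:

from functools import partial

import numpy as np
from pyspark.sql.types import IntegerType, StringType

from petastorm.codecs import ScalarCodec
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row

# Hypothetical two-column schema: an int32 id and a string name
HelloWorldSchema = Unischema('HelloWorldSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('name', np.str_, (), ScalarCodec(StringType()), False),
])

def row_generator(i):
    # Produce one dictionary per row; keys must match the unischema field names
    return {'id': i, 'name': 'row_{}'.format(i)}

# `spark` is assumed to be an existing SparkSession
rows_rdd = spark.sparkContext.parallelize(range(10)) \
    .map(row_generator) \
    .map(partial(dict_to_spark_row, HelloWorldSchema))
df = spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema())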