in petastorm/py_dict_reader_worker.py [0:0]
def read_next(self, workers_pool, schema, ngram):
    """Return the next decoded item, refilling the internal buffer from ``workers_pool`` when needed.

    Decoded rows arrive from the workers in chunks. A chunk is converted into namedtuples, stored
    (reversed) in ``self._result_buffer`` and then handed out one item per call.

    :param workers_pool: Object whose ``get_results()`` returns the next chunk of rows. An
      ``EmptyResultError`` raised by it is translated into ``StopIteration``.
    :param schema: Object providing ``make_namedtuple(**row)``. Used directly when ``ngram`` is falsy,
      and passed to ``ngram.get_schema_at_timestep`` otherwise.
    :param ngram: Falsy for plain rows. Otherwise an object providing
      ``get_schema_at_timestep(schema, timestamp)``; each chunk item is then a mapping of
      timestamp -> row, and its values are replaced in place by namedtuples.
    :return: ``schema.make_namedtuple(**row)`` for a plain row, or the timestamp -> namedtuple mapping
      for an ngram item.
    :raises StopIteration: when ``workers_pool.get_results()`` raises ``EmptyResultError``.
    """
    try:
        # We are receiving decoded rows from the worker in chunks. We store the list internally
        # and return a single item upon each consequent call to __next__
        with self._result_buffer_lock:
            # Loop (rather than test once) so that an empty chunk is skipped instead of reaching
            # pop() below and surfacing as an IndexError.
            while not self._result_buffer:
                # Reverse order, so we can pop from the end of the list in O(1) while maintaining
                # order the items are returned from the worker
                list_of_rows = list(reversed(workers_pool.get_results()))

                if ngram:
                    for ngram_row in list_of_rows:
                        # Only values of existing keys are reassigned, so the mapping is not resized
                        # while it is being iterated.
                        for timestamp, row in ngram_row.items():
                            schema_at_timestamp = ngram.get_schema_at_timestep(schema, timestamp)
                            ngram_row[timestamp] = schema_at_timestamp.make_namedtuple(**row)
                    self._result_buffer = list_of_rows
                else:
                    self._result_buffer = [schema.make_namedtuple(**row) for row in list_of_rows]

            return self._result_buffer.pop()

    except EmptyResultError:
        raise StopIteration