in petastorm/ngram.py [0:0]
def form_ngram(self, data, schema):
    """
    Return all the ngrams as dictated by fields, delta_threshold and timestamp_field.

    A window of ``self.length`` consecutive items slides over ``data`` one item at a time. A window is emitted
    as an ngram when ``self._ngram_pass_threshold`` accepts it and, if ``self.timestamp_overlap`` is false,
    its first timestamp is strictly greater than the last timestamp of the previously emitted ngram.

    :param data: The data items, which is a list of Unischema items. Each item is indexed and iterated by
      field name here, and the items are expected to be sorted (ascending) by the timestamp field.
    :param schema: Not used by this method.
    :return: A list of ngrams. Each ngram is a dictionary whose keys are ``length`` consecutive timesteps
      starting at the smallest key of ``self._fields``. The value of each key is a dictionary with the fields
      of the item at that position, restricted to the names returned by
      ``self.get_field_names_at_timestep`` for that timestep.
    :raises NotImplementedError: If the items inside a window are not sorted by the timestamp field.
    """
    base_key = min(self._fields.keys())
    result = []
    prev_ngram_end_timestamp = None

    for index in range(len(data) - self.length + 1):
        timestamp_name = self._timestamp_field.name

        # Potential ngram: [index, index + self.length[
        # The range bound above guarantees that the slice always holds exactly self.length items.
        potential_ngram = data[index:index + self.length]

        is_sorted = all(earlier[timestamp_name] <= later[timestamp_name]
                        for earlier, later in zip(potential_ngram, potential_ngram[1:]))
        if not is_sorted:
            raise NotImplementedError('NGram assumes that the data is sorted by {0} field which is not the case'
                                      .format(timestamp_name))

        if not self.timestamp_overlap and prev_ngram_end_timestamp is not None:
            # If we don't want timestamps of NGrams to overlap, skip every window whose start timestamp is
            # not strictly greater than the end timestamp of the previously emitted NGram.
            if potential_ngram[0][timestamp_name] <= prev_ngram_end_timestamp:
                continue

        # Only keep the window if it passes the threshold check done by self._ngram_pass_threshold.
        if not self._ngram_pass_threshold(potential_ngram):
            continue

        new_item = {}
        for offset, current_item in enumerate(potential_ngram):
            timestep = base_key + offset
            # Look the requested field names up once per timestep instead of once per field of the item.
            wanted_fields = self.get_field_names_at_timestep(timestep)
            # Keep only the requested fields of this timestep, as a plain dictionary.
            new_item[timestep] = {k: current_item[k] for k in current_item if k in wanted_fields}
        result.append(new_item)

        if not self.timestamp_overlap:
            # Remember where this ngram ends so that later, overlapping windows can be skipped.
            prev_ngram_end_timestamp = potential_ngram[-1][timestamp_name]

    return result