in tfx_bsl/coders/csv_decoder.py [0:0]
def add_input(self, accumulator: Dict[ColumnName, ColumnType],
cells: List[CSVCell]) -> Dict[ColumnName, ColumnType]:
"""Updates the feature types in the accumulator using the input row.
Args:
accumulator: A dict containing the already inferred feature types.
cells: A list containing feature values of a CSV record.
Returns:
A dict containing the updated feature types based on input row.
Raises:
ValueError: If the columns do not match the specified csv headers.
"""
# If the row is empty and we are not skipping blank lines,
# add an empty bytes cell for each column (CSV cells are bytes).
if not cells and not self._skip_blank_lines:
cells = [b"" for _ in range(len(self._column_names))]
elif cells and len(cells) != len(self._column_names):
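# e.g. a three-cell row against two declared column names ends up here.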
raise ValueError("Columns do not match specified csv headers: %s -> %s" %
(self._column_names, cells))
# Iterate over each feature value and update the type.
for column_name, cell in zip(self._column_names, cells):
# Get the already inferred type of the feature.
previous_type = accumulator.get(column_name, None)
if column_name in self._multivalent_columns:
# The multivalent reader only accepts str, but the cell is bytes, so
# decode it first.
values = self._multivalent_reader.ReadLine(cell.decode())
current_type = (max(_InferValueType(value) for value in values)
if values else ColumnType.UNKNOWN)
else:
current_type = _InferValueType(cell)
# If the type inferred from the current value is higher in the type
# hierarchy compared to the already inferred type, we update the type.
# The type hierarchy is:
#   INT (level 0) --> FLOAT (level 1) --> STRING (level 2)
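# For example, a column whose cells across rows are b"1", b"2.5", b"abc"
# is promoted INT -> FLOAT -> STRING as those rows are accumulated.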
if previous_type is None or current_type > previous_type:
accumulator[column_name] = current_type
return accumulator
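
# For reference, below is a minimal, self-contained sketch of the same
# type-promotion scheme outside the decoder class. ColumnType,
# infer_value_type, add_row, and the sample rows are simplified stand-ins
# invented for this illustration; the real tfx_bsl ColumnType values and
# _InferValueType rules differ, and multivalent columns are omitted.
import enum
from typing import Dict, List


class ColumnType(enum.IntEnum):
  UNKNOWN = -1
  INT = 0
  FLOAT = 1
  STRING = 2


def infer_value_type(cell: bytes) -> ColumnType:
  # Simplified single-cell inference: try int, then float, else string.
  if not cell:
    return ColumnType.UNKNOWN
  try:
    int(cell)
    return ColumnType.INT
  except ValueError:
    pass
  try:
    float(cell)
    return ColumnType.FLOAT
  except ValueError:
    return ColumnType.STRING


def add_row(accumulator: Dict[str, ColumnType], column_names: List[str],
            cells: List[bytes]) -> Dict[str, ColumnType]:
  # Promote each column's type to the max of its old type and the type
  # inferred from the newly observed cell, mirroring add_input above.
  for column_name, cell in zip(column_names, cells):
    current_type = infer_value_type(cell)
    previous_type = accumulator.get(column_name)
    if previous_type is None or current_type > previous_type:
      accumulator[column_name] = current_type
  return accumulator


if __name__ == "__main__":
  columns = ["id", "score", "label"]
  rows = [[b"1", b"3", b"cat"], [b"2", b"3.5", b"dog"]]
  types: Dict[str, ColumnType] = {}
  for row in rows:
    types = add_row(types, columns, row)
  # score is promoted INT -> FLOAT on the second row:
  # id -> INT, score -> FLOAT, label -> STRING
  print(types)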