in tfx_bsl/coders/csv_decoder.py [0:0]
def process(self, batch_of_tuple: List[Tuple[List[CSVCell], CSVLine]],
            column_infos: List[ColumnInfo]) -> Iterable[pa.RecordBatch]:
  """Converts a batch of parsed CSV rows into an Arrow RecordBatch.

  Args:
    batch_of_tuple: a list of (parsed_cells, raw_line) pairs, where
      parsed_cells is the list of CSV cells for one record (empty for a
      blank line) and raw_line is the unparsed record.
    column_infos: per-column info used to (lazily) build the cell handlers
      on the first call.

  Yields:
    A single `pa.RecordBatch` containing one column per handled CSV column,
    plus, if `self._raw_record_column_name` is set, a trailing column of
    the raw records.

  Raises:
    ValueError: if a non-blank row does not have exactly one cell per
      configured column handler.
  """
  # Handlers are built lazily from the first batch's column infos.
  if self._column_handlers is None:
    self._process_column_infos(column_infos)
  raw_records = []
  values_list_by_column = [[] for _ in self._column_names]
  for (csv_row, raw_record) in batch_of_tuple:
    if not csv_row:
      if not self._skip_blank_lines:
        for l in values_list_by_column:
          l.append(None)
        # Keep the raw-record column aligned with the value columns:
        # a kept blank row must also contribute a raw-record entry.
        if self._raw_record_column_name is not None:
          raw_records.append([raw_record])
      continue
    if len(csv_row) != len(self._column_handlers):
      raise ValueError(
          "Encountered a row of unexpected number of columns: {} vs. {}"
          .format(len(csv_row), len(self._column_handlers)))
    # A None handler marks a column that is excluded from the output;
    # column_idx only advances over included columns.
    column_idx = 0
    for csv_cell, handler in zip(csv_row, self._column_handlers):
      if handler is None:
        continue
      values_list_by_column[column_idx].append(
          handler(csv_cell) if csv_cell else None)
      column_idx += 1
    if self._raw_record_column_name is not None:
      raw_records.append([raw_record])
  arrow_arrays = [
      pa.array(l, type=t)
      for l, t in zip(values_list_by_column, self._column_arrow_types)
  ]
  # Use a local name list; mutating self._column_names here would append
  # the raw-record column name again on every process() call.
  column_names = self._column_names
  if self._raw_record_column_name is not None:
    arrow_arrays.append(
        pa.array(raw_records, type=self._raw_record_column_type))
    column_names = list(self._column_names) + [self._raw_record_column_name]
  yield pa.RecordBatch.from_arrays(arrow_arrays, column_names)