def process()

in tfx_bsl/coders/csv_decoder.py


  # Module-level imports assumed by this excerpt:
  #   from typing import Iterable, List, Tuple
  #   import pyarrow as pa
  # CSVCell, CSVLine, and ColumnInfo are defined earlier in csv_decoder.py.
  def process(self, batch_of_tuple: List[Tuple[List[CSVCell], CSVLine]],
              column_infos: List[ColumnInfo]) -> Iterable[pa.RecordBatch]:
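    """Yields a single pa.RecordBatch for a batch of parsed CSV rows."""
    # Handlers and output column metadata are built lazily from the
    # column_infos that arrive with the first batch.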
    if self._column_handlers is None:
      self._process_column_infos(column_infos)

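    # Per-batch accumulators: one Python list of values per output column,
    # plus an optional list for the raw-record column.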
    raw_records = []
    values_list_by_column = [[] for _ in self._column_names]
    for (csv_row, raw_record) in batch_of_tuple:
      if not csv_row:
        # A blank line becomes a row of nulls unless blank lines are skipped.
        if not self._skip_blank_lines:
          for l in values_list_by_column:
            l.append(None)
          # Keep the raw-record column the same length as the value columns;
          # otherwise RecordBatch.from_arrays would see unequal lengths.
          if self._raw_record_column_name is not None:
            raw_records.append([raw_record])
        continue
      if len(csv_row) != len(self._column_handlers):
        raise ValueError(
            "Encountered a row with an unexpected number of columns: {} vs. {}"
            .format(len(csv_row), len(self._column_handlers)))
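      # column_idx counts output columns only: a None handler marks a CSV
      # column that is dropped from the output, so it does not advance the
      # index. Empty cells become None (null in Arrow).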
      column_idx = 0
      for csv_cell, handler in zip(csv_row, self._column_handlers):
        if handler is None:
          continue
        values_list_by_column[column_idx].append(
            handler(csv_cell) if csv_cell else None)
        column_idx += 1
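      # Wrap each raw record in a single-element list to match the
      # list-valued raw-record column type.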
      if self._raw_record_column_name is not None:
        raw_records.append([raw_record])

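    # One typed Arrow array per retained column.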
    arrow_arrays = [
        pa.array(l, type=t)
        for l, t in zip(values_list_by_column, self._column_arrow_types)
    ]

    # Build the final name list locally; mutating self._column_names here
    # would grow it by one extra entry on every process() call.
    column_names = list(self._column_names)
    if self._raw_record_column_name is not None:
      arrow_arrays.append(
          pa.array(raw_records, type=self._raw_record_column_type))
      column_names.append(self._raw_record_column_name)
    yield pa.RecordBatch.from_arrays(arrow_arrays, column_names)
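
For reference, a minimal standalone sketch of the Arrow pattern the method ends
with: per-column Python lists (with None for missing values) become typed
pa.array objects, which are zipped with the column names into a single
pa.RecordBatch. The column names, types, and sample values below are
hypothetical illustrations, not taken from csv_decoder.py.

import pyarrow as pa

# Hypothetical accumulated values for two columns; None becomes a null.
values_list_by_column = [[1, None, 3], [b"a", b"b", None]]
column_arrow_types = [pa.int64(), pa.binary()]
column_names = ["f1", "f2"]

arrow_arrays = [
    pa.array(l, type=t)
    for l, t in zip(values_list_by_column, column_arrow_types)
]

# The optional raw-record column is list-valued: one single-element list per
# row (the list-of-binary type here is an assumption for illustration).
raw_records = [[b"1,a"], [b",b"], [b"3,"]]
arrow_arrays.append(
    pa.array(raw_records, type=pa.large_list(pa.large_binary())))
column_names = column_names + ["_raw_record"]

record_batch = pa.RecordBatch.from_arrays(arrow_arrays, column_names)
print(record_batch.num_rows)  # 3
print(record_batch.schema)    # f1: int64, f2: binary, _raw_record: large_list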