public void convert()

in src/main/java/com/aliyun/odps/kafka/connect/converter/CsvRecordConverter.java [77:104]


  /**
   * Populates {@code out} from a Kafka sink record whose key or value carries a CSV string.
   *
   * <p>The record's topic, partition and offset are written to the {@code TOPIC},
   * {@code PARTITION} and {@code OFFSET} columns first. Depending on the configured
   * {@code mode}, either the key or the value is then cast to {@code String}, split into
   * fields by {@code load}, and each field is written to the column index given by
   * {@code userColIndex}.
   *
   * @param in  the sink record to read from
   * @param out the destination record; it is cast to {@code ArrayRecord} when fields are written
   * @throws IOException      if writing any parsed field into {@code out} fails
   * @throws RuntimeException if {@code mode} is neither KEY nor VALUE, or if the number of
   *                          parsed fields differs from {@code out.getColumnCount() - 3}
   */
  public void convert(SinkRecord in, Record out) throws IOException {
    // Record coordinates are stored before the payload is looked at.
    out.setString(TOPIC, in.topic());
    out.setBigint(PARTITION, in.kafkaPartition().longValue());
    out.setBigint(OFFSET, in.kafkaOffset());

    // Reject any mode other than KEY / VALUE up front, then pick the matching payload.
    final boolean readFromKey = RecordConverterBuilder.Mode.KEY.equals(mode);
    if (!readFromKey && !RecordConverterBuilder.Mode.VALUE.equals(mode)) {
      throw new RuntimeException("Unsupported mode for CsvConverter: " + mode);
    }
    final String payload = (String) (readFromKey ? in.key() : in.value());

    final String[] fields = load(payload);
    // The offset of 3 presumably accounts for the three metadata columns set above.
    final int expectedFieldCount = out.getColumnCount() - 3;
    if (expectedFieldCount != fields.length) {
      throw new RuntimeException("Column count doesn't match: " + payload);
    }

    for (int col = 0; col < fields.length; col++) {
      try {
        // The record can be treated as an array record; see TableTunnel.UploadSession.newRecord().
        final ArrayRecord target = (ArrayRecord) out;
        ConverterHelper.setRecordByType(target, userColIndex.get(col), fields[col]);
      } catch (Exception e) {
        throw new IOException("Parse Error while trans value", e);
      }
    }
  }