in src/main/java/com/aliyun/odps/kafka/connect/converter/CsvRecordConverter.java [77:104]
public void convert(SinkRecord in, Record out) throws IOException {
out.setString(TOPIC, in.topic());
out.setBigint(PARTITION, in.kafkaPartition().longValue());
out.setBigint(OFFSET, in.kafkaOffset());
String data;
if (RecordConverterBuilder.Mode.KEY.equals(mode)) {
data = (String) in.key();
} else if (RecordConverterBuilder.Mode.VALUE.equals(mode)) {
data = (String) in.value();
} else {
throw new RuntimeException("Unsupported mode for CsvConverter: " + mode);
}
String[] row = load(data);
if (out.getColumnCount() - 3 != row.length) {
throw new RuntimeException("Column count doesn't match: " + data);
}
for (int i = 0; i < row.length; ++i) {
try {
// Can be cast to an array record. See TableTunnel.UploadSession.newRecord().
ConverterHelper.setRecordByType((ArrayRecord) out, userColIndex.get(i), row[i]);
} catch (Exception e) {
throw new IOException("Parse Error while trans value", e);
}
}
}