in src/main/java/com/aliyun/odps/kafka/connect/converter/JsonRecordConverter.java [26:45]
public void convert(SinkRecord in, Record out) throws IOException {
out.setString(TOPIC, in.topic());
out.setBigint(PARTITION, in.kafkaPartition().longValue());
out.setBigint(OFFSET, in.kafkaOffset());
switch (mode) {
case KEY:
if (in.key() != null) {
out.set(KEY, convertToJson(in.key()));
}
break;
case VALUE:
if (in.value() != null) {
out.set(VALUE, convertToJson(in.value()));
}
break;
case DEFAULT:
default:
throw new RuntimeException("Unsupported mode for jsonConverter:" + mode);
}
}