in src/main/java/com/aliyun/odps/kafka/connect/converter/FlattenRecordConverter.java [45:78]
/**
 * Writes the Kafka coordinates of {@code in} (topic, partition, offset) into {@code out}, then
 * fills further columns of {@code out} from the flattened key or value of the record,
 * depending on {@code mode}.
 *
 * <p>For every flattened field the column is first validated with {@code checkColumnExist} and
 * then resolved through {@code lookID} using the lower-cased field name. A field whose value is
 * a {@link Map} is serialized with {@code JsonHandler.getJsonString}; any other value is passed
 * on as its {@code toString()} form. A field whose value is {@code null} is skipped, i.e. it is
 * treated like a field that is absent from the payload and the column is left untouched.
 *
 * <p>If the selected key/value is {@code null}, only the three coordinate columns are written.
 *
 * @param in  the sink record to read from
 * @param out the target record; it is cast to {@code ArrayRecord} when fields are written
 * @throws JsonProcessingException declared by this method; presumably raised by the JSON
 *                                 helpers it calls (not verifiable from this method alone)
 * @throws RuntimeException        if {@code mode} is neither {@code KEY} nor {@code VALUE}, or
 *                                 if writing a field fails with a {@code ParseException}
 */
public void convert(SinkRecord in, Record out) throws JsonProcessingException {
    out.setString(TOPIC, in.topic());
    out.setBigint(PARTITION, in.kafkaPartition().longValue());
    out.setBigint(OFFSET, in.kafkaOffset());

    // Stays empty when the selected side of the record is null.
    Map<String, Object> flattenRecord = new HashMap<>();
    switch (mode) {
        case KEY:
            if (in.key() != null) {
                flattenRecord = flattenFieldFromJson(in.key());
            }
            break;
        case VALUE:
            if (in.value() != null) {
                flattenRecord = flattenFieldFromJson(in.value());
            }
            break;
        case DEFAULT:
        default:
            throw new RuntimeException("Unsupported mode for FlattenConverter:" + mode);
    }

    for (Map.Entry<String, Object> field : flattenRecord.entrySet()) {
        String key = field.getKey();
        Object rawValue = field.getValue();

        // Validate the column before anything else, exactly as before, so a null-valued
        // field still goes through the column-existence check.
        checkColumnExist(key);

        // A null value has no string form; calling toString() on it used to throw a
        // NullPointerException. Skip it so the column is simply not written.
        if (rawValue == null) {
            continue;
        }

        try {
            String values = rawValue instanceof Map
                ? JsonHandler.getJsonString(rawValue)
                : rawValue.toString();
            ConverterHelper.setRecordByType((ArrayRecord) out, lookID.get(key.toLowerCase()),
                                            values);
        } catch (ParseException e) {
            // Keep the original cause, but say which field could not be converted.
            throw new RuntimeException("Failed to convert value of field '" + key + "'", e);
        }
    }
}