in src/main/java/com/aliyun/odps/kafka/connect/converter/JsonRecordConverter.java [48:67]
public static SimpleJsonValue convertToJson(Object jsonObject) throws JSONException, IOException {
// convert sinkRecord object to odps JsonSimple
if (jsonObject instanceof String) {
return new SimpleJsonValue((String) jsonObject);
}
// 提取payload 字段
if (jsonObject instanceof HashMap) {
try {
ObjectMapper objectMapper = new ObjectMapper();
return new SimpleJsonValue(objectMapper.writeValueAsString(jsonObject));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
if (jsonObject instanceof Struct) {
Struct newJsonObject = (Struct) jsonObject;
return new SimpleJsonValue(extractPayLoad(newJsonObject.schema(), newJsonObject, false));
}
throw new RuntimeException("no recognize type of jsonObject");
}