in src/main/java/software/amazon/event/kafkaconnector/mapping/JsonPathDetailTypeMapper.java [41:108]
public String getDetailType(SinkRecord record) {
if (record == null) {
throw new IllegalArgumentException("SinkRecord is null. Unable to extract detail type.");
}
if (record.topic() == null || record.topic().trim().isEmpty()) {
throw new IllegalArgumentException(
"SinkRecord topic is null or empty but is required for fallback logic.");
}
var topic = record.topic();
try {
var jsonBytes = jsonConverter.fromConnectData(topic, record.valueSchema(), record.value());
// super defensive, because null here should never be the case
var jsonString = new String(jsonBytes).trim();
if (jsonBytes == null || jsonString.isEmpty()) {
log.error(
"Record value conversion to JSON bytes returned null or empty string for record '{}', using topic '{}' as"
+ " fallback",
record,
topic);
return topic;
}
var extractedValue = JsonPath.using(jsonPathConfiguration).parse(jsonString).read(jsonPath);
if (extractedValue == null) {
log.warn(
"Parsed JSON value is null for JSON path '{}' and record '{}', using topic '{}' as fallback",
jsonPath,
record,
topic);
return topic;
}
if (!(extractedValue instanceof String)) {
log.warn(
"Parsed JSON value is not of type String for for JSON path '{}' and record '{}', using topic '{}' as fallback",
jsonPath,
record,
topic);
return topic;
}
if (((String) extractedValue).trim().isEmpty()) {
log.warn(
"Parsed JSON value is empty String for JSON path '{}' and record '{}', using topic '{}' as fallback",
jsonPath,
record,
topic);
return topic;
}
log.trace(
"Successfully extracted detail type '{}' for JSON path '{}' and record '{}'",
extractedValue,
jsonPath,
record);
return (String) extractedValue;
} catch (Exception e) {
log.error(
"Could not extract JSON value for JSON path '{}' and record '{}', using topic '{}' as fallback",
jsonPath,
record,
topic);
return topic;
}
}