in src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java [214:238]
/**
 * Decides whether a record must be dropped because it carries a null value.
 *
 * <p>Records with a non-null value are never skipped. For a null-valued record the outcome
 * depends on the configured behavior for null values: {@code FAIL} aborts with an exception,
 * while {@code IGNORE} (and any other setting) logs the record coordinates at debug level and
 * reports the record as skippable.
 *
 * @param record the sink record to inspect
 * @return {@code true} if the record has a null value and should be skipped; {@code false} if
 *     the record has a non-null value
 * @throws DataException if the record value is null and the configured behavior is {@code FAIL}
 */
public boolean shouldSkipRecord(SinkRecord record) {
    // Fast path: anything carrying a payload is processed normally.
    if (record.value() != null) {
        return false;
    }

    switch (dorisOptions.getBehaviorOnNullValues()) {
        case FAIL:
            throw new DataException(
                    String.format(
                            "Null valued record from topic %s, partition %s and offset %s was failed "
                                    + "(the configuration property '%s' is '%s').",
                            record.topic(),
                            record.kafkaPartition(),
                            record.kafkaOffset(),
                            DorisSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES,
                            BehaviorOnNullValues.FAIL));
        case IGNORE:
        default:
            // Every non-FAIL setting shares the log-and-skip path below.
            break;
    }

    LOG.debug(
            "Null valued record from topic '{}', partition {} and offset {} was skipped",
            record.topic(),
            record.kafkaPartition(),
            record.kafkaOffset());
    return true;
}