in src/main/java/org/apache/flink/connector/rocketmq/legacy/common/serialization/RowKeyValueDeserializationSchema.java [197:232]
/**
 * Applies the configured {@code formatErrorStrategy} to a field that raised an exception
 * while being processed.
 *
 * <ul>
 *   <li>{@code SKIP}: logs a warning (always when {@code columnErrorDebug} is set, otherwise
 *       only when more than {@code DEFAULT_LOG_INTERVAL_MS} has elapsed since the last logged
 *       error) and returns {@code true}.
 *   <li>{@code SKIP_SILENT}: returns {@code true} without logging.
 *   <li>{@code EXCEPTION}: rethrows the failure wrapped in a {@link RuntimeException}.
 *   <li>{@code CUT}, {@code NULL}, {@code PAD} and any other value: sets the field at
 *       {@code index} to {@code null} and returns {@code false}.
 * </ul>
 *
 * @param row the row being populated; only modified by the null-filling strategies
 * @param index position of the failing field in both {@code data} and {@code fieldTypes}
 * @param data the raw field values, included in the warning message for diagnosis
 * @param e the exception raised for the field
 * @return {@code true} for the {@code SKIP} and {@code SKIP_SILENT} strategies,
 *     {@code false} otherwise
 * @throws RuntimeException wrapping {@code e} when the strategy is {@code EXCEPTION}
 */
private boolean handleException(GenericRowData row, int index, Object[] data, Exception e) {
    boolean skip = false;
    switch (formatErrorStrategy) {
        case SKIP:
            long now = System.currentTimeMillis();
            // Rate-limit the warning unless per-column debugging was explicitly requested.
            if (columnErrorDebug || now - lastLogExceptionTime > DEFAULT_LOG_INTERVAL_MS) {
                LOGGER.warn(
                        "Data format error, field type: "
                                + fieldTypes[index]
                                + ", field data: "
                                + data[index]
                                + ", index: "
                                + index
                                + ", data: ["
                                + StringUtils.join(data, ",")
                                + "]",
                        e);
                lastLogExceptionTime = now;
            }
            skip = true;
            break;
        case SKIP_SILENT:
            skip = true;
            break;
        case EXCEPTION:
            // Keep the original exception as the cause so the stack trace is preserved.
            throw new RuntimeException(e);
        case CUT:
        case NULL:
        case PAD:
        default:
            // These strategies all degrade to writing null for the failing field.
            row.setField(index, null);
            break;
    }
    return skip;
}