in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java [398:424]
/**
 * Checks whether an error has been recorded in {@code thrownException} and, if so, either
 * propagates or logs it depending on {@code failOnError}.
 *
 * <p>When the recorded error is a {@code UserRecordFailedException}, every non-null error
 * message of its attempts is appended (each followed by a newline) to the reported message.
 *
 * <p>If {@code failOnError} is set, a {@link RuntimeException} wrapping the recorded error is
 * thrown and the recorded error is left in place, so a subsequent call throws again. Otherwise
 * the error is logged at WARN level and the recorded error is cleared.
 *
 * @throws Exception declared by the signature; the only exception raised directly by this
 *     method is the {@link RuntimeException} described above
 */
private void checkAndPropagateAsyncError() throws Exception {
    // Read the field once so the null check, the instanceof test, the cast and the
    // throw/log all operate on the same object.
    // NOTE(review): presumably the field is assigned from an asynchronous callback (per the
    // method name) -- confirm against the field declaration and its writers.
    final Throwable failure = thrownException;
    if (failure == null) {
        return;
    }

    // Collect per-attempt error details; stays empty for any other kind of failure.
    final StringBuilder collected = new StringBuilder();
    if (failure instanceof UserRecordFailedException) {
        List<Attempt> attempts =
                ((UserRecordFailedException) failure).getResult().getAttempts();
        for (Attempt attempt : attempts) {
            String message = attempt.getErrorMessage();
            if (message != null) {
                collected.append(message).append('\n');
            }
        }
    }
    final String errorMessages = collected.toString();

    if (failOnError) {
        // The recorded error is intentionally not cleared on this path (same as before).
        throw new RuntimeException(
                "An exception was thrown while processing a record: " + errorMessages,
                failure);
    }

    // Trailing throwable argument is passed in addition to the single "{}" placeholder value.
    LOG.warn(
            "An exception was thrown while processing a record: {}.",
            errorMessages,
            failure);
    // reset, prevent double throwing
    thrownException = null;
}