in iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/PipeReceiverStatusHandler.java [84:184]
/**
 * Reacts to a status returned by the receiver, dispatching on {@code status.getCode()}.
 *
 * <ul>
 *   <li>{@code 200} (SUCCESS_STATUS) and {@code 400} (REDIRECTION_RECOMMEND): returns silently.
 *   <li>{@code 1809} (PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION): logs at INFO and returns.
 *   <li>{@code 1808} (PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION): logs at INFO and always
 *       throws, so the caller keeps retrying.
 *   <li>{@code 1810} (PIPE_RECEIVER_USER_CONFLICT_EXCEPTION): if retry on conflict is disabled,
 *       logs at WARN and returns; otherwise applies the retry-or-ignore policy with the
 *       conflict-specific retry budget.
 *   <li>Any other code: applies the retry-or-ignore policy with the budget configured for other
 *       exceptions.
 * </ul>
 *
 * @param status the status returned by the receiver
 * @param exceptionMessage the message carried by the {@link PipeException} when one is thrown
 * @param recordMessage a description of the event; written to the log when the event is ignored
 *     and the matching "record ignored data" flag is set
 * @throws PipeException when the status calls for the operation to be retried
 */
public void handle(
    final TSStatus status, final String exceptionMessage, final String recordMessage) {
  switch (status.getCode()) {
    case 200: // SUCCESS_STATUS
    case 400: // REDIRECTION_RECOMMEND
      {
        return;
      }
    case 1809: // PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION
      {
        LOGGER.info("Idempotent conflict exception: will be ignored. status: {}", status);
        return;
      }
    case 1808: // PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION
      {
        LOGGER.info("Temporary unavailable exception: will retry forever. status: {}", status);
        throw new PipeException(exceptionMessage);
      }
    case 1810: // PIPE_RECEIVER_USER_CONFLICT_EXCEPTION
      {
        if (!isRetryAllowedWhenConflictOccurs) {
          LOGGER.warn(
              "User conflict exception: will be ignored because retry is not allowed. event: {}. status: {}",
              shouldRecordIgnoredDataWhenConflictOccurs ? recordMessage : "not recorded",
              status);
          return;
        }
        retryOrIgnoreOnTimeout(
            "User conflict",
            status,
            exceptionMessage,
            recordMessage,
            retryMaxMillisWhenConflictOccurs,
            shouldRecordIgnoredDataWhenConflictOccurs);
        // The helper returns normally when the event is ignored; do not fall into default.
        return;
      }
    default: // Other exceptions
      {
        retryOrIgnoreOnTimeout(
            "Unclassified",
            status,
            exceptionMessage,
            recordMessage,
            retryMaxMillisWhenOtherExceptionsOccur,
            shouldRecordIgnoredDataWhenOtherExceptionsOccur);
      }
  }
}

/**
 * Shared retry-or-ignore policy for the user-conflict and unclassified status branches of
 * {@link #handle(TSStatus, String, String)}.
 *
 * <p>While holding this object's monitor it:
 *
 * <ol>
 *   <li>calls {@code recordExceptionStatusIfNecessary(recordMessage)};
 *   <li>if the event has already been retried and more than {@code retryMaxMillis} have elapsed
 *       since {@code exceptionFirstEncounteredTime}, logs at WARN, calls {@code
 *       resetExceptionStatus()} and returns normally (the event is ignored);
 *   <li>otherwise logs at WARN, marks the event as retried and throws a {@link PipeException}.
 * </ol>
 *
 * @param exceptionKind label placed at the start of the log messages, e.g. {@code "User
 *     conflict"}
 * @param status the status returned by the receiver; only logged here
 * @param exceptionMessage the message of the thrown {@link PipeException}
 * @param recordMessage a description of the event
 * @param retryMaxMillis the retry budget in milliseconds; {@code Long.MAX_VALUE} is reported as
 *     "forever" in the log
 * @param shouldRecordIgnoredData whether {@code recordMessage} is written to the log when the
 *     event is ignored after the budget is exhausted
 * @throws PipeException when the retry budget has not been exhausted yet
 */
private void retryOrIgnoreOnTimeout(
    final String exceptionKind,
    final TSStatus status,
    final String exceptionMessage,
    final String recordMessage,
    final long retryMaxMillis,
    final boolean shouldRecordIgnoredData) {
  synchronized (this) {
    // Presumably captures the first-encounter timestamp for a new event; verify in the helper.
    recordExceptionStatusIfNecessary(recordMessage);

    // The first encounter is never ignored: the timeout only applies once a retry has happened.
    if (exceptionEventHasBeenRetried.get()
        && System.currentTimeMillis() - exceptionFirstEncounteredTime.get() > retryMaxMillis) {
      LOGGER.warn(
          "{} exception: retry timeout. will be ignored. event: {}. status: {}",
          exceptionKind,
          shouldRecordIgnoredData ? recordMessage : "not recorded",
          status);
      resetExceptionStatus();
      return;
    }

    LOGGER.warn(
        "{} exception: will retry {}. status: {}",
        exceptionKind,
        retryMaxMillis == Long.MAX_VALUE
            ? "forever"
            : "for at least "
                + (retryMaxMillis
                        + exceptionFirstEncounteredTime.get()
                        - System.currentTimeMillis())
                    / 1000.0
                + " seconds",
        status);
    exceptionEventHasBeenRetried.set(true);

    // NOTE(review): the second constructor argument is derived from a millisecond budget,
    // clamped to [5, CONFLICT_RETRY_MAX_TIMES]; it looks like a retry count - confirm against
    // PipeException.
    throw new PipeException(
        exceptionMessage,
        (int) Math.max(5, Math.min(CONFLICT_RETRY_MAX_TIMES, retryMaxMillis * 1.1)));
  }
}