in iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java [739:928]
private Optional<SubscriptionMessage> pollFileInternal(
final SubscriptionCommitContext commitContext,
final String rawFileName,
final File file,
final RandomAccessFile fileWriter,
final PollTimer timer)
throws IOException, SubscriptionException {
long writingOffset = fileWriter.length();
LOGGER.info(
"{} start to poll file {} with commit context {} at offset {}",
this,
file.getAbsolutePath(),
commitContext,
writingOffset);
fileWriter.seek(writingOffset);
while (true) {
timer.update();
if (timer.isExpired(TIMER_DELTA_MS)) {
// resume from breakpoint if timeout happened when polling files
inFlightFilesCommitContextSet.add(commitContext);
final String message =
String.format(
"Timeout occurred when SubscriptionConsumer %s polling file %s with commit context %s, record writing offset %s for subsequent poll",
this, file.getAbsolutePath(), commitContext, writingOffset);
LOGGER.info(message);
throw new SubscriptionRuntimeNonCriticalException(message);
}
final List<SubscriptionPollResponse> responses =
pollFileInternal(commitContext, writingOffset, timer.remainingMs());
// If responses is empty, it means that some outdated subscription events may be being polled,
// so just return.
if (responses.isEmpty()) {
return Optional.empty();
}
// only one SubscriptionEvent polled currently
final SubscriptionPollResponse response = responses.get(0);
final SubscriptionPollPayload payload = response.getPayload();
final short responseType = response.getResponseType();
if (!SubscriptionPollResponseType.isValidatedResponseType(responseType)) {
final String errorMessage = String.format("unexpected response type: %s", responseType);
LOGGER.warn(errorMessage);
throw new SubscriptionRuntimeNonCriticalException(errorMessage);
}
switch (SubscriptionPollResponseType.valueOf(responseType)) {
case FILE_PIECE:
{
// check commit context
final SubscriptionCommitContext incomingCommitContext = response.getCommitContext();
if (Objects.isNull(incomingCommitContext)
|| !Objects.equals(commitContext, incomingCommitContext)) {
final String errorMessage =
String.format(
"inconsistent commit context, current is %s, incoming is %s, consumer: %s",
commitContext, incomingCommitContext, this);
LOGGER.warn(errorMessage);
throw new SubscriptionRuntimeNonCriticalException(errorMessage);
}
// check file name
if (!Objects.equals(rawFileName, ((FilePiecePayload) payload).getFileName())) {
final String errorMessage =
String.format(
"inconsistent file name, current is %s, incoming is %s, consumer: %s",
rawFileName, ((FilePiecePayload) payload).getFileName(), this);
LOGGER.warn(errorMessage);
throw new SubscriptionRuntimeNonCriticalException(errorMessage);
}
// write file piece
fileWriter.write(((FilePiecePayload) payload).getFilePiece());
if (fileSaveFsync) {
fileWriter.getFD().sync();
}
// check offset
if (!Objects.equals(
fileWriter.length(), ((FilePiecePayload) payload).getNextWritingOffset())) {
final String errorMessage =
String.format(
"inconsistent file offset, current is %s, incoming is %s, consumer: %s",
fileWriter.length(),
((FilePiecePayload) payload).getNextWritingOffset(),
this);
LOGGER.warn(errorMessage);
throw new SubscriptionRuntimeNonCriticalException(errorMessage);
}
// update offset
writingOffset = ((FilePiecePayload) payload).getNextWritingOffset();
break;
}
case FILE_SEAL:
{
// check commit context
final SubscriptionCommitContext incomingCommitContext = response.getCommitContext();
if (Objects.isNull(incomingCommitContext)
|| !Objects.equals(commitContext, incomingCommitContext)) {
final String errorMessage =
String.format(
"inconsistent commit context, current is %s, incoming is %s, consumer: %s",
commitContext, incomingCommitContext, this);
LOGGER.warn(errorMessage);
throw new SubscriptionRuntimeNonCriticalException(errorMessage);
}
// check file name
if (!Objects.equals(rawFileName, ((FileSealPayload) payload).getFileName())) {
final String errorMessage =
String.format(
"inconsistent file name, current is %s, incoming is %s, consumer: %s",
rawFileName, ((FileSealPayload) payload).getFileName(), this);
LOGGER.warn(errorMessage);
throw new SubscriptionRuntimeNonCriticalException(errorMessage);
}
// check file length
if (fileWriter.length() != ((FileSealPayload) payload).getFileLength()) {
final String errorMessage =
String.format(
"inconsistent file length, current is %s, incoming is %s, consumer: %s",
fileWriter.length(), ((FileSealPayload) payload).getFileLength(), this);
LOGGER.warn(errorMessage);
throw new SubscriptionRuntimeNonCriticalException(errorMessage);
}
// optional sync and close
if (fileSaveFsync) {
fileWriter.getFD().sync();
}
fileWriter.close();
LOGGER.info(
"SubscriptionConsumer {} successfully poll file {} with commit context {}",
this,
file.getAbsolutePath(),
commitContext);
// generate subscription message
inFlightFilesCommitContextSet.remove(commitContext);
return Optional.of(
new SubscriptionMessage(
commitContext,
file.getAbsolutePath(),
((FileSealPayload) payload).getDatabaseName()));
}
case ERROR:
{
// no need to check commit context
final String errorMessage = ((ErrorPayload) payload).getErrorMessage();
final boolean critical = ((ErrorPayload) payload).isCritical();
if (!critical
&& Objects.nonNull(errorMessage)
&& errorMessage.contains(SubscriptionTimeoutException.KEYWORD)) {
// resume from breakpoint if timeout happened when polling files
inFlightFilesCommitContextSet.add(commitContext);
final String message =
String.format(
"Timeout occurred when SubscriptionConsumer %s polling file %s with commit context %s, record writing offset %s for subsequent poll",
this, file.getAbsolutePath(), commitContext, writingOffset);
LOGGER.info(message);
throw new SubscriptionPollTimeoutException(message);
} else {
LOGGER.warn(
"Error occurred when SubscriptionConsumer {} polling file {} with commit context {}: {}, critical: {}",
this,
file.getAbsolutePath(),
commitContext,
errorMessage,
critical);
if (critical) {
throw new SubscriptionRuntimeCriticalException(errorMessage);
} else {
throw new SubscriptionRuntimeNonCriticalException(errorMessage);
}
}
}
default:
final String errorMessage = String.format("unexpected response type: %s", responseType);
LOGGER.warn(errorMessage);
throw new SubscriptionRuntimeNonCriticalException(errorMessage);
}
}
}