private Optional pollFileInternal()

in iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java [739:928]


  private Optional<SubscriptionMessage> pollFileInternal(
      final SubscriptionCommitContext commitContext,
      final String rawFileName,
      final File file,
      final RandomAccessFile fileWriter,
      final PollTimer timer)
      throws IOException, SubscriptionException {
    long writingOffset = fileWriter.length();

    LOGGER.info(
        "{} start to poll file {} with commit context {} at offset {}",
        this,
        file.getAbsolutePath(),
        commitContext,
        writingOffset);

    fileWriter.seek(writingOffset);
    while (true) {
      timer.update();
      if (timer.isExpired(TIMER_DELTA_MS)) {
        // resume from breakpoint if timeout happened when polling files
        inFlightFilesCommitContextSet.add(commitContext);
        final String message =
            String.format(
                "Timeout occurred when SubscriptionConsumer %s polling file %s with commit context %s, record writing offset %s for subsequent poll",
                this, file.getAbsolutePath(), commitContext, writingOffset);
        LOGGER.info(message);
        throw new SubscriptionRuntimeNonCriticalException(message);
      }

      final List<SubscriptionPollResponse> responses =
          pollFileInternal(commitContext, writingOffset, timer.remainingMs());

      // If responses is empty, it means that some outdated subscription events may be being polled,
      // so just return.
      if (responses.isEmpty()) {
        return Optional.empty();
      }

      // only one SubscriptionEvent polled currently
      final SubscriptionPollResponse response = responses.get(0);
      final SubscriptionPollPayload payload = response.getPayload();
      final short responseType = response.getResponseType();
      if (!SubscriptionPollResponseType.isValidatedResponseType(responseType)) {
        final String errorMessage = String.format("unexpected response type: %s", responseType);
        LOGGER.warn(errorMessage);
        throw new SubscriptionRuntimeNonCriticalException(errorMessage);
      }

      switch (SubscriptionPollResponseType.valueOf(responseType)) {
        case FILE_PIECE:
          {
            // check commit context
            final SubscriptionCommitContext incomingCommitContext = response.getCommitContext();
            if (Objects.isNull(incomingCommitContext)
                || !Objects.equals(commitContext, incomingCommitContext)) {
              final String errorMessage =
                  String.format(
                      "inconsistent commit context, current is %s, incoming is %s, consumer: %s",
                      commitContext, incomingCommitContext, this);
              LOGGER.warn(errorMessage);
              throw new SubscriptionRuntimeNonCriticalException(errorMessage);
            }

            // check file name
            if (!Objects.equals(rawFileName, ((FilePiecePayload) payload).getFileName())) {
              final String errorMessage =
                  String.format(
                      "inconsistent file name, current is %s, incoming is %s, consumer: %s",
                      rawFileName, ((FilePiecePayload) payload).getFileName(), this);
              LOGGER.warn(errorMessage);
              throw new SubscriptionRuntimeNonCriticalException(errorMessage);
            }

            // write file piece
            fileWriter.write(((FilePiecePayload) payload).getFilePiece());
            if (fileSaveFsync) {
              fileWriter.getFD().sync();
            }

            // check offset
            if (!Objects.equals(
                fileWriter.length(), ((FilePiecePayload) payload).getNextWritingOffset())) {
              final String errorMessage =
                  String.format(
                      "inconsistent file offset, current is %s, incoming is %s, consumer: %s",
                      fileWriter.length(),
                      ((FilePiecePayload) payload).getNextWritingOffset(),
                      this);
              LOGGER.warn(errorMessage);
              throw new SubscriptionRuntimeNonCriticalException(errorMessage);
            }

            // update offset
            writingOffset = ((FilePiecePayload) payload).getNextWritingOffset();
            break;
          }
        case FILE_SEAL:
          {
            // check commit context
            final SubscriptionCommitContext incomingCommitContext = response.getCommitContext();
            if (Objects.isNull(incomingCommitContext)
                || !Objects.equals(commitContext, incomingCommitContext)) {
              final String errorMessage =
                  String.format(
                      "inconsistent commit context, current is %s, incoming is %s, consumer: %s",
                      commitContext, incomingCommitContext, this);
              LOGGER.warn(errorMessage);
              throw new SubscriptionRuntimeNonCriticalException(errorMessage);
            }

            // check file name
            if (!Objects.equals(rawFileName, ((FileSealPayload) payload).getFileName())) {
              final String errorMessage =
                  String.format(
                      "inconsistent file name, current is %s, incoming is %s, consumer: %s",
                      rawFileName, ((FileSealPayload) payload).getFileName(), this);
              LOGGER.warn(errorMessage);
              throw new SubscriptionRuntimeNonCriticalException(errorMessage);
            }

            // check file length
            if (fileWriter.length() != ((FileSealPayload) payload).getFileLength()) {
              final String errorMessage =
                  String.format(
                      "inconsistent file length, current is %s, incoming is %s, consumer: %s",
                      fileWriter.length(), ((FileSealPayload) payload).getFileLength(), this);
              LOGGER.warn(errorMessage);
              throw new SubscriptionRuntimeNonCriticalException(errorMessage);
            }

            // optional sync and close
            if (fileSaveFsync) {
              fileWriter.getFD().sync();
            }
            fileWriter.close();

            LOGGER.info(
                "SubscriptionConsumer {} successfully poll file {} with commit context {}",
                this,
                file.getAbsolutePath(),
                commitContext);

            // generate subscription message
            inFlightFilesCommitContextSet.remove(commitContext);
            return Optional.of(
                new SubscriptionMessage(
                    commitContext,
                    file.getAbsolutePath(),
                    ((FileSealPayload) payload).getDatabaseName()));
          }
        case ERROR:
          {
            // no need to check commit context

            final String errorMessage = ((ErrorPayload) payload).getErrorMessage();
            final boolean critical = ((ErrorPayload) payload).isCritical();
            if (!critical
                && Objects.nonNull(errorMessage)
                && errorMessage.contains(SubscriptionTimeoutException.KEYWORD)) {
              // resume from breakpoint if timeout happened when polling files
              inFlightFilesCommitContextSet.add(commitContext);
              final String message =
                  String.format(
                      "Timeout occurred when SubscriptionConsumer %s polling file %s with commit context %s, record writing offset %s for subsequent poll",
                      this, file.getAbsolutePath(), commitContext, writingOffset);
              LOGGER.info(message);
              throw new SubscriptionPollTimeoutException(message);
            } else {
              LOGGER.warn(
                  "Error occurred when SubscriptionConsumer {} polling file {} with commit context {}: {}, critical: {}",
                  this,
                  file.getAbsolutePath(),
                  commitContext,
                  errorMessage,
                  critical);
              if (critical) {
                throw new SubscriptionRuntimeCriticalException(errorMessage);
              } else {
                throw new SubscriptionRuntimeNonCriticalException(errorMessage);
              }
            }
          }
        default:
          final String errorMessage = String.format("unexpected response type: %s", responseType);
          LOGGER.warn(errorMessage);
          throw new SubscriptionRuntimeNonCriticalException(errorMessage);
      }
    }
  }