public void run()

in stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java [572:721]


    public void run() {
        synchronized (scheduleLock) {
            if (scheduleDelayStopwatch.isRunning()) {
                scheduleLatency.registerSuccessfulEvent(
                    scheduleDelayStopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
            }

            Stopwatch runTime = Stopwatch.createStarted();
            int iterations = 0;
            long scheduleCountLocal = scheduleCountUpdater.get(this);
            if (LOG.isDebugEnabled()) {
                LOG.debug("{}: Scheduled Background Reader", readHandler.getFullyQualifiedName());
            }
            while (true) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("{}: Executing Iteration: {}", readHandler.getFullyQualifiedName(), iterations++);
                }

                PendingReadRequest nextRequest = null;
                synchronized (this) {
                    nextRequest = pendingRequests.peek();

                    // Queue is empty, nothing to read, return
                    if (null == nextRequest) {
                        LOG.trace("{}: Queue Empty waiting for Input", readHandler.getFullyQualifiedName());
                        scheduleCountUpdater.set(this, 0);
                        backgroundReaderRunTime.registerSuccessfulEvent(
                            runTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                        return;
                    }

                    if (disableProcessingReadRequests) {
                        LOG.info("Reader of {} is forced to stop processing read requests",
                            readHandler.getFullyQualifiedName());
                        return;
                    }
                }
                lastProcessTime.reset().start();

                // If the oldest pending promise is interrupted then we must mark
                // the reader in error and abort all pending reads since we dont
                // know the last consumed read
                if (null == lastExceptionUpdater.get(this)) {
                    if (nextRequest.getPromise().isCancelled()) {
                        setLastException(new DLInterruptedException("Interrupted on reading "
                            + readHandler.getFullyQualifiedName()));
                    }
                }

                if (checkClosedOrInError("readNext")) {
                    Throwable lastException = lastExceptionUpdater.get(this);
                    if (lastException != null && !(lastException.getCause() instanceof LogNotFoundException)) {
                        LOG.warn("{}: Exception", readHandler.getFullyQualifiedName(), lastException);
                    }
                    backgroundReaderRunTime.registerFailedEvent(
                        runTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                    return;
                }

                try {
                    // Fail 10% of the requests when asked to simulate errors
                    if (bkDistributedLogManager.getFailureInjector().shouldInjectErrors()) {
                        throw new IOException("Reader Simulated Exception");
                    }
                    LogRecordWithDLSN record;
                    while (!nextRequest.hasReadEnoughRecords()) {
                        // read single record
                        do {
                            record = readNextRecord();
                        } while (null != record && (record.isControl()
                                || (record.getDlsn().compareTo(getStartDLSN()) < 0)));
                        if (null == record) {
                            break;
                        } else {
                            if (record.isEndOfStream() && !returnEndOfStreamRecord) {
                                setLastException(new EndOfStreamException("End of Stream Reached for "
                                        + readHandler.getFullyQualifiedName()));
                                break;
                            }

                            // gap detection
                            if (recordPositionsContainsGap(record, lastPosition)) {
                                bkDistributedLogManager.raiseAlert("Gap detected between records at record = {}",
                                        record);
                                if (positionGapDetectionEnabled) {
                                    throw new DLIllegalStateException("Gap detected between records at record = "
                                            + record);
                                }
                            }
                            lastPosition = record.getLastPositionWithinLogSegment();
                            nextRequest.addRecord(record);
                        }
                    }
                } catch (IOException exc) {
                    setLastException(exc);
                    if (!(exc instanceof LogNotFoundException)) {
                        LOG.warn("{} : read with skip Exception",
                                readHandler.getFullyQualifiedName(), lastExceptionUpdater.get(this));
                    }
                    continue;
                }

                if (nextRequest.hasReadRecords()) {
                    long remainingWaitTime = nextRequest.getRemainingWaitTime();
                    if (remainingWaitTime > 0 && !nextRequest.hasReadEnoughRecords()) {
                        backgroundReaderRunTime.registerSuccessfulEvent(
                            runTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                        scheduleDelayStopwatch.reset().start();
                        scheduleCountUpdater.set(this, 0);
                        // the request could still wait for more records
                        backgroundScheduleTask = scheduler.scheduleOrdered(
                                streamName,
                                BACKGROUND_READ_SCHEDULER,
                                remainingWaitTime,
                                nextRequest.deadlineTimeUnit);
                        return;
                    }

                    PendingReadRequest request = pendingRequests.poll();
                    if (null != request && nextRequest == request) {
                        request.complete();
                        if (null != backgroundScheduleTask) {
                            backgroundScheduleTask.cancel(true);
                            backgroundScheduleTask = null;
                        }
                    } else {
                        DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at dlsn = "
                                + nextRequest.records.get(0).getDlsn());
                        nextRequest.completeExceptionally(ise);
                        if (null != request) {
                            request.completeExceptionally(ise);
                        }
                        // We should never get here as we should have exited the loop if
                        // pendingRequests were empty
                        bkDistributedLogManager.raiseAlert("Unexpected condition at dlsn = {}",
                                nextRequest.records.get(0).getDlsn());
                        setLastException(ise);
                    }
                } else {
                    if (0 == scheduleCountLocal) {
                        LOG.trace("Schedule count dropping to zero", lastExceptionUpdater.get(this));
                        backgroundReaderRunTime.registerSuccessfulEvent(
                            runTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                        return;
                    }
                    scheduleCountLocal = scheduleCountUpdater.decrementAndGet(this);
                }
            }
        }
    }