private Stage createReadRecordsStage()

in tablestore/src/main/java/com/alicloud/openservices/tablestore/tunnel/pipeline/ProcessDataPipeline.java [106:163]


    /**
     * Builds the pipeline stage that reads one round of stream records from the channel.
     *
     * <p>The returned stage's {@code doProcess} behaves as follows:
     * <ul>
     *   <li>If the channel connect is not in {@code RUNNING} status, a {@link StageException} is thrown.</li>
     *   <li>If the connect's token is {@code null} or equals {@code FINISH_TAG}, the connect is closed
     *       via {@code connect.close(true)} and a {@link StageException} is thrown.</li>
     *   <li>Otherwise {@code readRecords} is called repeatedly, accumulating records, until the
     *       accumulated size reaches {@code readMaxBytesPerRound}, the number of calls reaches
     *       {@code readMaxTimesPerRound}, the response carries no further token (null or
     *       {@code FINISH_TAG}), or {@code needResetBackoff(...)} returns false — in which case the
     *       stage sleeps for the next backoff interval and ends the round.</li>
     * </ul>
     * The request's token is advanced in place between successive reads. Any exception raised while
     * reading is wrapped into a {@link StageException} that keeps the original exception as its cause.
     *
     * @return a stage that maps a {@code ReadRecordsRequest} to the {@code ProcessRecordsInput}
     *         holding the records gathered in this round
     */
    private Stage<ReadRecordsRequest, ProcessRecordsInput> createReadRecordsStage() {
        return new AbstractStage<ReadRecordsRequest, ProcessRecordsInput>() {
            @Override
            public ProcessRecordsInput doProcess(ReadRecordsRequest readRecordsRequest) throws StageException {
                if (connect.getStatus() != ChannelConnectStatus.RUNNING) {
                    throw new StageException(this, readRecordsRequest, "Channel is not running.");
                }
                if (connect.getToken() == null || FINISH_TAG.equals(connect.getToken())) {
                    LOG.info("Channel is finished, channel will be closed.");
                    connect.close(true);
                    throw new StageException(this, readRecordsRequest, "Channel connect is finished.");
                }

                try {
                    LOG.debug("Begin read records, connect: {}", connect);
                    long beginTs = System.currentTimeMillis();
                    ReadRecordsResponse resp = null;
                    List<StreamRecord> totalRecords = new LinkedList<StreamRecord>();
                    int totalBytes = 0, times = 0, totalRecordsCount = 0;
                    while (totalBytes < readMaxBytesPerRound && times < readMaxTimesPerRound) {
                        resp = connect.getClient().readRecords(readRecordsRequest);
                        totalRecords.addAll(resp.getRecords());
                        int batchRecordsCount = resp.getRecords().size();
                        totalRecordsCount += batchRecordsCount;
                        totalBytes += resp.getMemoizedSerializedSize();
                        times++;

                        if (resp.getNextToken() == null || FINISH_TAG.equals(resp.getNextToken())) {
                            LOG.info("Channel {} next token is null", connect.getChannelId());
                            break;
                        }
                        if (backoff != null) {
                            if (needResetBackoff(batchRecordsCount, resp.getMemoizedSerializedSize(), resp.getMayMoreRecord())) {
                                LOG.debug("Backoff is reset");
                                backoff.reset();
                            } else {
                                // Ask the backoff for its next interval exactly once: a second call would
                                // advance the backoff state again and sleep for a different duration
                                // than the one that was just logged.
                                long sleepMills = backoff.nextBackOffMillis();
                                LOG.debug("Data is not full, sleep {} msec.", sleepMills);
                                Thread.sleep(sleepMills);
                                // The next token is still handed to the caller through the returned input.
                                break;
                            }
                        }
                        // Continue the round from where the last response stopped.
                        readRecordsRequest.setToken(resp.getNextToken());
                    }
                    if (resp == null) {
                        // The loop body never ran, i.e. a per-round limit was already satisfied on entry.
                        LOG.info("ReadRecordsResponse is null, channelId: {}", connect.getChannelId());
                        return new ProcessRecordsInput(totalRecords, null, null);
                    }
                    LOG.info("GetRecords, Num: {}, LoopTimes: {}, TotalBytes: {}, Channel connect: {}, Latency: {} ms, Next Token: {}",
                            totalRecordsCount, times, totalBytes, connect, System.currentTimeMillis() - beginTs, resp.getNextToken());
                    return new ProcessRecordsInput(totalRecords, resp.getNextToken(), resp.getRequestId(), connect.getChannelId());
                } catch (InterruptedException e) {
                    // Restore the interrupt status so code further up the stack can still observe it.
                    Thread.currentThread().interrupt();
                    throw new StageException(this, readRecordsRequest, e.getMessage(), e);
                } catch (Exception e) {
                    throw new StageException(this, readRecordsRequest, e.getMessage(), e);
                }
            }
        };
    }