in tablestore/src/main/java/com/alicloud/openservices/tablestore/tunnel/pipeline/ProcessDataPipeline.java [106:163]
private Stage<ReadRecordsRequest, ProcessRecordsInput> createReadRecordsStage() {
return new AbstractStage<ReadRecordsRequest, ProcessRecordsInput>() {
@Override
public ProcessRecordsInput doProcess(ReadRecordsRequest readRecordsRequest) throws StageException {
if (connect.getStatus() == ChannelConnectStatus.RUNNING) {
if (connect.getToken() != null && !(FINISH_TAG).equals(connect.getToken())) {
try {
LOG.debug("Begin read records, connect: {}", connect);
long beginTs = System.currentTimeMillis();
ReadRecordsResponse resp = null;
List<StreamRecord> totalRecords = new LinkedList<StreamRecord>();
int totalBytes = 0, times = 0, totalRecordsCount = 0;
while (totalBytes < readMaxBytesPerRound && times < readMaxTimesPerRound) {
resp = connect.getClient().readRecords(readRecordsRequest);
totalRecords.addAll(resp.getRecords());
totalRecordsCount += resp.getRecords().size();
totalBytes += resp.getMemoizedSerializedSize();
times++;
if (resp.getNextToken() == null || FINISH_TAG.equals(resp.getNextToken())) {
LOG.info("Channel {} next token is null", connect.getChannelId());
break;
}
if (backoff != null) {
if (needResetBackoff(resp.getRecords().size(), resp.getMemoizedSerializedSize(), resp.getMayMoreRecord())) {
LOG.debug("Backoff is reset");
backoff.reset();
} else {
long sleepMills = backoff.nextBackOffMillis();
LOG.debug("Data is not full, sleep {} msec.", sleepMills);
Thread.sleep(backoff.nextBackOffMillis());
break;
}
}
readRecordsRequest.setToken(resp.getNextToken());
}
if (resp == null) {
LOG.info("ReadRecordsResponse is null, channelId: {}", connect.getChannelId());
return new ProcessRecordsInput(totalRecords, null, null);
} else {
LOG.info("GetRecords, Num: {}, LoopTimes: {}, TotalBytes: {}, Channel connect: {}, Latency: {} ms, Next Token: {}",
totalRecordsCount, times, totalBytes, connect, System.currentTimeMillis() - beginTs, resp.getNextToken());
return new ProcessRecordsInput(totalRecords, resp.getNextToken(), resp.getRequestId(), connect.getChannelId());
}
} catch (Exception e) {
throw new StageException(this, readRecordsRequest, e.getMessage(), e);
}
} else {
LOG.info("Channel is finished, channel will be closed.");
connect.close(true);
throw new StageException(this, readRecordsRequest, "Channel connect is finished.");
}
} else {
throw new StageException(this, readRecordsRequest, "Channel is not running.");
}
}
};
}