in stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/LogRecord.java [564:649]
/**
 * Skips forward through the input until a record matching the requested position is reached.
 *
 * <p>Each iteration takes its candidate either from the active {@code recordSetReader} (when one
 * is set) or by reading a {@code long} flags word and a {@code long} transaction id from
 * {@code in}. A candidate matches when:
 * <ul>
 *   <li>{@code dlsn} is non-null and {@code recordStream.getCurrentPosition()} compares
 *       greater than or equal to it, or</li>
 *   <li>{@code txId} is non-null, the candidate's transaction id is greater than or equal to it,
 *       and the candidate is not excluded by {@code skipControl}.</li>
 * </ul>
 * On a match for a candidate read directly from {@code in} (i.e. {@code lastRecordSkipTo} is
 * null), {@code in.resetReaderIndex()} is called to return to the index marked before the header
 * was read. Non-matching candidates are skipped and {@code recordStream} is advanced by one.
 *
 * <p>NOTE(review): only {@link EOFException} is treated as end of input here; confirm that short
 * reads on {@code in} actually surface as {@code EOFException} for the buffer type in use.
 *
 * @param txId        transaction id to skip to, or {@code null} to ignore transaction ids
 * @param dlsn        position to skip to, or {@code null} to ignore positions
 * @param skipControl when {@code true}, records for which {@code isControl(flags)} holds are not
 *                    accepted as a transaction-id match
 * @return {@code true} if a matching record was reached; {@code false} if an
 *         {@code EOFException} was hit or a record with negative length was encountered first
 * @throws IOException if reading fails for a reason other than {@code EOFException}
 */
private boolean skipTo(Long txId, DLSN dlsn, boolean skipControl) throws IOException {
    try {
        while (true) {
            final long metadata;
            final long recordTxId;

            if (null != recordSetReader) {
                // a record set is being drained: take its next entry
                lastRecordSkipTo = recordSetReader.nextRecord();
                if (null == lastRecordSkipTo) {
                    // record set exhausted; go back to reading from the buffer
                    recordSetReader = null;
                    continue;
                }
                metadata = lastRecordSkipTo.getMetadata();
                recordTxId = lastRecordSkipTo.getTransactionId();
            } else {
                // no record set: mark the start of the record, then read its header
                in.markReaderIndex();
                metadata = in.readLong();
                recordTxId = in.readLong();
            }

            // match on position
            if (null != dlsn) {
                if (recordStream.getCurrentPosition().compareTo(dlsn) >= 0) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Found position {} beyond {}", recordStream.getCurrentPosition(), dlsn);
                    }
                    if (null == lastRecordSkipTo) {
                        // header came straight from the buffer: rewind to the marked index
                        in.resetReaderIndex();
                    }
                    return true;
                }
            }

            // match on transaction id, optionally refusing control records
            if (null != txId && recordTxId >= txId && !(skipControl && isControl(metadata))) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Found position {} beyond {}", recordTxId, txId);
                }
                if (null == lastRecordSkipTo) {
                    // header came straight from the buffer: rewind to the marked index
                    in.resetReaderIndex();
                }
                return true;
            }

            if (null != lastRecordSkipTo) {
                // non-matching entry taken from a record set: count it and move on
                recordStream.advance(1);
                continue;
            }

            if (!isRecordSet(metadata)) {
                final int payloadLength = in.readInt();
                if (payloadLength < 0) {
                    // We should never really see this as we only write complete entries to
                    // BK and BK client has logic to detect torn writes (through checksum)
                    LOG.info("Encountered Record with negative length at TxId: {}", recordTxId);
                    return false;
                }
                // plain record: jump over its payload
                in.skipBytes(payloadLength);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Skipped Record with TxId {} DLSN {}",
                        recordTxId, recordStream.getCurrentPosition());
                }
                recordStream.advance(1);
                continue;
            }

            // record set: load it so the next iterations walk through its entries
            LogRecordWithDLSN setRecord =
                new LogRecordWithDLSN(recordStream.getCurrentPosition(), startSequenceId);
            setRecord.setMetadata(metadata);
            setRecord.setTransactionId(recordTxId);
            setRecord.readPayload(in, false);
            recordSetReader = LogRecordSet.of(setRecord);
        }
    } catch (EOFException eof) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Skip encountered end of file Exception", eof);
        }
        return false;
    }
}