in modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java [82:188]
/**
 * Advances {@code source} until it rests on an entry that should be exposed for the snapshot at
 * {@code snaptime}, or until there is nothing more to examine.
 *
 * <p>The method processes one column (row + family + qualifier + visibility) at a time and
 * returns in one of these states:
 * <ul>
 * <li>{@code source} is positioned on a LOCK entry whose timestamp is greater than every
 * invalidating timestamp seen for the column and is not after {@code snaptime};</li>
 * <li>{@code source} is positioned on the DATA entry whose timestamp equals the data pointer taken
 * from the first WRITE entry with a timestamp not after {@code snaptime};</li>
 * <li>{@code source} has no top entry, or {@code readLockKey} is non-null (presumably set by
 * {@code rememberReadLock} - confirm in that helper).</li>
 * </ul>
 *
 * <p>NOTE(review): the skip targets used below suggest that, within a column, entries are ordered
 * TX_DONE, WRITE, DEL_LOCK, RLOCK, LOCK, ACK, DATA - confirm against {@code ColumnType}.
 *
 * @throws IOException presumably propagated from the underlying source while it is advanced
 * @throws IllegalStateException if an entry in the notification column family is encountered
 * @throws IllegalArgumentException if an entry has a column type this method does not handle
 */
private void findTop() throws IOException {
  outer: while (source.hasTop() && readLockKey == null) {
    // Per-column state, reset each time a new column is started.
    // invalidationTime: largest of the WRITE value timestamps and DEL_LOCK timestamps seen so
    // far; a LOCK entry only counts as held when its timestamp is greater than this value.
    long invalidationTime = -1;
    // dataPointer: timestamp of the DATA entry to return, taken from the value of the first
    // WRITE entry whose timestamp is <= snaptime; -1 means no such WRITE has been seen.
    long dataPointer = -1;

    if (source.getTopKey().getColumnFamilyData().equals(NOTIFY_CF_BS)) {
      throw new IllegalStateException("seeing notifications during snapshot iteration");
    }

    // Remember the column being examined; the inner loop runs while the source stays within it.
    curCol.set(source.getTopKey());

    while (source.hasTop()
        && curCol.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
      ColumnType colType = ColumnType.from(source.getTopKey());
      // Keep only the bits selected by TIMESTAMP_MASK (presumably this strips the column-type
      // prefix from the Accumulo timestamp - confirm in ColumnConstants).
      long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;

      switch (colType) {
        case TX_DONE: {
          // Nothing is read from TX_DONE entries here; jump ahead to the WRITE entries.
          source.skipToPrefix(curCol, ColumnType.WRITE);
          continue;
        }
        case WRITE: {
          long timePtr = WriteValue.getTimestamp(source.getTopValue().get());

          // Every WRITE seen, even one after snaptime, raises the invalidation time.
          if (timePtr > invalidationTime) {
            invalidationTime = timePtr;
          }

          if (dataPointer == -1) {
            if (ts <= snaptime) {
              // First WRITE that is not after the snapshot: its value names the DATA entry to
              // return. Move on to the DEL_LOCK entries.
              dataPointer = timePtr;
              source.skipToPrefix(curCol, ColumnType.DEL_LOCK);
              continue;
            } else {
              // WRITE is newer than the snapshot; seek towards WRITE entries at snaptime
              // (exact landing position depends on skipToTimestamp - not visible here).
              source.skipToTimestamp(curCol, ColumnType.WRITE.encode(snaptime));
              continue;
            }
          }
          // A data pointer was already chosen; fall out of the switch and step to the next entry.
          break;
        }
        case DEL_LOCK: {
          if (ts > invalidationTime) {
            invalidationTime = ts;
          }

          // Only visit RLOCK entries when the caller asked for read-lock detection.
          if (returnReadLockPresent) {
            source.skipToPrefix(curCol, ColumnType.RLOCK);
          } else {
            source.skipToPrefix(curCol, ColumnType.LOCK);
          }
          continue;
        }
        case RLOCK: {
          if (returnReadLockPresent) {
            rememberReadLock(source.getTopKey(), source.getTopValue());
          }

          source.skipToPrefix(curCol, ColumnType.LOCK);
          continue;
        }
        case LOCK: {
          if (ts > invalidationTime && ts <= snaptime) {
            // nothing supersedes this lock, therefore the column is locked
            return;
          } else {
            if (dataPointer == -1) {
              // No lock to report and no visible write: this column yields nothing.
              source.skipColumn(curCol);
              continue outer;
            } else {
              source.skipToTimestamp(curCol, ColumnType.DATA.encode(dataPointer));
              continue;
            }
          }
        }
        case DATA: {
          if (dataPointer == ts) {
            // found data for this column
            return;
          }

          if (ts < dataPointer || dataPointer == -1) {
            // Either already past the wanted timestamp or there is no visible write at all.
            source.skipColumn(curCol);
            continue outer;
          }

          // Only ts > dataPointer (with a valid pointer) remains: seek to the wanted DATA entry.
          source.skipToTimestamp(curCol, ColumnType.DATA.encode(dataPointer));
          continue;
        }
        case ACK: {
          if (dataPointer == -1) {
            source.skipColumn(curCol);
            continue outer;
          } else {
            source.skipToTimestamp(curCol, ColumnType.DATA.encode(dataPointer));
            continue;
          }
        }

        default:
          // Name the offending type so an unhandled ColumnType can be diagnosed from the trace.
          throw new IllegalArgumentException("Unexpected column type " + colType);
      }

      // TODO handle case where dataPointer >=0, but no data was found
      source.next();
    }
  }
}