in modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java [152:321]
private void readColMetadata() throws IOException {
long invalidationTime = -1;
boolean oldestSeen = false;
boolean sawAck = false;
long firstWrite = -1;
long lastReadLockDeleteTs = -1;
truncationTime = -1;
position = 0;
keys.clear();
keysFiltered.clear();
completeTxs.clear();
rolledback.clear();
curCol.set(source.getTopKey());
if (NotificationUtil.isNtfy(source.getTopKey())) {
return;
}
loop: while (source.hasTop()
&& curCol.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
ColumnType colType = ColumnType.from(source.getTopKey());
long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
switch (colType) {
case TX_DONE: {
keys.add(source.getTopKey(), source.getTopValue());
completeTxs.add(ts);
break;
}
case WRITE: {
boolean keep = false;
boolean complete = completeTxs.contains(ts);
byte[] val = source.getTopValue().get();
long timePtr = WriteValue.getTimestamp(val);
if (WriteValue.isPrimary(val) && !complete) {
keep = true;
}
if (!oldestSeen) {
if (firstWrite == -1) {
firstWrite = ts;
}
if (ts < gcTimestamp) {
oldestSeen = true;
truncationTime = timePtr;
if (!(WriteValue.isDelete(val) && isFullMajc)) {
keep = true;
}
} else {
keep = true;
}
}
if (timePtr > invalidationTime) {
invalidationTime = timePtr;
}
if (keep) {
keys.add(source.getTopKey(), val);
} else if (complete) {
completeTxs.remove(ts);
}
break;
}
case DEL_LOCK: {
boolean keep = false;
long txDoneTs = DelLockValue.getTxDoneTimestamp(source.getTopValue().get());
boolean complete = completeTxs.contains(txDoneTs);
byte[] val = source.getTopValue().get();
if (!complete && DelLockValue.isPrimary(val)) {
keep = true;
}
if (DelLockValue.isRollback(val)) {
rolledback.add(ts);
keep |= !isFullMajc;
}
if (ts > invalidationTime) {
invalidationTime = ts;
}
if (keep) {
keys.add(source.getTopKey(), source.getTopValue());
} else if (complete) {
completeTxs.remove(txDoneTs);
}
break;
}
case RLOCK: {
boolean keep = false;
long rlts = ReadLockUtil.decodeTs(ts);
boolean isDelete = ReadLockUtil.isDelete(ts);
if (isDelete) {
lastReadLockDeleteTs = rlts;
}
if (rlts > invalidationTime) {
if (isFullMajc) {
if (isDelete) {
if (DelReadLockValue.isRollback(source.getTopValue().get())) {
// can drop rolled back read lock delete markers on any full majc, do not need to
// consider gcTimestamp
keep = false;
} else {
long rlockCommitTs =
DelReadLockValue.getCommitTimestamp(source.getTopValue().get());
keep = rlockCommitTs >= gcTimestamp;
}
} else {
keep = lastReadLockDeleteTs != rlts;
}
} else {
// can drop deleted read lock entries.. keep the delete entry.
keep = isDelete || lastReadLockDeleteTs != rlts;
}
}
if (keep) {
keys.add(source.getTopKey(), source.getTopValue());
}
break;
}
case LOCK: {
if (ts > invalidationTime) {
keys.add(source.getTopKey(), source.getTopValue());
}
break;
}
case DATA: {
// can stop looking
break loop;
}
case ACK: {
if (!sawAck) {
if (ts >= firstWrite) {
keys.add(source.getTopKey(), source.getTopValue());
}
sawAck = true;
}
break;
}
default:
throw new IllegalArgumentException(" unknown colType " + colType);
}
source.next();
}
keys.copyTo(keysFiltered, (timestamp -> {
if (ColumnType.from(timestamp) == ColumnType.TX_DONE) {
return completeTxs.contains(timestamp & ColumnConstants.TIMESTAMP_MASK);
} else {
return true;
}
}));
}