in hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ldb/DBScanner.java [296:379]
/**
 * Walks the given iterator, groups the selected records into batches of
 * {@code batchSize} and submits one {@code Task} per batch to
 * {@code threadPool}, then waits for all submitted tasks to finish.
 *
 * <p>Iteration starts at {@code startKey} when it is set and stops when any
 * of the following holds: the global limit is reached, the iterator is
 * exhausted, the {@code exception} flag is set, the record whose key equals
 * {@code endKey} has been handled (that record is still included), or
 * {@code recordsPerFile} records have been selected by this call.
 *
 * <p>When {@code filter} is set, a record is selected only if
 * {@code checkFilteredObject} accepts its decoded value; malformed filter
 * entries are reported on the error stream and skipped.
 *
 * @param iterator iterator over the column family being scanned
 * @param dbColumnFamilyDef definition used to encode keys and decode values
 * @param logWriter passed to every task; its in-flight count throttles
 *                  submission
 * @param threadPool executor that runs the tasks
 * @param schemaV3 forwarded unchanged to every task
 * @throws InterruptedException if the thread is interrupted while throttling
 *                              or while waiting for a task to finish
 * @throws IOException declared by the original signature; presumably raised
 *                     while decoding a value for filtering (confirm against
 *                     the codec)
 */
private void processRecords(ManagedRocksIterator iterator,
    DBColumnFamilyDefinition dbColumnFamilyDef,
    LogWriter logWriter, ExecutorService threadPool,
    boolean schemaV3) throws InterruptedException, IOException {
  if (startKey != null) {
    iterator.get().seek(getValueObject(dbColumnFamilyDef, startKey));
  }
  ArrayList<ByteArrayKeyValue> batch = new ArrayList<>(batchSize);
  // Used to ensure that the output of a multi-threaded parsed Json is in
  // the same order as the RocksDB iterator.
  long sequenceId = FIRST_SEQUENCE_ID;
  // Count number of keys selected by this call so far
  long count = 0;
  List<Future<Void>> futures = new ArrayList<>();
  boolean reachedEnd = false;

  // Parse "<field>:<operator>:<value>" entries into the nested filter map.
  Map<String, Filter> fieldsFilterSplitMap = new HashMap<>();
  if (filter != null) {
    for (String field : filter.split(",")) {
      String[] fieldValue = field.split(":");
      if (fieldValue.length != 3) {
        err().println("Error: Invalid format for filter \"" + field
            + "\". Usage: <field>:<operator>:<value>. Ignoring filter passed");
        continue;
      }
      Filter filterValue = new Filter(fieldValue[1], fieldValue[2]);
      if (filterValue.getOperator() == null) {
        // Report the text the user typed; the Filter object has a null
        // operator here and would hide the offending input.
        err().println("Error: Invalid operator for filter \"" + field
            + "\". <operator> can be one of [EQUALS,LESSER,GREATER]. Ignoring filter passed");
        continue;
      }
      String[] subfields = fieldValue[0].split("\\.");
      getFilterSplit(Arrays.asList(subfields), fieldsFilterSplitMap, filterValue);
    }
  }

  // Encoded form of endKey, computed on first use so that it is never
  // evaluated when the loop body does not run.
  byte[] endKeyBytes = null;

  while (withinLimit(globalCount) && iterator.get().isValid() && !exception && !reachedEnd) {
    // Fetch the key at most once per record and reuse the array below.
    byte[] key = null;
    // if invalid endKey is given, it is ignored
    if (endKey != null) {
      if (endKeyBytes == null) {
        endKeyBytes = getValueObject(dbColumnFamilyDef, endKey);
      }
      key = iterator.get().key();
      reachedEnd = Arrays.equals(key, endKeyBytes);
    }

    final byte[] value = iterator.get().value();
    final boolean selected;
    if (filter == null) {
      // Nothing to evaluate: skip decoding, the task receives the raw bytes.
      selected = true;
    } else {
      Object decoded = dbColumnFamilyDef.getValueCodec().fromPersistedFormat(value);
      selected = checkFilteredObject(decoded, dbColumnFamilyDef.getValueType(),
          fieldsFilterSplitMap);
    }

    if (selected) {
      if (key == null) {
        key = iterator.get().key();
      }
      batch.add(new ByteArrayKeyValue(key, value));
      globalCount++;
      count++;
      if (batch.size() >= batchSize) {
        while (logWriter.getInflightLogCount() > threadCount * 10L
            && !exception) {
          // Prevents too many unfinished Tasks from
          // consuming too much memory.
          Thread.sleep(100);
        }
        futures.add(threadPool.submit(
            new Task(dbColumnFamilyDef, batch, logWriter, sequenceId,
                withKey, schemaV3, fieldsFilter)));
        batch = new ArrayList<>(batchSize);
        sequenceId++;
      }
    }

    // Advance before the per-file check so the iterator is already
    // positioned on the next record when this call returns early.
    iterator.get().next();
    if ((recordsPerFile > 0) && (count >= recordsPerFile)) {
      break;
    }
  }

  // Flush the final, partially filled batch.
  if (!batch.isEmpty()) {
    futures.add(threadPool.submit(new Task(dbColumnFamilyDef,
        batch, logWriter, sequenceId, withKey, schemaV3, fieldsFilter)));
  }

  // Wait for every task; a failed task is logged and the rest are still
  // awaited.
  for (Future<Void> future : futures) {
    try {
      future.get();
    } catch (ExecutionException e) {
      LOG.error("Task execution failed", e);
    }
  }
}