private void processRecords()

in hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ldb/DBScanner.java [296:379]


  private void processRecords(ManagedRocksIterator iterator,
                              DBColumnFamilyDefinition dbColumnFamilyDef,
                              LogWriter logWriter, ExecutorService threadPool,
                              boolean schemaV3) throws InterruptedException, IOException {
    if (startKey != null) {
      iterator.get().seek(getValueObject(dbColumnFamilyDef, startKey));
    }
    ArrayList<ByteArrayKeyValue> batch = new ArrayList<>(batchSize);
    // Used to ensure that the JSON produced by the worker threads is written
    // in the same order as the RocksDB iterator.
    long sequenceId = FIRST_SEQUENCE_ID;
    // Number of keys processed so far for the current output file
    long count = 0;
    List<Future<Void>> futures = new ArrayList<>();
    boolean reachedEnd = false;

    Map<String, Filter> fieldsFilterSplitMap = new HashMap<>();
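    // Parse the filter expression: comma-separated entries of the form
    // <field>:<operator>:<value>; dotted field names refer to nested fields
    // and are split into nested map entries by getFilterSplit().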
    if (filter != null) {
      for (String field : filter.split(",")) {
        String[] fieldValue = field.split(":");
        if (fieldValue.length != 3) {
          err().println("Error: Invalid format for filter \"" + field
              + "\". Usage: <field>:<operator>:<value>. Ignoring filter passed");
        } else {
          Filter filterValue = new Filter(fieldValue[1], fieldValue[2]);
          if (filterValue.getOperator() == null) {
            err().println("Error: Invalid operator for filter \"" + filterValue
                + "\". <operator> can be one of [EQUALS,LESSER,GREATER]. Ignoring filter passed");
          } else {
            String[] subfields = fieldValue[0].split("\\.");
            getFilterSplit(Arrays.asList(subfields), fieldsFilterSplitMap, filterValue);
          }
        }
      }
    }

    while (withinLimit(globalCount) && iterator.get().isValid() && !exception && !reachedEnd) {
      // An endKey that never appears in the iteration is effectively ignored;
      // a matching key is still processed (endKey is inclusive) before the scan stops.
      if (null != endKey && Arrays.equals(iterator.get().key(), getValueObject(dbColumnFamilyDef, endKey))) {
        reachedEnd = true;
      }

      Object o = dbColumnFamilyDef.getValueCodec().fromPersistedFormat(iterator.get().value());
      if (filter == null ||
          checkFilteredObject(o, dbColumnFamilyDef.getValueType(), fieldsFilterSplitMap)) {
        // the record passes the filter
        batch.add(new ByteArrayKeyValue(
            iterator.get().key(), iterator.get().value()));
        globalCount++;
        count++;
        if (batch.size() >= batchSize) {
          while (logWriter.getInflightLogCount() > threadCount * 10L
              && !exception) {
            // Prevents too many unfinished Tasks from
            // consuming too much memory.
            Thread.sleep(100);
          }
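          // Hand the full batch to a worker thread; sequenceId lets the
          // writer keep the output in iterator order.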
          Future<Void> future = threadPool.submit(
              new Task(dbColumnFamilyDef, batch, logWriter, sequenceId,
                  withKey, schemaV3, fieldsFilter));
          futures.add(future);
          batch = new ArrayList<>(batchSize);
          sequenceId++;
        }
      }
      iterator.get().next();
      if ((recordsPerFile > 0) && (count >= recordsPerFile)) {
        break;
      }
    }
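    // Submit whatever remains as a final, partially filled batch.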
    if (!batch.isEmpty()) {
      Future<Void> future = threadPool.submit(new Task(dbColumnFamilyDef,
          batch, logWriter, sequenceId, withKey, schemaV3, fieldsFilter));
      futures.add(future);
    }

    for (Future<Void> future : futures) {
      try {
        future.get();
      } catch (ExecutionException e) {
        LOG.error("Task execution failed", e);
      }
    }
  }
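
The batching and back-pressure pattern above (fill a batch, throttle submission while too many tasks are still in flight, tag each batch with a sequence id so the written output follows iterator order, then wait on all futures) is independent of RocksDB and of the surrounding DBScanner state. Below is a minimal, self-contained sketch of that pattern using only JDK classes; BatchPrinter, MAX_IN_FLIGHT, and the inFlight counter are illustrative names and stand-ins, not part of the Ozone code.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

public final class BatchPrinter {

  private static final int BATCH_SIZE = 4;
  // Throttle threshold, analogous to "threadCount * 10" in processRecords().
  private static final int MAX_IN_FLIGHT = 2;

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    // Stand-in for logWriter.getInflightLogCount().
    AtomicLong inFlight = new AtomicLong();
    List<Future<Void>> futures = new ArrayList<>();

    List<String> batch = new ArrayList<>(BATCH_SIZE);
    long sequenceId = 0;

    for (int record = 0; record < 10; record++) {
      batch.add("record-" + record);
      if (batch.size() >= BATCH_SIZE) {
        // Back-pressure: do not queue new work while too many batches are unwritten.
        while (inFlight.get() > MAX_IN_FLIGHT) {
          Thread.sleep(10);
        }
        futures.add(submit(pool, batch, sequenceId++, inFlight));
        batch = new ArrayList<>(BATCH_SIZE);
      }
    }
    if (!batch.isEmpty()) {
      // Flush the final, partially filled batch.
      futures.add(submit(pool, batch, sequenceId, inFlight));
    }

    // Wait for every batch so worker failures surface before returning.
    for (Future<Void> f : futures) {
      f.get();
    }
    pool.shutdown();
  }

  private static Future<Void> submit(ExecutorService pool, List<String> batch,
      long sequenceId, AtomicLong inFlight) {
    inFlight.incrementAndGet();
    return pool.submit(() -> {
      // A real writer would reorder buffered batches by sequenceId before emitting them.
      System.out.println("seq=" + sequenceId + " " + batch);
      inFlight.decrementAndGet();
      return null;
    });
  }
}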