public void report()

in emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/AbstractReadManager.java [94:142]


  public void report(double permittedReadUnits, double consumedReadUnits, int items, int retries) {
    rateController.adjust(permittedReadUnits, consumedReadUnits, items);

    boolean addWorker = false;
    boolean removeWorker = false;

    synchronized (reportStatsLock) {
      reportedStats.add(new Report(consumedReadUnits, items, retries));

      long deltaMs = time.getTimeSinceMs(lastEvaluatedTimeNano);
      if (deltaMs < EVALUATION_FREQ_MS) {
        return;
      }
      int reportCount = reportedStats.size();
      if (reportCount == 0) {
        return;
      }

      // Compute statistics
      Report sum = getReportedSum();
      double rcuPerRequest = sum.readUnits / reportCount;
      double rcuPerSecond = (sum.readUnits * 1000) / deltaMs;
      recordEvaluationStats(reportCount, rcuPerRequest, rcuPerSecond);

      // Remove a worker if we're achieving our throughput with very low
      // iops requests. There's benefit in doing slightly larger requests.
      if (rcuPerRequest < MIN_RCU_PER_REQ && rcuPerSecond * 1.1 > rateController.getTargetRate()) {
        removeWorker = true;
      } else if (rcuPerSecond * 1.1 <= rateController.getTargetRate()) {
        if (sum.retries > 0) {
          log.warn("Not achieving throughput, but not adding workers due to retries (throttles or"
              + " 500s) (cnt=" + sum.retries + ")");
          // Add a worker if we're not achieving our throughput and getting no throttles/retries.
        } else {
          addWorker = true;
        }
      }
      reportedStats.clear();
      lastEvaluatedTimeNano = time.getNanoTime();
    }

    if (removeWorker) {
      log.info("Removing a worker");
      removeWorker();
    } else if (addWorker) {
      log.info("Adding a worker");
      addWorker();
    }
  }