in emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/AbstractReadManager.java [94:142]
public void report(double permittedReadUnits, double consumedReadUnits, int items, int retries) {
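  // Feed every batch's numbers to the rate controller right away; the
  // worker-count evaluation below only runs once per evaluation window.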
  rateController.adjust(permittedReadUnits, consumedReadUnits, items);

  boolean addWorker = false;
  boolean removeWorker = false;

  synchronized (reportStatsLock) {
    reportedStats.add(new Report(consumedReadUnits, items, retries));
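
    // Accumulate reports until the evaluation window elapses.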
    long deltaMs = time.getTimeSinceMs(lastEvaluatedTimeNano);
    if (deltaMs < EVALUATION_FREQ_MS) {
      return;
    }

    int reportCount = reportedStats.size();
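    // Defensive guard; reportCount is at least 1 here, since a report was
    // just added above.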
    if (reportCount == 0) {
      return;
    }

    // Compute statistics: average request size in RCU and overall
    // throughput in RCU per second (deltaMs is in milliseconds).
    Report sum = getReportedSum();
    double rcuPerRequest = sum.readUnits / reportCount;
    double rcuPerSecond = (sum.readUnits * 1000) / deltaMs;

    recordEvaluationStats(reportCount, rcuPerRequest, rcuPerSecond);
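
    // The 1.1 factor below is a 10% tolerance band: with a target of, say,
    // 100 RCU/s, any window above roughly 91 RCU/s counts as on-target.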
    // Remove a worker if we're achieving our throughput with very small
    // requests (low RCU per request). There's benefit in doing slightly
    // larger requests.
    if (rcuPerRequest < MIN_RCU_PER_REQ && rcuPerSecond * 1.1 > rateController.getTargetRate()) {
      removeWorker = true;
    } else if (rcuPerSecond * 1.1 <= rateController.getTargetRate()) {
      if (sum.retries > 0) {
        log.warn("Not achieving throughput, but not adding workers due to retries (throttles or"
            + " 500s) (cnt=" + sum.retries + ")");
      } else {
        // Add a worker if we're not achieving our throughput and seeing no
        // throttles/retries.
        addWorker = true;
      }
    }
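
    // Start a fresh evaluation window.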
    reportedStats.clear();
    lastEvaluatedTimeNano = time.getNanoTime();
  }
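
  // Resize the worker pool outside the stats lock so it is never held while
  // manipulating workers.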
  if (removeWorker) {
    log.info("Removing a worker");
    removeWorker();
  } else if (addWorker) {
    log.info("Adding a worker");
    addWorker();
  }
}
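
The helper getReportedSum() is defined elsewhere in AbstractReadManager. For context, a minimal sketch of the fold it performs might look like the following (it assumes Report exposes the same readUnits, items, and retries fields that report() reads; this is an illustration, not the file's actual code):

// Hypothetical sketch, not taken from the source file. Must be called while
// holding reportStatsLock, as report() does above.
private Report getReportedSum() {
  double readUnits = 0;
  int items = 0;
  int retries = 0;
  for (Report report : reportedStats) {
    readUnits += report.readUnits;
    items += report.items;
    retries += report.retries;
  }
  return new Report(readUnits, items, retries);
}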