in bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/CongestionController.java [171:247]
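/**
 * Builds the background task that adjusts the degree of concurrency for this
 * partition key range using an AIMD (additive increase, multiplicative decrease)
 * scheme: each sample period it inspects the collected insert metrics, withdraws
 * semaphore permits when throttles were observed, and releases extra permits when
 * imports completed cleanly below the throughput threshold.
 */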
private Callable<Void> congestionControlTask() {
    return new Callable<Void>() {

        @Override
        public Void call() throws Exception {
            while (isRunning()) {
                try {
                    logger.debug("pki {} goes to sleep for {} seconds. available semaphore permits {}, current degree of parallelism {}",
                            partitionKeyRangeId, samplePeriod.getSeconds(), throttleSemaphore.availablePermits(), degreeOfConcurrency);

                    Thread.sleep(samplePeriod.toMillis());
                    logger.debug("pki {} wakes up", partitionKeyRangeId);

                    InsertMetrics insertMetricsSample = atomicGetAndReplace(new InsertMetrics());

                    if (insertMetricsSample.numberOfThrottles > 0) {
                        logger.debug("pki {} importing encountered {} throttles. current degree of parallelism {}, decreasing amount: {}",
                                partitionKeyRangeId, insertMetricsSample.numberOfThrottles, degreeOfConcurrency, degreeOfConcurrency / DIVISIVE_DECREASE_FACTOR);

                        // We got throttled, so back off on the degree of concurrency:
                        // withdraw a fraction of the permits (multiplicative decrease in AIMD).
                        for (int i = 0; i < degreeOfConcurrency / DIVISIVE_DECREASE_FACTOR; i++) {
                            throttleSemaphore.acquire();
                        }
                        degreeOfConcurrency -= (degreeOfConcurrency / DIVISIVE_DECREASE_FACTOR);

                        logger.debug("pki {} degree of parallelism reduced to {}, available semaphore permits {}",
                                partitionKeyRangeId, degreeOfConcurrency, throttleSemaphore.availablePermits());
                    }

                    if (insertMetricsSample.numberOfDocumentsInserted == 0) {
                        // No progress since the last sample; skip this iteration.
                        continue;
                    }

                    logger.debug("pki {} aggregating insert metrics", partitionKeyRangeId);

                    if (insertMetricsSample.numberOfThrottles == 0) {
                        if ((insertMetricsSample.requestUnitsConsumed < THROUGHPUT_THRESHOLD * partitionThroughput) &&
                                degreeOfConcurrency + ADDITIVE_INCREASE_FACTOR <= MAX_DEGREE_OF_CONCURRENCY) {
                            // No throttles and throughput headroom remains, so bump up the
                            // degree of concurrency (additive increase in AIMD).
                            logger.debug("pki {} increasing degree of parallelism and releasing semaphore", partitionKeyRangeId);

                            throttleSemaphore.release(ADDITIVE_INCREASE_FACTOR);
                            degreeOfConcurrency += ADDITIVE_INCREASE_FACTOR;

                            logger.debug("pki {} degree of parallelism increased to {}. available semaphore permits {}",
                                    partitionKeyRangeId, degreeOfConcurrency, throttleSemaphore.availablePermits());
                        }
                    }

                    double ruPerSecond = insertMetricsSample.requestUnitsConsumed / samplePeriod.getSeconds();
                    documentsInsertedSoFar += insertMetricsSample.numberOfDocumentsInserted;

                    logger.debug("pki {} : Inserted {} docs in {} milliseconds at {} RU/s with {} tasks."
                            + " Faced {} throttles. Total documents inserted so far {}.",
                            partitionKeyRangeId,
                            insertMetricsSample.numberOfDocumentsInserted,
                            samplePeriod.toMillis(),
                            ruPerSecond,
                            degreeOfConcurrency,
                            insertMetricsSample.numberOfThrottles,
                            documentsInsertedSoFar);

                } catch (InterruptedException e) {
                    logger.warn("Interrupted", e);
                    break;
                } catch (Exception e) {
                    logger.error("pki {} unexpected failure", partitionKeyRangeId, e);
                    throw e;
                }
            }

            return null;
        }
    };
}
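
A minimal, self-contained sketch of the semaphore-based AIMD pattern the task above relies on. The class name AimdSketch, its method names, and the constant values are illustrative assumptions, not part of CongestionController; the real class drives the same moves from sampled InsertMetrics rather than explicit callbacks.

import java.util.concurrent.Semaphore;

// Illustrative sketch only: permits in the semaphore represent the current
// degree of concurrency; workers must hold a permit while issuing a request.
public class AimdSketch {

    private static final int ADDITIVE_INCREASE_FACTOR = 1;   // assumed value, for illustration
    private static final int DIVISIVE_DECREASE_FACTOR = 2;   // assumed value, for illustration

    private final Semaphore throttleSemaphore;
    private int degreeOfConcurrency;

    public AimdSketch(int initialDegreeOfConcurrency) {
        this.degreeOfConcurrency = initialDegreeOfConcurrency;
        this.throttleSemaphore = new Semaphore(initialDegreeOfConcurrency);
    }

    // Multiplicative decrease: on throttling, withdraw a fraction of the permits
    // so fewer tasks can run concurrently.
    public void onThrottled() throws InterruptedException {
        int decrease = degreeOfConcurrency / DIVISIVE_DECREASE_FACTOR;
        for (int i = 0; i < decrease; i++) {
            throttleSemaphore.acquire();
        }
        degreeOfConcurrency -= decrease;
    }

    // Additive increase: after a clean sample period, release a fixed number of
    // extra permits to allow more concurrent tasks.
    public void onHealthySample() {
        throttleSemaphore.release(ADDITIVE_INCREASE_FACTOR);
        degreeOfConcurrency += ADDITIVE_INCREASE_FACTOR;
    }

    // Worker tasks gate on the semaphore, so the permit count directly bounds
    // the number of in-flight requests.
    public void runGated(Runnable task) throws InterruptedException {
        throttleSemaphore.acquire();
        try {
            task.run();
        } finally {
            throttleSemaphore.release();
        }
    }
}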