private Callable congestionControlTask()

in bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/CongestionController.java [171:247]


    private Callable<Void> congestionControlTask() {
        return new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                while (isRunning()) {
                    try {

                        logger.debug("pki {} goes to sleep for {} seconds. availabel semaphore permits {}, current degree of parallelism {}",
                                partitionKeyRangeId, samplePeriod.getSeconds(), throttleSemaphore.availablePermits(), degreeOfConcurrency);
                        Thread.sleep(samplePeriod.toMillis());
                        logger.debug("pki {} wakes up", partitionKeyRangeId);

                        InsertMetrics insertMetricsSample = atomicGetAndReplace(new InsertMetrics());

                        if (insertMetricsSample.numberOfThrottles > 0) {
                            logger.debug("pki {} importing encountered {} throttling. current degree of parallelism {}, decreasing amount: {}",
                                    partitionKeyRangeId, insertMetricsSample.numberOfThrottles, degreeOfConcurrency, degreeOfConcurrency / DIVISIVE_DECREASE_FACTOR);

                            // We got a throttle so we need to back off on the degree of concurrency.
                            // Get the current degree of concurrency and decrease that (AIMD).

                            for (int i = 0; i < degreeOfConcurrency / DIVISIVE_DECREASE_FACTOR; i++) {
                                throttleSemaphore.acquire();
                            }

                            degreeOfConcurrency -= (degreeOfConcurrency / DIVISIVE_DECREASE_FACTOR);

                            logger.debug("pki {} degree of parallelism reduced to {}, sem available permits", partitionKeyRangeId, degreeOfConcurrency, throttleSemaphore.availablePermits());
                        }

                        if (insertMetricsSample.numberOfDocumentsInserted == 0) {
                            // We haven't made any progress, since the last sampling
                            continue;
                        }

                        logger.debug("pki {} aggregating inserts metrics", partitionKeyRangeId);

                        if (insertMetricsSample.numberOfThrottles == 0) {
                            if ((insertMetricsSample.requestUnitsConsumed < THROUGHPUT_THRESHOLD * partitionThroughput) &&
                                    degreeOfConcurrency + ADDITIVE_INCREASE_FACTOR <= MAX_DEGREE_OF_CONCURRENCY) {
                                // We aren't getting throttles, so we should bump of the degree of concurrency (AIMD).
                                logger.debug("pki {} increasing degree of prallelism and releasing semaphore", partitionKeyRangeId);

                                throttleSemaphore.release(ADDITIVE_INCREASE_FACTOR);
                                degreeOfConcurrency += ADDITIVE_INCREASE_FACTOR;

                                logger.debug("pki {} degree of parallelism increased to {}. available semaphore permits {}", partitionKeyRangeId, degreeOfConcurrency, throttleSemaphore.availablePermits());
                            }
                        }

                        double ruPerSecond = insertMetricsSample.requestUnitsConsumed / samplePeriod.getSeconds();
                        documentsInsertedSoFar += insertMetricsSample.numberOfDocumentsInserted;

                        logger.debug("pki {} : Inserted {} docs in {} milli seconds at {} RU/s with {} tasks."
                                + " Faced {} throttles. Total documents inserterd so far {}.",
                                partitionKeyRangeId,
                                insertMetricsSample.numberOfDocumentsInserted,
                                samplePeriod.toMillis(),
                                ruPerSecond,
                                degreeOfConcurrency,
                                insertMetricsSample.numberOfThrottles,
                                documentsInsertedSoFar);

                    } catch (InterruptedException e) {
                        logger.warn("Interrupted", e);
                        break;
                    } catch (Exception e) {
                        logger.error("pki {} unexpected failure", partitionKeyRangeId, e);
                        throw e;
                    }

                }
                return null;
            };
        };
    }