in storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java [151:260]
public Callable<Long> call() throws Exception {
init(idToTask, idToTaskBase);
return new Callable<Long>() {
final int recvqCheckSkipCountMax = getSpoutRecvqCheckSkipCount();
int recvqCheckSkips = 0;
int swIdleCount = 0; // counter for spout wait strategy
int bpIdleCount = 0; // counter for back pressure wait strategy
int rmspCount = 0;
@Override
public Long call() throws Exception {
updateExecCredsIfRequired();
int receiveCount = 0;
if (recvqCheckSkips++ == recvqCheckSkipCountMax) {
receiveCount = receiveQueue.consume(SpoutExecutor.this);
recvqCheckSkips = 0;
}
long currCount = emittedCount.get();
boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
boolean isActive = stormActive.get();
if (!isActive) {
inactiveExecute();
return 0L;
}
if (!lastActive.get()) {
lastActive.set(true);
activateSpouts();
}
boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
boolean noEmits = true;
long emptyStretch = 0;
if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) {
for (int j = 0; j < spouts.size(); j++) { // in critical path. don't use iterators.
spouts.get(j).nextTuple();
}
noEmits = (currCount == emittedCount.get());
if (noEmits) {
emptyEmitStreak.increment();
} else {
emptyStretch = emptyEmitStreak.get();
emptyEmitStreak.set(0);
}
}
if (reachedMaxSpoutPending) {
if (rmspCount == 0) {
LOG.debug("Reached max spout pending");
}
rmspCount++;
} else {
if (rmspCount > 0) {
LOG.debug("Ended max spout pending stretch of {} iterations", rmspCount);
}
rmspCount = 0;
}
if (receiveCount > 1) {
// continue without idling
return 0L;
}
if (!pendingEmits.isEmpty()) { // then facing backpressure
backPressureWaitStrategy();
return 0L;
}
bpIdleCount = 0;
if (noEmits) {
spoutWaitStrategy(reachedMaxSpoutPending, emptyStretch);
return 0L;
}
swIdleCount = 0;
return 0L;
}
private void backPressureWaitStrategy() throws InterruptedException {
long start = Time.currentTimeMillis();
if (bpIdleCount == 0) { // check avoids multiple log msgs when in a idle loop
LOG.debug("Experiencing Back Pressure from downstream components. Entering BackPressure Wait.");
}
bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
skippedBackpressureMs.inc(Time.currentTimeMillis() - start);
}
private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) throws InterruptedException {
emptyEmitStreak.increment();
long start = Time.currentTimeMillis();
swIdleCount = spoutWaitStrategy.idle(swIdleCount);
if (reachedMaxSpoutPending) {
skippedMaxSpoutMs.inc(Time.currentTimeMillis() - start);
} else {
if (emptyStretch > 0) {
LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch);
}
}
}
// returns true if pendingEmits is empty
private boolean tryFlushPendingEmits() {
for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
if (executorTransfer.tryTransfer(t, null)) {
pendingEmits.poll();
} else { // to avoid reordering of emits, stop at first failure
return false;
}
}
return true;
}
};
}