in storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java [106:167]
private boolean checkFinishId(Tuple tup, TupleType type) {
Object id = tup.getValue(0);
boolean failed = false;
synchronized (tracked) {
TrackingInfo track = tracked.get(id);
try {
if (track != null) {
boolean delayed = false;
if (idStreamSpec == null && type == TupleType.COORD || idStreamSpec != null && type == TupleType.ID) {
track.ackTuples.add(tup);
delayed = true;
}
if (track.failed) {
failed = true;
for (Tuple t : track.ackTuples) {
collector.fail(t);
}
tracked.remove(id);
} else if (track.receivedId && (sourceArgs.isEmpty()
|| track.reportCount == numSourceReports && track.expectedTupleCount == track.receivedTuples)) {
if (delegate instanceof FinishedCallback) {
((FinishedCallback) delegate).finishedId(id);
}
if (!(sourceArgs.isEmpty() || type != TupleType.REGULAR)) {
throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible");
}
Iterator<Integer> outTasks = countOutTasks.iterator();
while (outTasks.hasNext()) {
int task = outTasks.next();
int numTuples = Utils.get(track.taskEmittedTuples, task, 0);
collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples));
}
for (Tuple t : track.ackTuples) {
collector.ack(t);
}
track.finished = true;
tracked.remove(id);
}
if (!delayed && type != TupleType.REGULAR) {
if (track.failed) {
collector.fail(tup);
} else {
collector.ack(tup);
}
}
} else {
if (type != TupleType.REGULAR) {
collector.fail(tup);
}
}
} catch (FailedException e) {
LOG.error("Failed to finish batch", e);
for (Tuple t : track.ackTuples) {
collector.fail(t);
}
tracked.remove(id);
failed = true;
}
}
return failed;
}