private boolean checkFinishId()

in storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java [106:167]


    private boolean checkFinishId(Tuple tup, TupleType type) {
        Object id = tup.getValue(0);
        boolean failed = false;

        synchronized (tracked) {
            TrackingInfo track = tracked.get(id);
            try {
                if (track != null) {
                    boolean delayed = false;
                    if (idStreamSpec == null && type == TupleType.COORD || idStreamSpec != null && type == TupleType.ID) {
                        track.ackTuples.add(tup);
                        delayed = true;
                    }
                    if (track.failed) {
                        failed = true;
                        for (Tuple t : track.ackTuples) {
                            collector.fail(t);
                        }
                        tracked.remove(id);
                    } else if (track.receivedId && (sourceArgs.isEmpty()
                            || track.reportCount == numSourceReports && track.expectedTupleCount == track.receivedTuples)) {
                        if (delegate instanceof FinishedCallback) {
                            ((FinishedCallback) delegate).finishedId(id);
                        }
                        if (!(sourceArgs.isEmpty() || type != TupleType.REGULAR)) {
                            throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible");
                        }
                        Iterator<Integer> outTasks = countOutTasks.iterator();
                        while (outTasks.hasNext()) {
                            int task = outTasks.next();
                            int numTuples = Utils.get(track.taskEmittedTuples, task, 0);
                            collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples));
                        }
                        for (Tuple t : track.ackTuples) {
                            collector.ack(t);
                        }
                        track.finished = true;
                        tracked.remove(id);
                    }
                    if (!delayed && type != TupleType.REGULAR) {
                        if (track.failed) {
                            collector.fail(tup);
                        } else {
                            collector.ack(tup);
                        }
                    }
                } else {
                    if (type != TupleType.REGULAR) {
                        collector.fail(tup);
                    }
                }
            } catch (FailedException e) {
                LOG.error("Failed to finish batch", e);
                for (Tuple t : track.ackTuples) {
                    collector.fail(t);
                }
                tracked.remove(id);
                failed = true;
            }
        }
        return failed;
    }