storm-client/src/jvm/org/apache/storm/trident/operation/impl/JoinerMultiReducer.java [47:76]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        factory = new ComboList.Factory(sizes);
    }

    /**
     * Builds the join state for one group.
     *
     * <p>The state is constructed with {@code types.size()} as its first argument and the
     * grouping tuple as its second. The collector is not used here.
     *
     * @param collector unused by this method
     * @param group     the grouping tuple handed to the new {@link JoinState}
     * @return a newly constructed {@link JoinState}
     */
    @Override
    public JoinState init(TridentCollector collector, TridentTuple group) {
        final int sideCount = types.size();
        return new JoinState(sideCount, group);
    }

    /**
     * Records one incoming tuple on its side of the join and, once the received-sides
     * counter matches the number of sides, hands off to {@code emitCrossJoin}.
     *
     * <p>{@code state.numSidesReceived} is bumped only when the target side was empty
     * before this tuple was appended, i.e. on the first tuple seen for that side.
     *
     * @param state       per-group join state; its {@code sides} and {@code numSidesReceived} are mutated
     * @param streamIndex index into {@code state.sides} selecting the side that receives {@code input}
     * @param group       unused by this method
     * @param input       the tuple appended to the selected side
     * @param collector   forwarded to {@code emitCrossJoin}
     */
    @Override
    public void execute(JoinState state, int streamIndex, TridentTuple group, TridentTuple input, TridentCollector collector) {
        //TODO: do the inner join incrementally, emitting the cross join with this tuple, against all other sides
        //TODO: only do cross join if at least one tuple in each side
        // NOTE(review): the counter check below already appears to enforce the second TODO -- confirm and drop it.
        final List<List> bucket = state.sides[streamIndex];

        // Count the side before appending, so "empty" means "no tuple seen yet".
        final boolean firstTupleForSide = bucket.isEmpty();
        if (firstTupleForSide) {
            state.numSidesReceived++;
        }
        bucket.add(input);

        final boolean everySideCounted = state.numSidesReceived == state.sides.length;
        if (!everySideCounted) {
            return;
        }
        emitCrossJoin(state, collector, streamIndex, input);
    }

    @Override
    public void complete(JoinState state, TridentTuple group, TridentCollector collector) {
        List<List>[] sides = state.sides;
        boolean wasEmpty = state.numSidesReceived < sides.length;
        for (int i = 0; i < sides.length; i++) {
            if (sides[i].isEmpty() && types.get(i) == JoinType.OUTER) {
                state.numSidesReceived++;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



storm-client/src/jvm/org/apache/storm/trident/operation/impl/PreservingFieldsOrderJoinerMultiReducer.java [52:81]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        factory = new ComboList.Factory(sizes);
    }

    /**
     * Creates a fresh {@link JoinState} for a group.
     *
     * <p>The number of entries in {@code types} and the grouping tuple are passed to the
     * {@link JoinState} constructor. The collector argument is ignored.
     *
     * @param collector not used by this method
     * @param group     the grouping tuple passed through to the new state
     * @return the newly created {@link JoinState}
     */
    @Override
    public JoinState init(TridentCollector collector, TridentTuple group) {
        final int numSides = types.size();
        return new JoinState(numSides, group);
    }

    /**
     * Appends {@code input} to the side selected by {@code streamIndex} and calls
     * {@code emitCrossJoin} when {@code state.numSidesReceived} equals the number of sides.
     *
     * <p>The counter is incremented only if the selected side held no tuples before this
     * call appended one.
     *
     * @param state       per-group join state; both its side list and its counter are updated
     * @param streamIndex position in {@code state.sides} of the side receiving {@code input}
     * @param group       not used by this method
     * @param input       the tuple added to the selected side
     * @param collector   passed along to {@code emitCrossJoin}
     */
    @Override
    public void execute(JoinState state, int streamIndex, TridentTuple group, TridentTuple input, TridentCollector collector) {
        //TODO: do the inner join incrementally, emitting the cross join with this tuple, against all other sides
        //TODO: only do cross join if at least one tuple in each side
        // NOTE(review): the equality check below seems to cover the second TODO already -- verify before removing it.
        final List<List> targetSide = state.sides[streamIndex];

        // Emptiness is sampled before the append so the first tuple of a side is counted once.
        final boolean sideWasEmpty = targetSide.isEmpty();
        if (sideWasEmpty) {
            state.numSidesReceived++;
        }
        targetSide.add(input);

        final int totalSides = state.sides.length;
        if (state.numSidesReceived != totalSides) {
            return;
        }
        emitCrossJoin(state, collector, streamIndex, input);
    }

    @Override
    public void complete(JoinState state, TridentTuple group, TridentCollector collector) {
        List<List>[] sides = state.sides;
        boolean wasEmpty = state.numSidesReceived < sides.length;
        for (int i = 0; i < sides.length; i++) {
            if (sides[i].isEmpty() && types.get(i) == JoinType.OUTER) {
                state.numSidesReceived++;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



