public List fire()

in core/src/main/java/org/apache/rocketmq/streams/core/window/fire/JoinWindowFire.java [71:205]


    /**
     * Fires the join windows of {@code operatorName} that the stores report as being
     * below {@code watermark}, forwards the joined results downstream and then deletes
     * every window that was found from its store.
     *
     * <ul>
     *   <li>{@code INNER_JOIN}: a result is forwarded for every left/right pair whose
     *       {@code getKeyAndWindow()} values are equal; the left window key is recorded
     *       as fired once per match.</li>
     *   <li>{@code LEFT_JOIN}: only acts when {@code streamType} is {@code LEFT_STREAM}.
     *       Every left window is fired; the right value passed to the join action is
     *       {@code null} when no right window matches. The matched right window key,
     *       if any, is recorded as fired as well.</li>
     * </ul>
     *
     * @param operatorName name used, together with the stream side, to build the store key prefix
     * @param watermark    upper bound handed to {@code searchLessThanWatermark} on both stores
     * @param streamType   side that triggered the fire; only consulted for {@code LEFT_JOIN}
     * @return the keys of the windows that were fired; empty when neither store returned a window
     * @throws RStreamsException wrapping any {@link Throwable} raised while firing
     */
    public List<WindowKey> fire(String operatorName, long watermark, StreamType streamType) {
        List<WindowKey> fired = new ArrayList<>();

        try {
            String leftWindow = Utils.buildKey(operatorName, StreamType.LEFT_STREAM.name());
            List<Pair<WindowKey, WindowState<K, V1>>> leftPairs = this.leftWindowStore.searchLessThanWatermark(leftWindow, watermark);
            for (Pair<WindowKey, WindowState<K, V1>> leftPair : leftPairs) {
                logger.debug("search with key prefix:{} and watermark:{}, find window: {}", leftWindow, Utils.format(watermark), leftPair.getKey());
            }

            String rightWindow = Utils.buildKey(operatorName, StreamType.RIGHT_STREAM.name());
            List<Pair<WindowKey, WindowState<K, V2>>> rightPairs = this.rightWindowStore.searchLessThanWatermark(rightWindow, watermark);
            for (Pair<WindowKey, WindowState<K, V2>> rightPair : rightPairs) {
                logger.debug("search with key prefix:{} and watermark:{}, find window: {}", rightWindow, Utils.format(watermark), rightPair.getKey());
            }

            if (leftPairs.isEmpty() && rightPairs.isEmpty()) {
                logger.debug("left window and right window are all empty, watermark:{}." +
                        "left window operatorName:{}, right window operatorName:{}", Utils.format(watermark), leftWindow, rightWindow);
                return fired;
            }

            // Process windows in ascending order of their end time.
            leftPairs.sort(Comparator.comparing(pair -> pair.getKey().getWindowEnd()));
            rightPairs.sort(Comparator.comparing(pair -> pair.getKey().getWindowEnd()));

            switch (joinType) {
                case INNER_JOIN:
                    // Fire only when a left window has a matching right window.
                    for (Pair<WindowKey, WindowState<K, V1>> leftPair : leftPairs) {
                        WindowKey leftWindowKey = leftPair.getKey();
                        String leftPrefix = leftWindowKey.getKeyAndWindow();

                        for (Pair<WindowKey, WindowState<K, V2>> rightPair : rightPairs) {
                            // Same key in the same window: join the two sides.
                            if (!leftPrefix.equals(rightPair.getKey().getKeyAndWindow())) {
                                continue;
                            }

                            V1 o1 = leftPair.getValue().getValue();
                            V2 o2 = rightPair.getValue().getValue();
                            OUT out = this.joinAction.apply(o1, o2);

                            // Compare by value: the two keys come from different stores and
                            // are not expected to be the same object instance.
                            K leftKey = leftPair.getValue().getKey();
                            assert java.util.Objects.equals(leftKey, rightPair.getValue().getKey());

                            forwardJoinResult(leftWindowKey, leftKey, out);

                            fired.add(leftWindowKey);
                        }
                    }
                    break;
                case LEFT_JOIN:
                    switch (streamType) {
                        case LEFT_STREAM:
                            // Fire every left window, whether or not the right stream matched.
                            for (Pair<WindowKey, WindowState<K, V1>> leftPair : leftPairs) {
                                WindowKey leftWindowKey = leftPair.getKey();

                                fired.add(leftWindowKey);

                                // Take the first right window with the same key and window, if any.
                                String leftPrefix = leftWindowKey.getKeyAndWindow();
                                Pair<WindowKey, WindowState<K, V2>> targetPair = null;
                                for (Pair<WindowKey, WindowState<K, V2>> rightPair : rightPairs) {
                                    if (rightPair.getKey().getKeyAndWindow().equals(leftPrefix)) {
                                        targetPair = rightPair;
                                        break;
                                    }
                                }

                                K leftKey = leftPair.getValue().getKey();
                                V1 o1 = leftPair.getValue().getValue();
                                // Stays null when the right side has no matching window.
                                V2 o2 = null;
                                if (targetPair != null) {
                                    o2 = targetPair.getValue().getValue();
                                    fired.add(targetPair.getKey());

                                    // Value comparison, see the inner-join branch.
                                    assert java.util.Objects.equals(leftKey, targetPair.getValue().getKey());
                                }

                                OUT out = this.joinAction.apply(o1, o2);
                                forwardJoinResult(leftWindowKey, leftKey, out);
                            }
                            break;
                        case RIGHT_STREAM:
                            // Nothing is fired for the right stream of a left join.
                            // NOTE(review): the windows found above are still deleted below
                            // on this path even though none were fired - confirm this is intended.
                    }
                    break;
            }

            // Every window returned by the searches is removed, matched or not.
            if (!leftPairs.isEmpty()) {
                logger.debug("delete left window.");
                for (Pair<WindowKey, WindowState<K, V1>> leftPair : leftPairs) {
                    this.leftWindowStore.deleteByKey(leftPair.getKey());
                }
            }

            if (!rightPairs.isEmpty()) {
                logger.debug("delete right window.");
                for (Pair<WindowKey, WindowState<K, V2>> rightPair : rightPairs) {
                    this.rightWindowStore.deleteByKey(rightPair.getKey());
                }
            }
        } catch (Throwable t) {
            String format = String.format("fire window error, watermark:%s.", watermark);
            throw new RStreamsException(format, t);
        }

        return fired;
    }

    /**
     * Writes the start and end time of {@code windowKey} into the context header, wraps
     * {@code out} in a {@code Data} record keyed by {@code key}, converts it and forwards
     * it through the context.
     */
    private void forwardJoinResult(WindowKey windowKey, K key, OUT out) throws Throwable {
        Properties header = this.context.getHeader();
        header.put(Constant.WINDOW_START_TIME, windowKey.getWindowStart());
        header.put(Constant.WINDOW_END_TIME, windowKey.getWindowEnd());

        Data<K, OUT> result = new Data<>(key, out, this.context.getDataTime(), header);
        Data<K, Object> convert = this.convert(result);

        this.context.forward(convert);
    }