private void fire()

in core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinAggregateSupplier.java [105:153]


        private void fire(Object key, Object data, StreamType streamType) throws Throwable {
            switch (streamType) {
                case LEFT_STREAM: {
                    String name = Utils.buildKey(this.name, StreamType.RIGHT_STREAM.name());
                    String storeKey = Utils.buildKey(name, super.toHexString(key));
                    byte[] keyBytes = Utils.object2Byte(storeKey);

                    byte[] bytes = this.stateStore.get(keyBytes);

                    if (joinType == JoinType.INNER_JOIN) {
                        if (bytes == null || bytes.length == 0) {
                            break;
                        }
                    } else if (joinType == JoinType.LEFT_JOIN) {
                        //no-op
                    } else {
                        throw new UnsupportedOperationException("unknown joinType = " + joinType);
                    }

                    V1 v1Data = (V1) data;
                    V2 v2Data = super.byte2Object(bytes);

                    doFire(v1Data, v2Data);
                    break;
                }
                case RIGHT_STREAM: {
                    if (joinType != JoinType.INNER_JOIN) {
                        break;
                    }

                    String name = Utils.buildKey(this.name, StreamType.LEFT_STREAM.name());
                    String storeKey = Utils.buildKey(name, super.toHexString(key));
                    byte[] keyBytes = Utils.object2Byte(storeKey);

                    byte[] bytes = this.stateStore.get(keyBytes);
                    if (bytes == null || bytes.length == 0) {
                        break;
                    }

                    V2 v2Data = (V2) data;
                    V1 v1Data = super.byte2Object(bytes);

                    doFire(v1Data, v2Data);
                    break;
                }
            }

            //todo 是否需要删除状态?
        }