in core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinAggregateSupplier.java [105:153]
/**
 * Tries to pair a record from one side of the join with the state stored
 * for the opposite side under the same key, and hands the pair to
 * {@code doFire}.
 *
 * <ul>
 *   <li>{@code LEFT_STREAM}: reads the right-side state. For
 *       {@code INNER_JOIN} nothing is fired when that state is null or
 *       empty; for {@code LEFT_JOIN} the pair is always fired, with the
 *       right value being whatever {@code byte2Object} yields for the
 *       stored bytes (its handling of null/empty input is not visible
 *       here). Any other join type is rejected.</li>
 *   <li>{@code RIGHT_STREAM}: only {@code INNER_JOIN} does any work; the
 *       left-side state is read and the pair is fired when that state is
 *       non-empty.</li>
 * </ul>
 *
 * @param key        join key of the incoming record
 * @param data       incoming value; cast to {@code V1} for the left stream
 *                   and to {@code V2} for the right stream
 * @param streamType which side of the join the record arrived on
 * @throws UnsupportedOperationException if a left-stream record arrives
 *         while the join type is neither {@code INNER_JOIN} nor
 *         {@code LEFT_JOIN}
 * @throws Throwable anything raised by the state lookup, deserialization
 *         or {@code doFire}
 */
private void fire(Object key, Object data, StreamType streamType) throws Throwable {
    switch (streamType) {
        case LEFT_STREAM: {
            // The lookup deliberately happens before the join type is validated.
            byte[] rightBytes = loadCounterpartState(key, StreamType.RIGHT_STREAM);

            boolean innerJoin = joinType == JoinType.INNER_JOIN;
            if (!innerJoin && joinType != JoinType.LEFT_JOIN) {
                throw new UnsupportedOperationException("unknown joinType = " + joinType);
            }
            // An inner join needs a stored right-side value; a left join fires regardless.
            if (innerJoin && (rightBytes == null || rightBytes.length == 0)) {
                break;
            }

            V1 leftValue = (V1) data;
            V2 rightValue = super.byte2Object(rightBytes);
            doFire(leftValue, rightValue);
            break;
        }
        case RIGHT_STREAM: {
            // A right-side arrival produces output only for an inner join.
            if (joinType != JoinType.INNER_JOIN) {
                break;
            }

            byte[] leftBytes = loadCounterpartState(key, StreamType.LEFT_STREAM);
            if (leftBytes == null || leftBytes.length == 0) {
                break;
            }

            V2 rightValue = (V2) data;
            V1 leftValue = super.byte2Object(leftBytes);
            doFire(leftValue, rightValue);
            break;
        }
    }
    // todo: should the state be deleted?
}

/**
 * Reads the raw bytes stored for {@code key} on the given side of the join.
 * The store key is built from this processor's name, the side's enum name
 * and the hex form of the key.
 */
private byte[] loadCounterpartState(Object key, StreamType counterpart) throws Throwable {
    String sidePrefix = Utils.buildKey(this.name, counterpart.name());
    String storeKey = Utils.buildKey(sidePrefix, super.toHexString(key));
    return this.stateStore.get(Utils.object2Byte(storeKey));
}