in core/src/main/java/org/apache/rocketmq/streams/core/window/fire/JoinWindowFire.java [71:205]
/**
 * Fires the join windows that both window stores report as being below the given watermark.
 *
 * <p>The left and the right window store are each searched with a key prefix built from
 * {@code operatorName} and the stream side. The hits of each side are sorted by window end
 * and then joined according to {@code joinType}:
 * <ul>
 *   <li>{@code INNER_JOIN}: one result is forwarded for every left/right pair whose
 *       {@code getKeyAndWindow()} values are equal; only the left window key is recorded
 *       as fired.</li>
 *   <li>{@code LEFT_JOIN}: when {@code streamType} is {@code LEFT_STREAM}, one result is
 *       forwarded for every left window, joined with the first matching right window or with
 *       {@code null} when there is none; both the left key and the matched right key are
 *       recorded as fired. For {@code RIGHT_STREAM} nothing is forwarded.</li>
 * </ul>
 * After the join step every window returned by the two searches is deleted from its store,
 * whether or not it produced output.
 *
 * @param operatorName name used to build the left/right key prefixes for the store search
 * @param watermark    bound handed to {@code searchLessThanWatermark} on both stores
 * @param streamType   stream side; only consulted when {@code joinType} is {@code LEFT_JOIN}
 * @return the window keys recorded as fired; empty when neither store returned a window
 * @throws RStreamsException wrapping any {@link Throwable} raised while searching, joining,
 *                           forwarding or deleting
 */
public List<WindowKey> fire(String operatorName, long watermark, StreamType streamType) {
    List<WindowKey> fired = new ArrayList<>();
    try {
        String leftWindow = Utils.buildKey(operatorName, StreamType.LEFT_STREAM.name());
        List<Pair<WindowKey, WindowState<K, V1>>> leftPairs = this.leftWindowStore.searchLessThanWatermark(leftWindow, watermark);
        for (Pair<WindowKey, WindowState<K, V1>> leftPair : leftPairs) {
            logger.debug("search with key prefix:{} and watermark:{}, find window: {}", leftWindow, Utils.format(watermark), leftPair.getKey());
        }

        String rightWindow = Utils.buildKey(operatorName, StreamType.RIGHT_STREAM.name());
        List<Pair<WindowKey, WindowState<K, V2>>> rightPairs = this.rightWindowStore.searchLessThanWatermark(rightWindow, watermark);
        for (Pair<WindowKey, WindowState<K, V2>> rightPair : rightPairs) {
            logger.debug("search with key prefix:{} and watermark:{}, find window: {}", rightWindow, Utils.format(watermark), rightPair.getKey());
        }

        // Nothing to join and nothing to delete.
        if (leftPairs.isEmpty() && rightPairs.isEmpty()) {
            logger.debug("left window and right window are all empty, watermark:{}." +
                    "left window operatorName:{}, right window operatorName:{}", Utils.format(watermark), leftWindow, rightWindow);
            return fired;
        }

        // Process windows in ascending order of their end time.
        leftPairs.sort(Comparator.comparing(pair -> pair.getKey().getWindowEnd()));
        rightPairs.sort(Comparator.comparing(pair -> pair.getKey().getWindowEnd()));

        switch (joinType) {
            case INNER_JOIN:
                // Fire only when a left window is matched by a right window.
                for (Pair<WindowKey, WindowState<K, V1>> leftPair : leftPairs) {
                    WindowKey leftWindowKey = leftPair.getKey();
                    String leftPrefix = leftWindowKey.getKeyAndWindow();
                    for (Pair<WindowKey, WindowState<K, V2>> rightPair : rightPairs) {
                        String rightPrefix = rightPair.getKey().getKeyAndWindow();
                        // Same key in the same window: join the two sides.
                        if (leftPrefix.equals(rightPrefix)) {
                            V1 o1 = leftPair.getValue().getValue();
                            V2 o2 = rightPair.getValue().getValue();
                            OUT out = this.joinAction.apply(o1, o2);
                            Properties header = this.context.getHeader();
                            header.put(Constant.WINDOW_START_TIME, leftWindowKey.getWindowStart());
                            header.put(Constant.WINDOW_END_TIME, leftWindowKey.getWindowEnd());
                            // Value equality, not reference identity: the two keys come from
                            // different stores and need not be the same object.
                            assert java.util.Objects.equals(leftPair.getValue().getKey(), rightPair.getValue().getKey());
                            Data<K, OUT> result = new Data<>(leftPair.getValue().getKey(), out, this.context.getDataTime(), header);
                            Data<K, Object> convert = this.convert(result);
                            this.context.forward(convert);
                            fired.add(leftWindowKey);
                        }
                    }
                }
                break;
            case LEFT_JOIN:
                switch (streamType) {
                    case LEFT_STREAM:
                        // Every left window fires, whether or not a right window matches.
                        for (Pair<WindowKey, WindowState<K, V1>> leftPair : leftPairs) {
                            WindowKey leftWindowKey = leftPair.getKey();
                            fired.add(leftWindowKey);
                            String leftPrefix = leftWindowKey.getKeyAndWindow();

                            // Take the first right window with the same key and window, if any.
                            Pair<WindowKey, WindowState<K, V2>> targetPair = null;
                            for (Pair<WindowKey, WindowState<K, V2>> rightPair : rightPairs) {
                                if (rightPair.getKey().getKeyAndWindow().equals(leftPrefix)) {
                                    targetPair = rightPair;
                                    break;
                                }
                            }

                            V1 o1 = leftPair.getValue().getValue();
                            // The right value stays null when no right window matched.
                            V2 o2 = null;
                            if (targetPair != null) {
                                o2 = targetPair.getValue().getValue();
                                fired.add(targetPair.getKey());
                                // Value equality, not reference identity (see INNER_JOIN above).
                                assert java.util.Objects.equals(leftPair.getValue().getKey(), targetPair.getValue().getKey());
                            }
                            OUT out = this.joinAction.apply(o1, o2);
                            Properties header = this.context.getHeader();
                            header.put(Constant.WINDOW_START_TIME, leftWindowKey.getWindowStart());
                            header.put(Constant.WINDOW_END_TIME, leftWindowKey.getWindowEnd());
                            Data<K, OUT> result = new Data<>(leftPair.getValue().getKey(), out, this.context.getDataTime(), header);
                            Data<K, Object> convert = this.convert(result);
                            this.context.forward(convert);
                        }
                        break;
                    case RIGHT_STREAM:
                        // Nothing is forwarded for the right stream.
                        // NOTE(review): the windows found above are still deleted below even
                        // though nothing fired here - confirm this is intended.
                }
                break;
        }

        // Remove every window returned by the searches, matched or not.
        if (!leftPairs.isEmpty()) {
            logger.debug("delete left window.");
            for (Pair<WindowKey, WindowState<K, V1>> leftPair : leftPairs) {
                this.leftWindowStore.deleteByKey(leftPair.getKey());
            }
        }
        if (!rightPairs.isEmpty()) {
            logger.debug("delete right window.");
            for (Pair<WindowKey, WindowState<K, V2>> rightPair : rightPairs) {
                this.rightWindowStore.deleteByKey(rightPair.getKey());
            }
        }
    } catch (Throwable t) {
        String format = String.format("fire window error, watermark:%s.", watermark);
        throw new RStreamsException(format, t);
    }
    return fired;
}