in core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java [106:130]
public void process(Object data) throws Throwable {
Object key = this.context.getKey();
long time = this.context.getDataTime();
Properties header = this.context.getHeader();
long watermark = this.watermark(time - allowDelay, stateTopicMessageQueue);
if (time < watermark) {
logger.warn("discard delay data:[{}]. time of data:{}, watermark:{}", data, time, watermark);
return;
}
WindowInfo.JoinStream stream = (WindowInfo.JoinStream) header.get(Constant.STREAM_TAG);
StreamType streamType = stream.getStreamType();
if (streamType == null) {
String format = String.format("StreamType is empty, data:%s", data);
throw new IllegalStateException(format);
}
store(key, data, time, watermark, streamType);
List<WindowKey> fire = this.joinWindowFire.fire(this.name, watermark, streamType);
for (WindowKey windowKey : fire) {
this.idleWindowScaner.removeWindowKey(windowKey);
}
}