in streampipes-extensions/streampipes-pipeline-elements-experimental-flink/src/main/java/org/apache/streampipes/pe/flink/processor/absence/AbsenceProgram.java [48:109]
public DataStream<Event> getApplicationLogic(DataStream<Event>... messageStream) {
Time time = TimeUnitConverter.toTime(params.getTimeUnit(), params.getTimeWindowSize());
DataStream<Tuple2<Boolean, Event>> stream1 =
messageStream[0].flatMap(new FlatMapFunction<Event, Tuple2<Boolean, Event>>() {
@Override
public void flatMap(Event in, Collector<Tuple2<Boolean, Event>> out) throws
Exception {
out.collect(new Tuple2<>(true, in));
}
});
DataStream<Tuple2<Boolean, Event>> stream2 =
messageStream[1].flatMap(new FlatMapFunction<Event, Tuple2<Boolean, Event>>() {
@Override
public void flatMap(Event in, Collector<Tuple2<Boolean, Event>> out) throws
Exception {
out.collect(new Tuple2<>(false, in));
}
});
DataStream<Tuple2<Boolean, Event>> joinedStreams = stream2.union(stream1);
Pattern<Tuple2<Boolean, Event>, Tuple2<Boolean, Event>> matchedEvents =
Pattern.<Tuple2<Boolean, Event>>begin("start")
.where(new SimpleCondition<Tuple2<Boolean, Event>>() {
@Override
public boolean filter(Tuple2<Boolean, Event> ride) throws Exception {
return ride.f0;
}
})
.next("end")
.where(new SimpleCondition<Tuple2<Boolean, Event>>() {
@Override
public boolean filter(Tuple2<Boolean, Event> ride) throws Exception {
return !ride.f0;
}
});
PatternStream<Tuple2<Boolean, Event>> patternStream = CEP.pattern(joinedStreams, matchedEvents
.within(time));
OutputTag<Tuple2<Boolean, Event>> timedout = new OutputTag<Tuple2<Boolean, Event>>("timedout") {
};
SingleOutputStreamOperator<Tuple2<Boolean, Event>> matched = patternStream.flatSelect(
timedout,
new TimedOut(),
new FlatSelectNothing<>()
);
return matched.getSideOutput(timedout).flatMap(new FlatMapFunction<Tuple2<Boolean, Event>,
Event>() {
@Override
public void flatMap(Tuple2<Boolean, Event> in, Collector<Event> out) throws
Exception {
out.collect(in.f1);
}
});
}