public DataStream getApplicationLogic()

in streampipes-extensions/streampipes-pipeline-elements-experimental-flink/src/main/java/org/apache/streampipes/pe/flink/processor/absence/AbsenceProgram.java [48:109]


  /**
   * Wires up the absence-detection topology on top of two input streams.
   *
   * <p>Every event of the first input is tagged {@code true} and every event of the second
   * input is tagged {@code false}; both tagged streams are then unioned. A CEP pattern looks
   * for a {@code true}-tagged event that is immediately followed by a {@code false}-tagged
   * event inside the time window derived from {@code params}. Only the side output registered
   * under the {@code "timedout"} tag is forwarded; the main output of the select step is not
   * used.
   *
   * @param messageStream input streams; the elements at index 0 and index 1 are read
   * @return the events unwrapped from the tuples emitted to the {@code "timedout"} side output
   */
  public DataStream<Event> getApplicationLogic(DataStream<Event>... messageStream) {

    // Resolve the window first so that a conversion failure happens before any operator is added.
    Time window = TimeUnitConverter.toTime(params.getTimeUnit(), params.getTimeWindowSize());

    // Mark events by origin: true = first input stream.
    DataStream<Tuple2<Boolean, Event>> firstTagged = messageStream[0].flatMap(
        new FlatMapFunction<Event, Tuple2<Boolean, Event>>() {
          @Override
          public void flatMap(Event event, Collector<Tuple2<Boolean, Event>> collector) {
            Tuple2<Boolean, Event> tagged = new Tuple2<>(true, event);
            collector.collect(tagged);
          }
        });

    // Mark events by origin: false = second input stream.
    DataStream<Tuple2<Boolean, Event>> secondTagged = messageStream[1].flatMap(
        new FlatMapFunction<Event, Tuple2<Boolean, Event>>() {
          @Override
          public void flatMap(Event event, Collector<Tuple2<Boolean, Event>> collector) {
            Tuple2<Boolean, Event> tagged = new Tuple2<>(false, event);
            collector.collect(tagged);
          }
        });

    DataStream<Tuple2<Boolean, Event>> merged = secondTagged.union(firstTagged);

    // Stage "start": accepts only tuples that originate from the first input.
    Pattern<Tuple2<Boolean, Event>, Tuple2<Boolean, Event>> startStage =
        Pattern.<Tuple2<Boolean, Event>>begin("start")
            .where(new SimpleCondition<Tuple2<Boolean, Event>>() {
              @Override
              public boolean filter(Tuple2<Boolean, Event> candidate) {
                return candidate.f0;
              }
            });

    // Stage "end": the directly following tuple must originate from the second input.
    Pattern<Tuple2<Boolean, Event>, Tuple2<Boolean, Event>> absencePattern =
        startStage
            .next("end")
            .where(new SimpleCondition<Tuple2<Boolean, Event>>() {
              @Override
              public boolean filter(Tuple2<Boolean, Event> candidate) {
                return !candidate.f0;
              }
            });

    PatternStream<Tuple2<Boolean, Event>> detections =
        CEP.pattern(merged, absencePattern.within(window));

    // Anonymous subclass keeps the generic element type available to Flink at runtime.
    OutputTag<Tuple2<Boolean, Event>> timeoutTag =
        new OutputTag<Tuple2<Boolean, Event>>("timedout") {
        };

    // NOTE(review): TimedOut presumably emits the unmatched "start" tuple and FlatSelectNothing
    // presumably discards complete matches - confirm against those classes.
    SingleOutputStreamOperator<Tuple2<Boolean, Event>> selected =
        detections.flatSelect(timeoutTag, new TimedOut(), new FlatSelectNothing<>());

    DataStream<Tuple2<Boolean, Event>> timedOutTuples = selected.getSideOutput(timeoutTag);

    // Strip the origin flag again and hand back the plain events.
    return timedOutTuples.flatMap(new FlatMapFunction<Tuple2<Boolean, Event>, Event>() {
      @Override
      public void flatMap(Tuple2<Boolean, Event> tagged, Collector<Event> collector) {
        collector.collect(tagged.f1);
      }
    });
  }