public void processElement()

in marketing-analytics/predicting/ml-data-windowing-pipeline/transform/MapSortedSessionsIntoSlidingLookbackWindows.java [60:139]


  /**
   * Maps one user's Sessions onto a sequence of sliding {@link LookbackWindow}s and emits them.
   *
   * <p>The first window starts at {@code snapshotStartDate - lookbackGap - windowDuration}.
   * Windows then advance by {@code slideDuration} for as long as the window's effective date
   * ({@code windowEnd + lookbackGap}) is not after {@code snapshotEndDate}.
   *
   * <p>Per-window behavior:
   * <ul>
   *   <li>Windows that end before the first Session's last hit are skipped.
   *   <li>Iteration stops early once every Session starts before the current window's start.
   *   <li>Each emitted window contains the Sessions that start no earlier than the window start
   *       and end (last hit) no later than the window end.
   *   <li>Each emitted window is labelled positive when
   *       {@code SortedSessionsUtil.getFirstInstantInInterval} finds a positive-label time between
   *       {@code effectiveDate + minimumLookahead} and {@code effectiveDate + maximumLookahead}.
   * </ul>
   *
   * <p>When {@code stopOnFirstPositiveLabel} is set, emission stops once the first positive label
   * seen in a window's lookahead falls before the current window's
   * {@code effectiveDate + minimumLookahead}.
   *
   * <p>NOTE(review): the session-skipping logic assumes the input Sessions are sorted by visit
   * start time (as the enclosing transform's name suggests); confirm against the upstream step.
   *
   * @param context Beam process context whose element is {@code KV<userId, List<Session>>}
   * @throws IllegalArgumentException if the configured slide time is not positive, since the
   *     window loop would otherwise never advance and never terminate
   */
  public void processElement(ProcessContext context) {
    Instant snapshotStartDate = DateUtil.parseStartDateStringToInstant(
        snapshotStartDateProvider.get());
    Instant snapshotEndDate = DateUtil.parseEndDateStringToInstant(snapshotEndDateProvider.get());
    Duration lookbackGapDuration =
        Duration.standardSeconds(lookbackGapInSecondsProvider.get());
    Duration windowDuration = Duration.standardSeconds(windowTimeInSecondsProvider.get());
    Duration slideDuration = Duration.standardSeconds(slideTimeInSecondsProvider.get());
    // A zero or negative slide never moves windowStartTime forward. The loop below would then
    // spin forever, re-emitting the same window. Fail fast on the misconfiguration instead.
    if (slideDuration.getMillis() <= 0) {
      throw new IllegalArgumentException(
          "slideTimeInSeconds must be positive, got: " + slideDuration.getStandardSeconds());
    }
    Duration minimumLookaheadDuration =
        Duration.standardSeconds(minimumLookaheadTimeInSecondsProvider.get());
    Duration maximumLookaheadDuration =
        Duration.standardSeconds(maximumLookaheadTimeInSecondsProvider.get());
    boolean stopOnFirstPositiveLabel = stopOnFirstPositiveLabelProvider.get();
    // First positive label found in any emitted window's lookahead; only used when
    // stopOnFirstPositiveLabel is set.
    Instant firstPositiveLabelInstant = null;

    KV<String, List<Session>> kv = context.element();
    String userId = kv.getKey();
    ArrayList<Session> sessions = new ArrayList<>(kv.getValue());
    if (sessions.isEmpty()) {
      return;
    }
    ArrayList<Instant> positiveLabelTimes =
        SortedSessionsUtil.getPositiveLabelTimes(sessions, snapshotStartDate, snapshotEndDate);

    // Iterate over all possible LookbackWindows from the start time, moving forward by
    // slideDuration each time.
    // Note: For simplicity, the code always advances by one slideDuration at a time, instead of
    // jumping over large gaps in Session times. This has minimal impact on runtime compared to
    // the overall IO bottlenecks.
    int sessionStartIndex = 0;
    for (Instant windowStartTime =
             snapshotStartDate.minus(lookbackGapDuration).minus(windowDuration);
         !windowStartTime.plus(windowDuration).plus(lookbackGapDuration).isAfter(snapshotEndDate);
         windowStartTime = windowStartTime.plus(slideDuration)) {
      Instant windowEndTime = windowStartTime.plus(windowDuration);
      Instant effectiveDate = windowEndTime.plus(lookbackGapDuration);
      if (stopOnFirstPositiveLabel
          && firstPositiveLabelInstant != null
          && firstPositiveLabelInstant.isBefore(effectiveDate.plus(minimumLookaheadDuration))) {
        break;
      }
      // Advance past Sessions that start before this window. The index is never reset because
      // windows only move forward in time.
      while (sessionStartIndex < sessions.size()
             && windowStartTime.isAfter(sessions.get(sessionStartIndex).getVisitStartTime())) {
        sessionStartIndex++;
      }
      // Return early if all Sessions start before windowStartTime; later windows start even later.
      if (sessionStartIndex >= sessions.size()) {
        return;
      }
      // Skip empty LookbackWindows until the first user activity fits within the window.
      if (sessionStartIndex == 0
          && sessions.get(sessionStartIndex).getLastHitTime().isAfter(windowEndTime)) {
        continue;
      }
      // Construct a LookbackWindow.
      LookbackWindow window = new LookbackWindow();
      window.setFirstActivityTime(sessions.get(0).getVisitStartTime());
      window.setUserId(userId);
      window.setStartTime(windowStartTime);
      window.setEndTime(windowEndTime);
      window.setEffectiveDate(effectiveDate);
      // Add Sessions that lie entirely within [windowStartTime, windowEndTime].
      for (int i = sessionStartIndex; i < sessions.size(); i++) {
        Session session = sessions.get(i);
        if (!session.getVisitStartTime().isBefore(windowStartTime)
            && !session.getLastHitTime().isAfter(windowEndTime)) {
          window.addSession(session);
        }
      }
      Instant positiveLabelInstant = SortedSessionsUtil.getFirstInstantInInterval(
          positiveLabelTimes,
          effectiveDate.plus(minimumLookaheadDuration),
          effectiveDate.plus(maximumLookaheadDuration));
      if (firstPositiveLabelInstant == null) {
        firstPositiveLabelInstant = positiveLabelInstant;
      }
      window.setPredictionLabel(positiveLabelInstant != null);
      context.output(window);
    }
  }