public void processElement()

in marketing-analytics/predicting/ml-data-windowing-pipeline/transform/MapSortedSessionsIntoSessionLookbackWindows.java [50:126]


  /**
   * Emits one {@link LookbackWindow} for each Session of a user whose last hit time falls inside
   * the snapshot range, using that Session as the last Session of the window.
   *
   * <p>For a Session whose last hit time is {@code effectiveDate}, the emitted window has start
   * time {@code effectiveDate - lookbackGap - windowDuration} and end time
   * {@code effectiveDate - lookbackGap}. It contains every Session (up to and including the
   * current one) whose visit start time is not before the window start and whose last hit time
   * is not after the window end. The window's prediction label is {@code true} when
   * {@code SortedSessionsUtil.getFirstInstantInInterval} finds a positive label time between
   * {@code effectiveDate + minimumLookahead} and {@code effectiveDate + maximumLookahead}.
   *
   * <p>When stop-on-first-positive-label is enabled, the first non-null positive label found in
   * an emitted window's lookahead is remembered. No further windows are emitted once that
   * label lies before {@code effectiveDate + minimumLookahead}.
   *
   * <p>NOTE(review): the skip/break on the snapshot bounds and the forward-only
   * {@code firstSessionIndex} assume the Sessions arrive sorted by time, as the enclosing class
   * name suggests. TODO confirm against the upstream transform.
   *
   * @param context Beam context whose element is a (userId, Sessions) pair; windows are emitted
   *     through {@code context.output}
   */
  public void processElement(ProcessContext context) {
    Instant snapshotStartDate =
        DateUtil.parseStartDateStringToInstant(snapshotStartDateProvider.get());
    Instant snapshotEndDate = DateUtil.parseEndDateStringToInstant(snapshotEndDateProvider.get());
    Duration lookbackGapDuration =
        Duration.standardSeconds(lookbackGapInSecondsProvider.get());
    Duration windowDuration = Duration.standardSeconds(windowTimeInSecondsProvider.get());
    Duration minimumLookaheadDuration =
        Duration.standardSeconds(minimumLookaheadTimeInSecondsProvider.get());
    Duration maximumLookaheadDuration =
        Duration.standardSeconds(maximumLookaheadTimeInSecondsProvider.get());
    boolean stopOnFirstPositiveLabel = stopOnFirstPositiveLabelProvider.get();
    // First non-null positive label seen in any emitted window's lookahead interval.
    Instant firstPositiveLabelInstant = null;

    KV<String, List<Session>> kv = context.element();
    String userId = kv.getKey();
    ArrayList<Session> sessions = new ArrayList<>(kv.getValue());
    if (sessions.isEmpty()) {
      return;
    }
    ArrayList<Instant> positiveLabelTimes =
        SortedSessionsUtil.getPositiveLabelTimes(sessions, snapshotStartDate, snapshotEndDate);
    // Every window records the visit start of the user's first Session; compute it once.
    Instant firstActivityTime = sessions.get(0).getVisitStartTime();

    /*
     * For each Session, outputs one LookbackWindow with the given Session as the last in the
     * window. lastSessionIndex iterates over all the Sessions, while firstSessionIndex is the
     * index of the first candidate Session for the current LookbackWindow. As lastSessionIndex
     * increases, the window start moves forward. firstSessionIndex is then advanced past
     * Sessions that start before the window start.
     */
    int firstSessionIndex = 0;
    for (int lastSessionIndex = 0; lastSessionIndex < sessions.size(); lastSessionIndex++) {
      Session lastSession = sessions.get(lastSessionIndex);
      Instant effectiveDate = lastSession.getLastHitTime();
      Instant lookaheadStart = effectiveDate.plus(minimumLookaheadDuration);
      if (stopOnFirstPositiveLabel
          && firstPositiveLabelInstant != null
          && firstPositiveLabelInstant.isBefore(lookaheadStart)) {
        break;
      }
      if (effectiveDate.isBefore(snapshotStartDate)) {
        continue;
      }
      if (effectiveDate.isAfter(snapshotEndDate)) {
        break;
      }

      Instant windowStartTime = effectiveDate.minus(lookbackGapDuration).minus(windowDuration);
      Instant windowEndTime = windowStartTime.plus(windowDuration);

      // Skip Sessions that start before the window. The bound on lastSessionIndex matters when
      // a single Session lasts longer than lookbackGap + windowDuration. Without it, the index
      // would run past the end of the list (IndexOutOfBoundsException on the last Session).
      // A Session left at lastSessionIndex that still starts too early is rejected by the
      // containment check below.
      while (firstSessionIndex < lastSessionIndex
          && windowStartTime.isAfter(sessions.get(firstSessionIndex).getVisitStartTime())) {
        firstSessionIndex++;
      }

      LookbackWindow window = new LookbackWindow();
      window.setUserId(userId);
      window.setFirstActivityTime(firstActivityTime);
      window.setStartTime(windowStartTime);
      window.setEndTime(windowEndTime);
      window.setEffectiveDate(effectiveDate);
      // Only Sessions entirely inside [windowStartTime, windowEndTime] belong to the window.
      // Sessions that fall in the lookback gap are excluded.
      for (int i = firstSessionIndex; i <= lastSessionIndex; i++) {
        Session session = sessions.get(i);
        if (!session.getVisitStartTime().isBefore(windowStartTime)
            && !session.getLastHitTime().isAfter(windowEndTime)) {
          window.addSession(session);
        }
      }

      Instant positiveLabelInstant = SortedSessionsUtil.getFirstInstantInInterval(
          positiveLabelTimes,
          lookaheadStart,
          effectiveDate.plus(maximumLookaheadDuration));
      if (firstPositiveLabelInstant == null) {
        firstPositiveLabelInstant = positiveLabelInstant;
      }
      window.setPredictionLabel(positiveLabelInstant != null);
      context.output(window);
    }
  }