public void processElement()

in marketing-analytics/predicting/ml-data-windowing-pipeline/transform/MapSortedSessionsToUserActivities.java [58:128]


  /**
   * Emits one {@code UserActivity} per snapshot time for the user in the input element.
   *
   * <p>Snapshot times start at the snapshot start date and advance by the slide duration up to and
   * including the snapshot end date. For each snapshot the output records the snapshot time, the
   * time elapsed since the start date, whether a positive label falls inside the lookahead window
   * {@code [snapshotTime + minimumLookahead, snapshotTime + maximumLookahead]} (as reported by
   * {@code SortedSessionsUtil.getFirstInstantInInterval}), and the durations since the user's
   * first and latest sessions that ended at or before the snapshot time.
   *
   * <p>Nothing is emitted for snapshots that precede the user's first finished session. When
   * stop-on-first-positive-label is enabled, processing stops at the first snapshot whose
   * lookahead window starts after the first positive label that was found.
   *
   * <p>NOTE(review): the session scan breaks at the first session that ends after the snapshot
   * time, so it presumably relies on the sessions being sorted by last hit time - confirm against
   * the upstream transform.
   *
   * @param context the process context; its element maps a user ID to that user's sessions, and
   *     one {@code UserActivity} is output per qualifying snapshot
   * @throws IllegalArgumentException if the configured slide time is not positive, since the
   *     snapshot loop could then never reach the end date
   */
  public void processElement(ProcessContext context) {
    Instant snapshotStartDate = DateUtil.parseStartDateStringToInstant(
        snapshotStartDateProvider.get());
    Instant snapshotEndDate = DateUtil.parseEndDateStringToInstant(
        snapshotEndDateProvider.get());
    Duration slideDuration = Duration.standardSeconds(slideTimeInSecondsProvider.get());
    // A non-positive slide never moves snapshotTime past the end date, so fail fast instead of
    // spinning forever on the worker.
    if (slideDuration.getMillis() <= 0) {
      throw new IllegalArgumentException(
          "slideTimeInSeconds must be positive, but was: " + slideDuration);
    }
    Duration minimumLookaheadDuration =
        Duration.standardSeconds(minimumLookaheadTimeInSecondsProvider.get());
    Duration maximumLookaheadDuration =
        Duration.standardSeconds(maximumLookaheadTimeInSecondsProvider.get());
    // Null-safe: an unset option is treated as "do not stop" rather than failing on unboxing.
    boolean stopOnFirstPositiveLabel =
        Boolean.TRUE.equals(stopOnFirstPositiveLabelProvider.get());

    KV<String, List<Session>> kv = context.element();
    String userId = kv.getKey();
    ArrayList<Session> sessions = new ArrayList<>(kv.getValue());
    if (sessions.isEmpty()) {
      return;
    }
    ArrayList<Instant> positiveLabelTimes = SortedSessionsUtil.getPositiveLabelTimes(
        sessions, snapshotStartDate, snapshotEndDate);

    // Rolling state carried from one snapshot to the next. It is kept in locals (not in a shared
    // UserActivity) so that every emitted element is a distinct object that is never mutated
    // after being output.
    Instant firstPositiveLabelInstant = null;
    Duration durationSinceFirstActivity = null;
    Duration durationSinceLatestActivity = null;
    int sessionIndex = 0;

    for (Instant snapshotTime = snapshotStartDate;
         !snapshotTime.isAfter(snapshotEndDate);
         snapshotTime = snapshotTime.plus(slideDuration)) {
      Instant lookaheadStart = snapshotTime.plus(minimumLookaheadDuration);
      if (stopOnFirstPositiveLabel
          && firstPositiveLabelInstant != null
          && firstPositiveLabelInstant.isBefore(lookaheadStart)) {
        break;
      }
      Instant positiveLabelInstant = SortedSessionsUtil.getFirstInstantInInterval(
          positiveLabelTimes,
          lookaheadStart,
          snapshotTime.plus(maximumLookaheadDuration));
      if (firstPositiveLabelInstant == null) {
        firstPositiveLabelInstant = positiveLabelInstant;
      }

      // Age the durations carried over from the previous snapshot by one slide.
      if (durationSinceFirstActivity != null) {
        durationSinceFirstActivity = durationSinceFirstActivity.plus(slideDuration);
      }
      if (durationSinceLatestActivity != null) {
        durationSinceLatestActivity = durationSinceLatestActivity.plus(slideDuration);
      }

      // Fold in the sessions that finish at or before the snapshotTime. sessionIndex persists
      // across snapshots, so each session is consumed exactly once.
      for (; sessionIndex < sessions.size(); sessionIndex++) {
        Session session = sessions.get(sessionIndex);
        if (session.getLastHitTime().isAfter(snapshotTime)) {
          break;
        }
        Duration durationSinceSessionEnd = new Duration(session.getLastHitTime(), snapshotTime);
        durationSinceLatestActivity = durationSinceSessionEnd;
        if (durationSinceFirstActivity == null) {
          durationSinceFirstActivity = durationSinceSessionEnd;
        }
      }

      // Don't output a userActivity if there has been no activity, even if there is a positive
      // label in the prediction window.
      if (durationSinceFirstActivity == null) {
        continue;
      }

      // Build a fresh element per snapshot: a value must not be modified once it has been
      // handed to context.output().
      UserActivity userActivity = new UserActivity();
      userActivity.setUserId(userId);
      userActivity.setSnapshotTime(snapshotTime);
      userActivity.setDurationSinceStartDate(new Duration(snapshotStartDate, snapshotTime));
      userActivity.setHasPositiveLabel(positiveLabelInstant != null);
      userActivity.setDurationSinceFirstActivity(durationSinceFirstActivity);
      userActivity.setDurationSinceLatestActivity(durationSinceLatestActivity);
      context.output(userActivity);
    }
  }