in marketing-analytics/predicting/ml-data-windowing-pipeline/transform/MapSortedSessionsToUserActivities.java [58:128]
public void processElement(ProcessContext context) {
Instant snapshotStartDate = DateUtil.parseStartDateStringToInstant(
snapshotStartDateProvider.get());
Instant snapshotEndDate = DateUtil.parseEndDateStringToInstant(
snapshotEndDateProvider.get());
Duration slideDuration = Duration.standardSeconds(slideTimeInSecondsProvider.get());
Duration minimumLookaheadDuration =
Duration.standardSeconds(minimumLookaheadTimeInSecondsProvider.get());
Duration maximumLookaheadDuration =
Duration.standardSeconds(maximumLookaheadTimeInSecondsProvider.get());
Boolean stopOnFirstPositiveLabel = stopOnFirstPositiveLabelProvider.get();
Instant firstPositiveLabelInstant = null;
KV<String, List<Session>> kv = context.element();
String userId = kv.getKey();
ArrayList<Session> sessions = new ArrayList<>(kv.getValue());
if (sessions.isEmpty()) {
return;
}
ArrayList<Instant> positiveLabelTimes = SortedSessionsUtil.getPositiveLabelTimes(
sessions, snapshotStartDate, snapshotEndDate);
int sessionIndex = 0;
UserActivity userActivity = new UserActivity();
userActivity.setUserId(userId);
for (Instant snapshotTime = snapshotStartDate;
!snapshotTime.isAfter(snapshotEndDate);
snapshotTime = snapshotTime.plus(slideDuration)) {
if (stopOnFirstPositiveLabel
&& firstPositiveLabelInstant != null
&& firstPositiveLabelInstant.isBefore(snapshotTime.plus(minimumLookaheadDuration))) {
break;
}
userActivity.setSnapshotTime(snapshotTime);
userActivity.setDurationSinceStartDate(new Duration(snapshotStartDate, snapshotTime));
Instant positiveLabelInstant = SortedSessionsUtil.getFirstInstantInInterval(
positiveLabelTimes,
snapshotTime.plus(minimumLookaheadDuration),
snapshotTime.plus(maximumLookaheadDuration));
userActivity.setHasPositiveLabel(positiveLabelInstant != null);
if (firstPositiveLabelInstant == null) {
firstPositiveLabelInstant = positiveLabelInstant;
}
if (userActivity.getDurationSinceFirstActivity() != null) {
userActivity.setDurationSinceFirstActivity(
userActivity.getDurationSinceFirstActivity().plus(slideDuration));
}
if (userActivity.getDurationSinceLatestActivity() != null) {
userActivity.setDurationSinceLatestActivity(
userActivity.getDurationSinceLatestActivity().plus(slideDuration));
}
// Update the userActivity with Sessions that finish before the snapshotTime.
for (; sessionIndex < sessions.size(); sessionIndex++) {
Session session = sessions.get(sessionIndex);
if (session.getLastHitTime().isAfter(snapshotTime)) {
break;
}
Duration durationSinceSessionEnd = new Duration(session.getLastHitTime(), snapshotTime);
userActivity.setDurationSinceLatestActivity(durationSinceSessionEnd);
if (userActivity.getDurationSinceFirstActivity() == null) {
userActivity.setDurationSinceFirstActivity(durationSinceSessionEnd);
}
}
// Don't output a userActivity if there has been no activity, even if there is a positive
// label in the prediction window.
if (userActivity.getDurationSinceFirstActivity() == null) {
continue;
}
context.output(userActivity);
}
}