in marketing-analytics/predicting/ml-data-windowing-pipeline/transform/MapSortedSessionsIntoSessionLookbackWindows.java [50:126]
public void processElement(ProcessContext context) {
  // Emits LookbackWindows for one user's Sessions.
  //
  // For each Session whose last hit time (the window's "effective date") lies within
  // [snapshotStartDate, snapshotEndDate], this emits one LookbackWindow:
  //   - The window spans [effectiveDate - lookbackGap - windowTime, effectiveDate - lookbackGap].
  //   - It contains the Sessions that lie entirely inside that span.
  //   - It is labelled positive when SortedSessionsUtil.getFirstInstantInInterval finds a
  //     positive-label time between effectiveDate + minimumLookahead and
  //     effectiveDate + maximumLookahead. Interval inclusivity is whatever that helper
  //     implements.
  // When stopOnFirstPositiveLabel is set, emission stops once the first positive label found
  // precedes the next window's lookahead start.
  //
  // NOTE(review): the single forward pass, the monotonic firstSessionIndex and the `break` past
  // snapshotEndDate all assume the input Sessions are sorted chronologically (presumably by
  // visit start time, with non-decreasing last hit times). Confirm this against the upstream
  // transform.
  Instant snapshotStartDate = DateUtil.parseStartDateStringToInstant(
      snapshotStartDateProvider.get());
  Instant snapshotEndDate = DateUtil.parseEndDateStringToInstant(snapshotEndDateProvider.get());
  Duration lookbackGapDuration =
      Duration.standardSeconds(lookbackGapInSecondsProvider.get());
  Duration windowDuration = Duration.standardSeconds(windowTimeInSecondsProvider.get());
  Duration minimumLookaheadDuration =
      Duration.standardSeconds(minimumLookaheadTimeInSecondsProvider.get());
  Duration maximumLookaheadDuration =
      Duration.standardSeconds(maximumLookaheadTimeInSecondsProvider.get());
  boolean stopOnFirstPositiveLabel = stopOnFirstPositiveLabelProvider.get();
  // Positive-label instant found by the first emitted window; drives the early stop below.
  Instant firstPositiveLabelInstant = null;
  KV<String, List<Session>> kv = context.element();
  String userId = kv.getKey();
  ArrayList<Session> sessions = new ArrayList<>(kv.getValue());
  if (sessions.isEmpty()) {
    return;
  }
  ArrayList<Instant> positiveLabelTimes =
      SortedSessionsUtil.getPositiveLabelTimes(sessions, snapshotStartDate, snapshotEndDate);
  // Identical for every window of this user.
  Instant firstActivityTime = sessions.get(0).getVisitStartTime();
  /*
   * For each Session, outputs one LookbackWindow with the given Session as the last in the
   * window. lastSessionIndex iterates over all the Sessions. firstSessionIndex is the index of
   * the first Session that may belong to the current LookbackWindow.
   * As lastSessionIndex increases, the window start moves later, so firstSessionIndex is
   * advanced past Sessions that start before it. It is never advanced beyond lastSessionIndex.
   */
  int firstSessionIndex = 0;
  for (int lastSessionIndex = 0; lastSessionIndex < sessions.size(); lastSessionIndex++) {
    Session lastSession = sessions.get(lastSessionIndex);
    Instant effectiveDate = lastSession.getLastHitTime();
    if (stopOnFirstPositiveLabel
        && firstPositiveLabelInstant != null
        && firstPositiveLabelInstant.isBefore(effectiveDate.plus(minimumLookaheadDuration))) {
      // The first positive label now falls before this window's lookahead interval, so this
      // and any later windows would be built from activity after that label.
      break;
    }
    if (effectiveDate.isBefore(snapshotStartDate)) {
      continue;
    }
    if (effectiveDate.isAfter(snapshotEndDate)) {
      break;
    }
    Instant windowStartTime = effectiveDate.minus(lookbackGapDuration).minus(windowDuration);
    Instant windowEndTime = windowStartTime.plus(windowDuration);
    // Skip Sessions that start before this window. The bound matters because a Session (for
    // example one longer than lookbackGap + windowTime) can itself start before
    // windowStartTime. Without the bound, the index would walk past lastSessionIndex and off
    // the end of the list. Sessions left in range here are still filtered out below when they
    // start too early.
    while (firstSessionIndex < lastSessionIndex
        && windowStartTime.isAfter(sessions.get(firstSessionIndex).getVisitStartTime())) {
      firstSessionIndex++;
    }
    LookbackWindow window = new LookbackWindow();
    window.setUserId(userId);
    window.setFirstActivityTime(firstActivityTime);
    window.setStartTime(windowStartTime);
    window.setEndTime(windowEndTime);
    window.setEffectiveDate(effectiveDate);
    // Only Sessions that start and end inside [windowStartTime, windowEndTime] are included.
    for (int i = firstSessionIndex; i <= lastSessionIndex; i++) {
      Session session = sessions.get(i);
      if (!session.getVisitStartTime().isBefore(windowStartTime)
          && !session.getLastHitTime().isAfter(windowEndTime)) {
        window.addSession(session);
      }
    }
    Instant positiveLabelInstant = SortedSessionsUtil.getFirstInstantInInterval(
        positiveLabelTimes,
        effectiveDate.plus(minimumLookaheadDuration),
        effectiveDate.plus(maximumLookaheadDuration));
    if (firstPositiveLabelInstant == null) {
      firstPositiveLabelInstant = positiveLabelInstant;
    }
    window.setPredictionLabel(positiveLabelInstant != null);
    context.output(window);
  }
}