in marketing-analytics/predicting/ml-data-windowing-pipeline/transform/MapSortedSessionsIntoSlidingLookbackWindows.java [60:139]
public void processElement(ProcessContext context) {
  // Emits sliding LookbackWindows for one user.
  //
  // The element maps a user id to that user's Sessions. This code assumes the list is sorted by
  // ascending visit start time: the forward-only index scan, sessions.get(0) as the first
  // activity, and the early exit in the session-collection loop all depend on that ordering.
  //
  // Windows are windowDuration long. They start at
  // snapshotStartDate - lookbackGap - windowDuration and advance by slideDuration while the
  // window's effective date (window end + lookbackGap) is not after snapshotEndDate.
  // - Windows are skipped until the user's first Session has completed inside the window.
  // - Emission stops (return) once every Session starts before the window start.
  // - When stopOnFirstPositiveLabel is set, emission also stops (break) once the first positive
  //   label found lies before effectiveDate + minimumLookahead.
  // Each emitted window holds the Sessions lying entirely inside [start, end]. Its prediction
  // label is true when SortedSessionsUtil.getFirstInstantInInterval finds a positive-label time
  // between effectiveDate + minimumLookahead and effectiveDate + maximumLookahead.
  Instant snapshotStartDate = DateUtil.parseStartDateStringToInstant(
      snapshotStartDateProvider.get());
  Instant snapshotEndDate = DateUtil.parseEndDateStringToInstant(snapshotEndDateProvider.get());
  Duration lookbackGapDuration =
      Duration.standardSeconds(lookbackGapInSecondsProvider.get());
  Duration windowDuration = Duration.standardSeconds(windowTimeInSecondsProvider.get());
  Duration slideDuration = Duration.standardSeconds(slideTimeInSecondsProvider.get());
  Duration minimumLookaheadDuration =
      Duration.standardSeconds(minimumLookaheadTimeInSecondsProvider.get());
  Duration maximumLookaheadDuration =
      Duration.standardSeconds(maximumLookaheadTimeInSecondsProvider.get());
  boolean stopOnFirstPositiveLabel = stopOnFirstPositiveLabelProvider.get();
  Instant firstPositiveLabelInstant = null;
  KV<String, List<Session>> kv = context.element();
  String userId = kv.getKey();
  // Copy the input into an ArrayList so it can be indexed directly below.
  ArrayList<Session> sessions = new ArrayList<>(kv.getValue());
  if (sessions.isEmpty()) {
    return;
  }
  // The earliest Session (list is assumed sorted) marks the user's first activity; it is the
  // same for every window, so read it once.
  Instant firstActivityTime = sessions.get(0).getVisitStartTime();
  ArrayList<Instant> positiveLabelTimes =
      SortedSessionsUtil.getPositiveLabelTimes(sessions, snapshotStartDate, snapshotEndDate);
  // A non-positive slide would never advance windowStartTime, so the loop below would never
  // terminate. Fail fast on that misconfiguration instead of hanging the worker.
  if (slideDuration.getMillis() <= 0) {
    throw new IllegalArgumentException(
        "slide duration must be positive but was " + slideDuration);
  }
  // Iterate over all possible LookbackWindows from startTime, moving forwards each time by
  // slideDuration.
  // Note: For simplicity, the code always advances by one slideDuration at a time, instead of
  // jumping over large gaps in Session times.
  int sessionStartIndex = 0;
  for (Instant windowStartTime =
          snapshotStartDate.minus(lookbackGapDuration).minus(windowDuration);
      !windowStartTime.plus(windowDuration).plus(lookbackGapDuration).isAfter(snapshotEndDate);
      windowStartTime = windowStartTime.plus(slideDuration)) {
    Instant windowEndTime = windowStartTime.plus(windowDuration);
    Instant effectiveDate = windowEndTime.plus(lookbackGapDuration);
    if (stopOnFirstPositiveLabel
        && firstPositiveLabelInstant != null
        && firstPositiveLabelInstant.isBefore(effectiveDate.plus(minimumLookaheadDuration))) {
      break;
    }
    // Find the first Session that starts in this window. Window start times only increase, so
    // the index never needs to move backwards.
    while (sessionStartIndex < sessions.size()
        && windowStartTime.isAfter(sessions.get(sessionStartIndex).getVisitStartTime())) {
      sessionStartIndex++;
    }
    // Every Session starts before this window, so this and all later windows hold no Sessions.
    if (sessionStartIndex >= sessions.size()) {
      return;
    }
    // Skip leading windows until the user's first Session has completed within the window.
    if (sessionStartIndex == 0 && sessions.get(0).getLastHitTime().isAfter(windowEndTime)) {
      continue;
    }
    LookbackWindow window = new LookbackWindow();
    window.setFirstActivityTime(firstActivityTime);
    window.setUserId(userId);
    window.setStartTime(windowStartTime);
    window.setEndTime(windowEndTime);
    window.setEffectiveDate(effectiveDate);
    // Add Sessions lying entirely inside [windowStartTime, windowEndTime]. Because Sessions are
    // sorted by visit start time, once one starts after the window end no later Session can fit
    // (assuming a Session's last hit is never before its visit start). So stop scanning there
    // rather than walking the rest of the list for every window.
    for (int i = sessionStartIndex; i < sessions.size(); i++) {
      Session session = sessions.get(i);
      if (session.getVisitStartTime().isAfter(windowEndTime)) {
        break;
      }
      if (!session.getVisitStartTime().isBefore(windowStartTime)
          && !session.getLastHitTime().isAfter(windowEndTime)) {
        window.addSession(session);
      }
    }
    Instant positiveLabelInstant = SortedSessionsUtil.getFirstInstantInInterval(
        positiveLabelTimes,
        effectiveDate.plus(minimumLookaheadDuration),
        effectiveDate.plus(maximumLookaheadDuration));
    // Remember the first positive label seen in any emitted window; it drives the
    // stopOnFirstPositiveLabel cut-off at the top of the loop.
    if (firstPositiveLabelInstant == null) {
      firstPositiveLabelInstant = positiveLabelInstant;
    }
    window.setPredictionLabel(positiveLabelInstant != null);
    context.output(window);
  }
}