marketing-analytics/predicting/ml-data-windowing-pipeline/transform/MapGATableRowToSession.java (125 lines of code) (raw):

// Copyright 2019 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package com.google.corp.gtech.ads.datacatalyst.components.mldatawindowingpipeline.transform; import com.google.api.services.bigquery.model.TableRow; import com.google.corp.gtech.ads.datacatalyst.components.mldatawindowingpipeline.model.Fact; import com.google.corp.gtech.ads.datacatalyst.components.mldatawindowingpipeline.model.Session; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.DoFn; import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Instant; /** * Converts BigQuery TableRows from Google Analytics to Sessions. */ public class MapGATableRowToSession extends DoFn<TableRow, Session> { public static final String CUSTOM_DIMENSIONS = "customDimensions"; public static final String CUSTOM_DIMENSION_VALUE = "value"; public static final String CUSTOM_DIMENSION_INDEX = "index"; private final ValueProvider<String> factsToExtractProvider; private HashSet<String> factsToExtract; // Fact name of the label that we are trying to predict. private final ValueProvider<String> labelFactNameProvider; private String labelFactName; // Positive values of the label we are trying to predict. All other labels for the target fact // are considered negative. private final ValueProvider<String> positiveLabelFactValuesProvider; private HashSet<String> positiveLabelFactValues; public MapGATableRowToSession(ValueProvider<String> factsToExtractProvider, ValueProvider<String> labelFactNameProvider, ValueProvider<String> positiveLabelFactValuesProvider) { this.factsToExtractProvider = factsToExtractProvider; this.labelFactNameProvider = labelFactNameProvider; this.positiveLabelFactValuesProvider = positiveLabelFactValuesProvider; } // Returns given url with the trailing forward slash removed if present and otherwise non-empty. private static String normalizeURL(String url) { if (url.length() <= 1) { return url; } return url.replaceAll("/$", ""); } // Labels the given factName and factValue if positive, and also adds it to the given session. // If a factsToExtract filter is given and the factName is not in the filter, it is not added. private void addFactToSession(String factName, String factValue, Instant time, Session session) { if (!factsToExtractProvider.get().isEmpty() && !factsToExtract.contains(factName)) { return; } boolean label = factName.equals(labelFactName) && positiveLabelFactValues.contains(factValue); session.addFact( new Fact(session.getId(), session.getUserId(), time, factName, factValue, label)); } public void addFactsToSession( Session session, Instant time, String factNamePrefix, Object object) { if (object instanceof TableRow) { TableRow tablerow = (TableRow) object; Instant childTime = time; if (factNamePrefix.equals("hits")) { childTime = childTime.plus( Duration.millis(Long.parseLong(tablerow.get("time").toString()) / 1000)); DateTime dateTime = childTime.toDateTime(); addFactToSession( "dayOfWeekOfHit", String.valueOf(dateTime.getDayOfWeek()), childTime, session); addFactToSession("hourOfHit", String.valueOf(dateTime.getHourOfDay()), childTime, session); } if (factNamePrefix.contains(CUSTOM_DIMENSIONS)) { addFactsToSession( session, childTime, factNamePrefix + ".index." + tablerow.get(CUSTOM_DIMENSION_INDEX), tablerow.get(CUSTOM_DIMENSION_VALUE)); return; } for (Map.Entry<String, Object> entry : tablerow.entrySet()) { String childFactNamePrefix = factNamePrefix; if (!factNamePrefix.isEmpty()) { childFactNamePrefix += "."; } childFactNamePrefix += entry.getKey(); if (entry.getValue() instanceof List) { for (Object childObject : (List) entry.getValue()) { addFactsToSession(session, childTime, childFactNamePrefix, childObject); } } else { addFactsToSession(session, childTime, childFactNamePrefix, entry.getValue()); } } } else { String factValue = object.toString(); if (factNamePrefix.contains("Path")) { factValue = normalizeURL(factValue); } addFactToSession(factNamePrefix, factValue, time, session); } } void addDerivedTimeFactsToSession(Session session) { Instant startTime = session.getVisitStartTime(); DateTime dateTime = startTime.toDateTime(); int hourOfDay = dateTime.getHourOfDay(); String timeOfDay; if (hourOfDay >= 6 && hourOfDay <= 11) { timeOfDay = "morning"; } else if (hourOfDay >= 12 && hourOfDay <= 16) { timeOfDay = "afternoon"; } else if (hourOfDay >= 17 && hourOfDay <= 22) { timeOfDay = "evening"; } else { timeOfDay = "night"; } addFactToSession("visitStartTimeOfDay", timeOfDay, startTime, session); addFactToSession("visitStartDayOfWeek", dateTime.dayOfWeek().getAsText(), startTime, session); } // Converts BigQuery TableRows from Google Analytics to Sessions. @ProcessElement public void processElement(ProcessContext context) { factsToExtract = new HashSet<String>( Arrays.asList(factsToExtractProvider.get().split("\\s*,\\s*"))); factsToExtract.add("fullVisitorId"); factsToExtract.add("visitId"); factsToExtract.add("visitStartTime"); labelFactName = labelFactNameProvider.get(); positiveLabelFactValues = new HashSet<>(Arrays.asList(positiveLabelFactValuesProvider.get().split(","))); TableRow tablerow = context.element(); Session session = new Session(); session.setId(String.format("%s/%s", tablerow.get("fullVisitorId"), tablerow.get("visitId"))); session.setUserId(tablerow.get("fullVisitorId").toString()); session.setVisitStartTime( new Instant(1000 * Long.parseLong(tablerow.get("visitStartTime").toString()))); addDerivedTimeFactsToSession(session); addFactsToSession(session, session.getVisitStartTime(), "", tablerow); context.output(session); } }