marketing-analytics/predicting/ml-data-windowing-pipeline/transform/MapUserActivityToTableRow.java (37 lines of code) (raw):

// Copyright 2019 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package com.google.corp.gtech.ads.datacatalyst.components.mldatawindowingpipeline.transform; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.corp.gtech.ads.datacatalyst.components.mldatawindowingpipeline.model.UserActivity; import java.util.Arrays; import org.apache.beam.sdk.transforms.DoFn; /** * Converts a UserActivity to a BigQuery TableRow if the user has had some activity as of the * UserActivty's snapshotTime. */ public class MapUserActivityToTableRow extends DoFn<UserActivity, TableRow> { // Returns the table schema for UserActivity TableRows. public static TableSchema getTableSchema() { TableSchema schema = new TableSchema(); schema.setFields(Arrays.asList( new TableFieldSchema().setName("userId").setType("STRING"), new TableFieldSchema().setName("hasPositiveLabel").setType("BOOLEAN"), new TableFieldSchema().setName("daysSinceStartDate").setType("INTEGER"), new TableFieldSchema().setName("daysSinceFirstActivity").setType("INTEGER"), new TableFieldSchema().setName("daysSinceLatestActivity").setType("INTEGER"), new TableFieldSchema().setName("snapshotTimeInMillis").setType("INTEGER"))); return schema; } @ProcessElement public void processElement(ProcessContext context) { UserActivity userActivity = context.element(); if (userActivity.getDurationSinceFirstActivity() == null) { return; } TableRow tablerow = new TableRow(); tablerow.set("userId", userActivity.getUserId()); tablerow.set("hasPositiveLabel", userActivity.getHasPositiveLabel()); tablerow.set("daysSinceStartDate", userActivity.getDurationSinceStartDate().getStandardDays()); tablerow.set( "daysSinceFirstActivity", userActivity.getDurationSinceFirstActivity().getStandardDays()); tablerow.set( "daysSinceLatestActivity", userActivity.getDurationSinceLatestActivity().getStandardDays()); tablerow.set("snapshotTimeInMillis", userActivity.getSnapshotTime().getMillis()); context.output(tablerow); } }