genai-for-marketing/infra/aux_data/data_gen.py (173 lines of code) (raw):
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from google.cloud import bigquery
from aux_data.metadata_aux_data import get_metadata_data
from typing import List, Dict
import numpy as np
from datetime import datetime, timedelta
SEED = 1
rng = np.random.default_rng(SEED)
def create_and_populate_customers(num_customers: int = 50000) -> List[Dict]:
from aux_data.customers_aux_data import channel, locations
customers_location = rng.choice(locations, size=(num_customers))
customers_channel = rng.choice(channel, size=(num_customers))
customers_total_purchases = rng.integers(1, 100, size=(num_customers))
customers_total_value = rng.integers(10, 1000, size=(num_customers))
customers_total_emails = rng.integers(1, 100, size=(num_customers))
customers_loyalty_score = rng.integers(1, 100, size=(num_customers))
customers_is_media_follower = rng.choice([False, True], size=(num_customers))
baseline_datetime = datetime(2023, 4, 1)
customers_last_sign_up_date = rng.integers(500, 1000, size=(num_customers))
customers_last_purchase_date = rng.integers(20, 100, size=(num_customers))
customers_last_activity_date = customers_last_purchase_date - rng.integers(10, 20, size=(num_customers))
customers_cart_total = rng.uniform(0.0, 800.0, size=(num_customers))
customers_data = []
for i in range(num_customers):
customer = {}
customer['customer_id'] = int(i)
customer['email'] = f'user{i}@sample_user{i}.sample'
customer['city'] = customers_location[i]['city']
customer['state'] = customers_location[i]['state']
customer['channel'] = customers_channel[i]['channel']
customer['total_purchases'] = int(customers_total_purchases[i])
customer['total_value'] = int(customers_total_value[i])
customer['total_emails'] = int(customers_total_emails[i])
customer['loyalty_score'] = int(customers_loyalty_score[i])
customer['is_media_follower'] = bool(customers_is_media_follower[i])
customer['last_sign_up_date'] = baseline_datetime - timedelta(days=int(customers_last_sign_up_date[i]))
customer['last_sign_up_date'] = customer['last_sign_up_date'].strftime('%Y-%m-%d')
customer['last_purchase_date'] = baseline_datetime - timedelta(days=int(customers_last_purchase_date[i]))
customer['last_purchase_date'] = customer['last_purchase_date'].strftime('%Y-%m-%d')
customer['last_activity_date'] = baseline_datetime - timedelta(days=int(customers_last_activity_date[i]))
customer['last_activity_date'] = customer['last_activity_date'].strftime('%Y-%m-%d')
customer['cart_total'] = round(float(customers_cart_total[i]), 2)
customers_data.append(customer)
return customers_data
# Generate and load events table to BQ
def create_and_populate_events(num_customers: int = 50000) -> Dict:
from aux_data.events_aux_data import event_type
events_per_customer = list(map(int, np.absolute(np.floor(rng.normal(1, 1, size=(num_customers)) * 100))))
num_events = sum(events_per_customer)
events_type = rng.choice(event_type, size=(num_events))
baseline_datetime = datetime(2023, 4, 1)
events_date_delta = rng.integers(20, 200, size=(num_events))
events_data = []
idx = 0
for i in range(num_customers):
for _ in range(events_per_customer[i]):
event = {}
event['customer_id'] = i
event['event_id'] = idx
event['event_date'] = baseline_datetime - timedelta(days=int(events_date_delta[idx]))
event['event_date'] = event['event_date'].strftime('%Y-%m-%d')
event['event_type'] = events_type[idx]['event_type']
idx += 1
events_data.append(event)
return events_data
# Generate and load transactions to BQ
def create_and_populate_transactions(num_customers: int = 50000) -> Dict:
from aux_data.transactions_aux_data import product_name, transaction_type
transactions_per_customer = list(map(int, np.absolute(np.floor(rng.normal(1, 1, size=(num_customers)) * 100))))
num_transactions = sum(transactions_per_customer)
product_name_choice = rng.choice(product_name, size=(num_transactions))
transaction_type_choice = rng.choice(transaction_type, size=(num_transactions))
transaction_qtn = rng.integers(1, 30, size=(num_transactions))
transaction_value = rng.integers(1, 5000, size=(num_transactions))
app_purchase_quantity = rng.integers(1, 10, size=(num_transactions))
transaction_is_online = rng.choice([False, True], size=(num_transactions))
baseline_datetime = datetime(2023, 4, 1)
transactions_date_delta = rng.integers(20, 200, size=(num_transactions))
transaction_data = []
transaction_id = 0
for i in range(num_customers):
for _ in range(transactions_per_customer[i]):
transaction = {}
transaction['transaction_id'] = transaction_id
transaction['customer_id'] = i
transaction['transaction_quantity'] = int(transaction_qtn[transaction_id])
transaction['transaction_value'] = int(transaction_value[transaction_id])
transaction['transaction_type'] = transaction_type_choice[transaction_id]['transaction_type']
transaction['app_purchase_quantity'] = int(app_purchase_quantity[transaction_id])
transaction['is_online'] = bool(transaction_is_online[transaction_id])
transaction['transaction_date'] = baseline_datetime - timedelta(days=int(transactions_date_delta[transaction_id]))
transaction['transaction_date'] = transaction['transaction_date'].strftime('%Y-%m-%d')
transaction['product_name'] = product_name_choice[transaction_id]['product_name']
transaction['product_id'] = product_name_choice[transaction_id]['product_id']
transaction_id += 1
transaction_data.append(transaction)
return transaction_data
def generate_and_populate_dataset(
PROJECT_ID: str,
DATASET_ID: str,
create_tables: bool = True
):
bq_client = bigquery.Client(project=PROJECT_ID)
# Define tables schema
customers_schema = [
bigquery.SchemaField('customer_id', 'INTEGER', mode='NULLABLE'),
bigquery.SchemaField('email', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('city', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('state', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('channel', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('total_purchases', 'INTEGER', mode='NULLABLE'),
bigquery.SchemaField('total_value', 'INTEGER', mode='NULLABLE'),
bigquery.SchemaField('total_emails', 'INTEGER', mode='NULLABLE'),
bigquery.SchemaField('loyalty_score', 'INTEGER', mode='NULLABLE'),
bigquery.SchemaField('is_media_follower', 'BOOLEAN', mode='NULLABLE'),
bigquery.SchemaField('last_sign_up_date', 'DATE', mode='NULLABLE'),
bigquery.SchemaField('last_purchase_date', 'DATE', mode='NULLABLE'),
bigquery.SchemaField('last_activity_date', 'DATE', mode='NULLABLE'),
bigquery.SchemaField('cart_total', 'FLOAT', mode='NULLABLE')
]
events_schema = [
bigquery.SchemaField('customer_id', 'INTEGER', mode='NULLABLE'),
bigquery.SchemaField('event_id', 'INTEGER', mode='NULLABLE'),
bigquery.SchemaField('event_date', 'DATE', mode='NULLABLE'),
bigquery.SchemaField('event_type', 'STRING', mode='NULLABLE')
]
transactions_schema = [
bigquery.SchemaField('transaction_id', 'INTEGER', mode='NULLABLE'),
bigquery.SchemaField('customer_id', 'INTEGER', mode='NULLABLE'),
bigquery.SchemaField('transaction_quantity', 'INTEGER', mode='NULLABLE'),
bigquery.SchemaField('transaction_value', 'INTEGER', mode='NULLABLE'),
bigquery.SchemaField('transaction_type', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('app_purchase_quantity', 'INTEGER', mode='NULLABLE'),
bigquery.SchemaField('is_online', 'BOOLEAN', mode='NULLABLE'),
bigquery.SchemaField('transaction_date', 'DATE', mode='NULLABLE'),
bigquery.SchemaField('product_name', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('product_id', 'INTEGER', mode='NULLABLE')
]
metadata_schema = [
bigquery.SchemaField('dataset_id', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('table_id', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('column_id', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('description', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('is_primary_key', 'BOOLEAN', mode='NULLABLE'),
bigquery.SchemaField('is_foreign_key', 'BOOLEAN', mode='NULLABLE')
]
if(create_tables):
print('Creating tables ...')
for table_id, table_schema in zip(['customers', 'events', 'transactions', 'metadata'],
[customers_schema, events_schema, transactions_schema, metadata_schema]):
table_id = f'{PROJECT_ID}.{DATASET_ID}.{table_id}'
table = bigquery.Table(table_id, schema=table_schema)
table = bq_client.create_table(table)
print('Generating and populating METADATA table ...')
table_id = f"{PROJECT_ID}.{DATASET_ID}.metadata"
bq_client.load_table_from_json(
get_metadata_data(DATASET_ID=DATASET_ID),
destination=bigquery.Table(table_ref=table_id, schema=metadata_schema)
)
print('Generating and populating CUSTOMERS table ...')
customers_data = create_and_populate_customers()
table_id = f"{PROJECT_ID}.{DATASET_ID}.customers"
bq_client.load_table_from_json(
customers_data,
destination=bigquery.Table(table_ref=table_id, schema=customers_schema))
print('Generating and populating EVENTS table ...')
events_data = create_and_populate_events()
table_id = f"{PROJECT_ID}.{DATASET_ID}.events"
bq_client.load_table_from_json(
events_data,
destination=bigquery.Table(table_ref=table_id, schema=events_schema))
print('Generating and populating TRANSACTIONS table ...')
transactions_data = create_and_populate_transactions()
table_id = f"{PROJECT_ID}.{DATASET_ID}.transactions"
bq_client.load_table_from_json(
transactions_data,
destination=bigquery.Table(table_ref=table_id, schema=transactions_schema))