# jobs/client-regeneration/client_regeneration/main.py
from datetime import date, datetime, timedelta

import click
from google.cloud import bigquery
DEFAULT_LOOKBACK = 7
DEFAULT_START_DATE = "2022-04-01"
DEFAULT_END_DATE = "2022-06-01"
COLUMN_LIST = [
"country",
"device_model",
"device_manufacturer",
]
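

# Illustrative helper only (not called by the job): shows the SQL expression the
# queries below inline via ", '_', ".join(...) to bucket clients for matching.
# With COLUMN_LIST as defined above this renders:
#   CONCAT(country, '_', device_model, '_', device_manufacturer)
def _bucket_expr(column_list):
    return "CONCAT({})".format(", '_', ".join(column_list))
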

def init_replacement_table(client, seed):
    """Create the table mapping each client_id to be replaced to its sampled replacement."""
q = f"""CREATE OR REPLACE TABLE mozdata.analysis.regen_sim_client_replacements_{str(seed)} (
client_id STRING, -- client_id to be replaced - the "fake new" client that first showed up on `regen_date`.
label STRING, -- what we used to do the match.
regen_date DATE, -- the date the "fake new" client id shows up - will be replaced on this day forward.
regened_last_date DATE, -- the last date we observed the fake new client/the last day it needs to be replaced.
replacement_id STRING, -- the client_id we sampled from the churn pool to replace `client_id` above.
last_reported_date DATE, -- this is the day the replacement_id churned in the original history.
    first_seen_date DATE -- the first_seen_date of the replacement_id in its original history.
)
PARTITION BY regen_date
"""
job = client.query(q)
job.result()


def init_regen_pool(client, seed, start_date):
    """Create a fresh copy of the regen pool, named with the sampling seed."""
    q = f"""CREATE OR REPLACE TABLE
    mozdata.analysis.regen_sim_regen_pool_{seed} PARTITION BY regen_date AS SELECT * FROM
    mozdata.analysis.regen_sim_regen_pool_v2 WHERE regen_date >= DATE("{start_date}");"""
job = client.query(q)
job.result()


def init_churn_pool(client, seed, start_date, lookback):
    """Create a fresh copy of the churn pool, named with the sampling seed.

    The pool reaches back `lookback + 1` days before `start_date` so the first
    simulated day already has churn candidates to sample from.
    """
q = f"""CREATE OR REPLACE TABLE mozdata.analysis.regen_sim_churn_pool_{seed} PARTITION BY last_reported_date AS
SELECT * FROM mozdata.analysis.regen_sim_churn_pool_v2 WHERE last_reported_date >= DATE_SUB(DATE("{start_date}"),
INTERVAL {lookback + 1} DAY);"""
job = client.query(q)
job.result()


def sample_for_replacement_bq(client, date, column_list, seed, lookback):
    """Match clients regenerated on `date` to churned clients from the same bucket.

    Inserts the matches into the replacements table; `seed` makes the sampling
    reproducible.
    """
    q = f"""
    -- this is much faster than the pandas way now
INSERT INTO mozdata.analysis.regen_sim_client_replacements_{str(seed)}
WITH
churned AS (
SELECT
*,
CONCAT({", '_', ".join(column_list)}) AS bucket,
FROM
mozdata.analysis.regen_sim_churn_pool_{str(seed)}
WHERE
last_reported_date BETWEEN DATE_SUB(DATE("{date}"), INTERVAL ({lookback}) DAY) AND DATE("{date}")
),
churned_numbered AS (
SELECT
*,
        -- number the clients randomly within each bucket. adding the seed makes the sort order reproducible.
        ROW_NUMBER() OVER (PARTITION BY bucket ORDER BY FARM_FINGERPRINT(CONCAT(client_id, {str(seed)}))) AS rn,
      FROM churned
      -- do not sample from churned clients whose first_seen_date equals the date undergoing replacement
      -- (i.e. clients that churn the same day they are new). such clients are eligible to be replaced,
      -- but they won't serve as replacements on the same day.
      WHERE first_seen_date != DATE("{date}")
),
regen AS (
SELECT
*,
CONCAT({", '_', ".join(column_list)}) AS bucket,
FROM mozdata.analysis.regen_sim_regen_pool_{str(seed)} WHERE regen_date = DATE("{date}")
),
regen_numbered AS (
SELECT
*,
        -- this always sorts the clients to be replaced in the same order; the order of the
        -- churned clients is randomized above.
        ROW_NUMBER() OVER (PARTITION BY bucket ORDER BY client_id) AS rn,
FROM regen
)
SELECT
r.client_id,
r.bucket AS label,
r.regen_date,
r.regened_last_date,
c.client_id AS replacement_id,
c.last_reported_date,
c.first_seen_date
FROM regen_numbered r
LEFT JOIN churned_numbered c
USING(bucket, rn);
"""
job = client.query(q)
job.result()
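

# A small in-memory sketch (illustrative only, not used by the job) of the
# bucket-and-rank matching the query above performs: churned candidates are
# shuffled deterministically per seed (sha256 stands in for FARM_FINGERPRINT),
# then paired positionally with the regenerated clients inside each bucket.
def _example_bucket_rank_match(regen_ids_by_bucket, churned_ids_by_bucket, seed):
    import hashlib

    matches = {}
    for bucket, regen_ids in regen_ids_by_bucket.items():
        candidates = sorted(
            churned_ids_by_bucket.get(bucket, []),
            key=lambda cid: hashlib.sha256(f"{cid}{seed}".encode()).digest(),
        )
        for rn, regen_id in enumerate(sorted(regen_ids)):
            # LEFT JOIN semantics: no candidate left in the bucket -> no replacement.
            matches[regen_id] = candidates[rn] if rn < len(candidates) else None
    return matches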


def update_churn_pool(client, seed, date):
    """Maintain the churn pool after a day's matching.

    Removes churned clients that were consumed as replacements (each may serve
    as only one client's replacement), then rewires regenerated ids still in
    the pool to their sampled replacement.
    """
    q = f"""
    -- remove the clients used as replacements from the churn pool (we only want them to serve
    -- as one client's replacement)
    DELETE FROM `mozdata.analysis.regen_sim_churn_pool_{str(seed)}`
    WHERE client_id IN (
      SELECT replacement_id
      FROM `mozdata.analysis.regen_sim_client_replacements_{str(seed)}`
      WHERE regen_date = "{str(date)}"
        AND replacement_id IS NOT NULL
    );
-- find cases where the regenerated IDs are also in the churn pool - replace them with their sampled replacement
-- client.
UPDATE `mozdata.analysis.regen_sim_churn_pool_{str(seed)}` c
SET client_id = r.replacement_id,
first_seen_date = r.first_seen_date
FROM
(SELECT client_id, replacement_id, first_seen_date
FROM `mozdata.analysis.regen_sim_client_replacements_{str(seed)}`
WHERE regen_date = "{str(date)}"
AND replacement_id IS NOT NULL ) r
WHERE c.client_id = r.client_id;
"""
# Here's a potential replacement of the above with DML -> DDL:
# WITH replacements AS (
# SELECT
# replacement_id,
# client_id
# FROM
# `mozdata.analysis.regen_sim_client_replacements_{str(seed)}`
# WHERE
# regen_date = "{str(date)}"
# ),
# churn_pool_replacements_removed AS (
# SELECT
# churn.*
# FROM
# `mozdata.analysis.regen_sim_churn_pool_{str(seed)}` churn
# LEFT JOIN replacements r ON (client_id = replacement_id)
# WHERE
# replacement_id IS NULL
# )
# SELECT
# churn.* EXCEPT (client_id),
# COALESCE(replacement_id, client_id) as client_id
# FROM
# churn_pool_replacements_removed churn
# LEFT JOIN replacements USING(client_id)
job = client.query(q)
job.result()
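

# A minimal in-memory sketch (illustrative only, not used by the job) of the two
# statements above: first drop churned clients consumed as replacements, then
# rewire any regenerated id still in the pool to its sampled replacement.
def _example_update_churn_pool(churn_pool, replacements):
    # churn_pool: {client_id: first_seen_date}; replacements: iterable of
    # (client_id, replacement_id, first_seen_date) rows for one regen_date.
    for client_id, replacement_id, fsd in replacements:
        if replacement_id is not None:
            churn_pool.pop(replacement_id, None)  # consumed as a replacement
    for client_id, replacement_id, fsd in replacements:
        if replacement_id is not None and client_id in churn_pool:
            churn_pool[replacement_id] = fsd
            del churn_pool[client_id]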
# def write_attributed_clients_history(client, seed, start_date):
# table_name = """mozdata.analysis.regen_sim_replaced_attributable_clients_v2_{}""".format(str(seed))
# q = f"""
# CREATE OR REPLACE TABLE {table_name}
# AS
# WITH base AS (
# SELECT
# COALESCE(r.replacement_id, c.client_id) AS client_id,
# COALESCE(r.first_seen_date, c.cohort_date) AS first_seen_date,
# c.client_id AS original_client_id,
# r.label,
# c.submission_date,
# ad_clicks,
# searches,
# searches_with_ads,
# first_reported_country,
# sample_id,
# regened_last_date
# FROM `mozdata.fenix.attributable_clients_v2` c
# LEFT JOIN `mozdata.analysis.regen_sim_client_replacements_{str(seed)}` r
# -- we want the history starting on the regen date.
# ON (c.client_id = r.client_id) AND (c.submission_date BETWEEN r.regen_date AND r.regened_last_date)
# AND c.submission_date >= DATE("{str(start_date)}")
# ),
#
# numbered AS (
# SELECT
# *,
# ROW_NUMBER() OVER (PARTITION BY client_id, submission_date ORDER BY regened_last_date DESC) AS rn
# FROM base
# -- this is to handle the case where the same ID ends a replacement and starts another replacement on the same day, leading to more than one row / client.
# -- in that case, we ignore the last day of the earlier replacement.
# )
#
# SELECT
# * EXCEPT(regened_last_date, rn)
# FROM numbered
# WHERE rn = 1
# """
#
# job = client.query(q)
# job.result()
# return(table_name)


def write_baseline_clients_daily(client, seed, start_date, end_date):
    """Write an alternative baseline_clients_daily where regenerated clients carry their replacement ids."""
table_name = (
"""mozdata.analysis.regen_sim_replaced_baseline_clients_daily_{}""".format(
str(seed)
)
)
q = f"""
CREATE OR REPLACE TABLE {table_name}
AS
WITH base AS (
SELECT
COALESCE(r.replacement_id, c.client_id) AS client_id,
COALESCE(r.first_seen_date, c.first_seen_date) AS first_seen_date,
c.client_id AS original_client_id,
c.first_seen_date AS original_first_seen_date,
c.submission_date,
c.country,
c.device_model,
udf.safe_sample_id(COALESCE(r.replacement_id, c.client_id)) AS sample_id,
regened_last_date,
FROM `mozdata.fenix.baseline_clients_daily` c
LEFT JOIN `mozdata.analysis.regen_sim_client_replacements_{str(seed)}` r
-- we want the history starting on the regen date.
ON (c.client_id = r.client_id) AND (c.submission_date BETWEEN r.regen_date AND r.regened_last_date)
WHERE c.submission_date BETWEEN DATE("{str(start_date)}") AND DATE("{str(end_date)}")
),
numbered AS (
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY client_id, submission_date ORDER BY regened_last_date DESC) AS rn
FROM base
      -- handle the case where the same id ends one replacement and starts another on the same
      -- day, producing more than one row per client; we keep the row from the later replacement.
)
SELECT
* EXCEPT(regened_last_date, rn)
FROM numbered
WHERE rn = 1
"""
job = client.query(q)
job.result()
return table_name
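

# In-memory sketch (illustrative only, not used by the job) of the rn = 1 dedup
# above: when a client has two rows for one submission_date (one replacement
# ending while another starts), keep the row whose replacement runs later.
def _example_dedup_rows(rows):
    # rows: dicts with keys client_id, submission_date, regened_last_date (may be None).
    best = {}
    for row in rows:
        key = (row["client_id"], row["submission_date"])
        rank = row["regened_last_date"] or date.min  # NULLs lose, as in ORDER BY ... DESC
        if key not in best or rank > best[key][0]:
            best[key] = (rank, row)
    return [row for _, row in best.values()]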


def write_baseline_clients_daily_with_searches(client, seed, start_date, end_date):
    """Write the replaced daily table joined with search metrics from attributable_clients_v2."""
table_name = """mozdata.analysis.regen_sim_replaced_baseline_clients_daily_with_searches_{}""".format(
str(seed)
)
q = f"""
CREATE OR REPLACE TABLE {table_name}
AS
WITH
base AS (
SELECT
c.client_id,
c.first_seen_date AS first_seen_date,
c.submission_date,
c.country,
c.device_model,
COALESCE(a.searches, 0) AS searches,
COALESCE(a.searches_with_ads, 0) AS searches_with_ads,
COALESCE(a.ad_clicks, 0) AS ad_clicks,
FROM `mozdata.fenix.baseline_clients_daily` c
LEFT JOIN `mozdata.fenix.attributable_clients_v2` a
USING(client_id, submission_date)
WHERE c.submission_date BETWEEN DATE("{str(start_date)}") AND DATE("{str(end_date)}")
),
replaced AS (
SELECT
COALESCE(r.replacement_id, c.client_id) AS client_id,
COALESCE(r.first_seen_date, c.first_seen_date) AS first_seen_date,
udf.safe_sample_id(COALESCE(r.replacement_id, c.client_id)) AS sample_id,
c.country,
c.device_model,
c.submission_date,
ARRAY_AGG(c.client_id IGNORE NULLS ORDER BY regened_last_date) AS original_client_id,
ARRAY_AGG(c.first_seen_date IGNORE NULLS ORDER BY regened_last_date) AS original_first_seen_date,
SUM(searches) AS searches,
SUM(searches_with_ads) AS searches_with_ads,
SUM(ad_clicks) AS ad_clicks,
FROM base c
LEFT JOIN `mozdata.analysis.regen_sim_client_replacements_{str(seed)}` r
-- we want the history starting on the regen date.
ON (c.client_id = r.client_id) AND (c.submission_date BETWEEN r.regen_date AND r.regened_last_date)
GROUP BY 1,2,3,4,5,6
)
SELECT
*
FROM replaced
"""
job = client.query(q)
job.result()
return table_name


def init_baseline_clients_yearly(client, seed):
    """Create an empty, partitioned clients_yearly table with the daily-with-searches schema."""
    clients_yearly_name = f"mozdata.analysis.regen_sim_replaced_clients_yearly_{seed}"
    clients_daily_name = f"mozdata.analysis.regen_sim_replaced_baseline_clients_daily_with_searches_{seed}"
    # Expected schema of the resulting table:
# client_id STRING,
# first_seen_date DATE,
# original_client_id STRING,
# original_first_seen_date DATE,
# submission_date DATE,
# country STRING,
# device_model STRING,
# sample_id INTEGER,
# days_seen_bytes BYTES,
# searches INTEGER,
# searches_with_ads INTEGER,
# ad_clicks INTEGER,
create_yearly_stmt = f"""CREATE OR REPLACE TABLE {clients_yearly_name}
PARTITION BY
submission_date
CLUSTER BY
sample_id,
client_id
AS
SELECT
*,
CAST(NULL AS BYTES) AS days_seen_bytes,
FROM
{clients_daily_name}
WHERE
-- Output empty table and read no input rows
FALSE
"""
client.query(create_yearly_stmt).result()
print(f"{clients_yearly_name} created")


def _write_baseline_clients_yearly_partition(
    client, clients_daily, clients_yearly, submission_date
):
    """Write one day's clients_yearly partition by merging that day's daily rows with the prior day's bit history."""
query_stmt = f"""
WITH _current AS (
SELECT
* EXCEPT (submission_date),
udf.bool_to_365_bits(TRUE) AS days_seen_bytes,
FROM
`{clients_daily}`
WHERE
submission_date = '{submission_date}'
),
_previous AS (
SELECT
* EXCEPT (submission_date),
FROM
`{clients_yearly}`
WHERE
submission_date = DATE_SUB('{submission_date}', INTERVAL 1 DAY)
AND BIT_COUNT(udf.shift_365_bits_one_day(days_seen_bytes)) > 0
)
SELECT
DATE('{submission_date}') AS submission_date,
IF(_current.client_id IS NOT NULL, _current, _previous).* REPLACE (
udf.combine_adjacent_days_365_bits(
_previous.days_seen_bytes,
_current.days_seen_bytes
) AS days_seen_bytes
)
FROM
_current
FULL OUTER JOIN
_previous
USING
(sample_id, client_id)
"""
partition = submission_date.strftime("%Y%m%d")
destination_table = f"{clients_yearly}${partition}"
print(f"Backfilling `{destination_table}` ...")
query_config = bigquery.QueryJobConfig(
destination=destination_table,
        default_dataset="mozdata.analysis",
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
client.query(query_stmt, job_config=query_config).result()
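

# Rough pure-Python analogue (an assumption, for intuition only) of the
# udf.combine_adjacent_days_365_bits call above: bit 0 is the partition's day,
# bit 1 the day before, and so on; yesterday's history is shifted one day and
# OR'd with today's bit, capped at 365 days.
_EXAMPLE_DAYS = 365
_EXAMPLE_NBYTES = (_EXAMPLE_DAYS + 7) // 8  # 365 bits fit in 46 bytes


def _example_combine_adjacent_days_365_bits(previous, current):
    prev = int.from_bytes(previous, "big") if previous else 0
    cur = int.from_bytes(current, "big") if current else 0
    combined = ((prev << 1) | cur) & ((1 << _EXAMPLE_DAYS) - 1)
    return combined.to_bytes(_EXAMPLE_NBYTES, "big")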


def write_baseline_clients_yearly(client, seed, start_date, end_date):
    """Backfill the clients_yearly table one partition per day, start_date through end_date inclusive."""
clients_daily_name = f"mozdata.analysis.regen_sim_replaced_baseline_clients_daily_with_searches_{seed}"
clients_yearly_name = f"mozdata.analysis.regen_sim_replaced_clients_yearly_{seed}"
dates = [
date.fromisoformat(start_date) + timedelta(i)
for i in range(
(date.fromisoformat(end_date) - date.fromisoformat(start_date)).days + 1
)
]
for submission_date in dates:
_write_baseline_clients_yearly_partition(
client, clients_daily_name, clients_yearly_name, submission_date
)


def write_usage_history(client, seed, start_date, end_date):
    """Write a clients_last_seen-style usage history from the replaced daily table."""
table_name = """mozdata.analysis.regen_sim_replaced_clients_last_seen_{}""".format(
str(seed)
)
q = f"""
DECLARE start_date DATE DEFAULT "{str(start_date)}";
DECLARE end_date DATE DEFAULT "{str(end_date)}";
CREATE TEMP FUNCTION process_bits(bits BYTES) AS (
STRUCT(
bits,
-- An INT64 version of the bits, compatible with bits28 functions
CAST(CONCAT('0x', TO_HEX(RIGHT(bits, 4))) AS INT64) << 36 >> 36 AS bits28,
        -- An INT64 version of the last 4 bytes of the bits (note: despite the name,
        -- this carries at most 32 days of history)
        CAST(CONCAT('0x', TO_HEX(RIGHT(bits, 4))) AS INT64) AS bits64,
-- A field like days_since_seen from clients_last_seen.
udf.bits_to_days_since_seen(bits) AS days_since_active,
-- Days since first active, analogous to first_seen_date in clients_first_seen
udf.bits_to_days_since_first_seen(bits) AS days_since_first_active
)
);
CREATE OR REPLACE TABLE {table_name}
PARTITION BY submission_date
AS
WITH
alltime AS (
SELECT
client_id,
first_seen_date,
-- BEGIN
-- Here we produce bit pattern fields based on the daily aggregates from the
-- previous step;
udf.bits_from_offsets(
ARRAY_AGG(
DATE_DIFF(end_date, submission_date, DAY)
)
) AS days_active_bits,
-- END
FROM
mozdata.analysis.regen_sim_replaced_baseline_clients_daily_{str(seed)}
WHERE submission_date BETWEEN start_date AND end_date
GROUP BY
client_id, first_seen_date
)
SELECT
end_date - i AS submission_date,
first_seen_date,
client_id,
process_bits(days_active_bits >> i) AS days_active
FROM
alltime
-- The cross join parses each input row into one row per day since the client
-- was first seen, emulating the format of the existing clients_last_seen table.
CROSS JOIN
UNNEST(GENERATE_ARRAY(0, DATE_DIFF(end_date, start_date, DAY))) AS i
WHERE
(days_active_bits >> i) IS NOT NULL
"""
job = client.query(q)
job.result()
return table_name
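

# Rough pure-Python analogue (an assumption, for intuition only) of the
# udf.bits_to_days_since_seen call inside process_bits above: the lowest set
# bit marks the most recent active day, so its position is "days since active".
def _example_bits_to_days_since_seen(bits):
    n = int.from_bytes(bits, "big")
    if n == 0:
        return None  # never active in the window
    return (n & -n).bit_length() - 1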


def create_replacements(
    client: bigquery.Client,
    seed: int,
    start_date: str,
    end_date: str,
    column_list: list,
    lookback: int = DEFAULT_LOOKBACK,
    use_existing: bool = False,
):
    """Create a table mapping regenerated clients to their matching replacements."""
    print(f"Creating replacements for seed {seed} from {start_date} to {end_date}")
if not use_existing:
init_replacement_table(client, seed)
init_regen_pool(client, seed, start_date)
init_churn_pool(client, seed, start_date, lookback)
start_dt = datetime.strptime(start_date, "%Y-%m-%d").date()
end_dt = datetime.strptime(end_date, "%Y-%m-%d").date()
one_day = timedelta(days=1)
current_dt = start_dt
while current_dt <= end_dt:
        # get the replacements for this day (they are written straight to BigQuery)
        print(f"replacing on date {current_dt}")
        sample_for_replacement_bq(client, str(current_dt), column_list, seed, lookback)
print("updating churn pool")
update_churn_pool(client, seed, current_dt)
current_dt += one_day


def run_simulation(
client: bigquery.Client,
seed: int,
start_date: str,
column_list: list,
end_date: str,
lookback: int,
run_replacement: bool,
run_usage_history: bool,
run_clients_daily: bool,
run_clients_daily_with_search: bool,
run_clients_yearly: bool,
run_attributed_clients: bool,
):
    """Run the simulation end to end.

    At a high level there are two main steps: 1. go day by day, matching regenerated
    client_ids to replacement client_ids that "look like" they churned in the prior
    `lookback` days, and write the matches to a table; 2. using the matches from
    step 1, write alternative client histories in which regenerated clients are
    given their replacement ids.
    """
if run_replacement:
create_replacements(
client,
seed=seed,
start_date=start_date,
end_date=end_date,
column_list=column_list,
lookback=lookback,
)
if run_usage_history:
write_usage_history(client, seed=seed, start_date=start_date, end_date=end_date)
if run_clients_daily:
write_baseline_clients_daily(
client, seed=seed, start_date=start_date, end_date=end_date
)
if run_clients_daily_with_search:
write_baseline_clients_daily_with_searches(
client, seed=seed, start_date=start_date, end_date=end_date
)
if run_clients_yearly:
init_baseline_clients_yearly(client, seed=seed)
write_baseline_clients_yearly(
client, seed=seed, start_date=start_date, end_date=end_date
)
# if run_attributed_clients:
# write_attributed_clients_history(client, seed=seed, start_date=start_date)


@click.command()
@click.option("--seed", required=True, type=int, help="Random seed for sampling.")
@click.option(
"--start_date",
type=click.DateTime(),
help="Date to start looking for replacements and writing history.",
default=DEFAULT_START_DATE,
)
@click.option(
"--end_date",
type=click.DateTime(),
help="Date to stop looking for replacements and writing history.",
default=DEFAULT_END_DATE,
)
@click.option(
"--lookback",
type=int,
help="How many days to look back for churned clients.",
default=DEFAULT_LOOKBACK,
)
@click.option("--run-replacement", type=bool, default=False)
@click.option("--run-usage-history", type=bool, default=False)
@click.option("--run-clients-daily", type=bool, default=False)
@click.option("--run-clients-daily-with-search", type=bool, default=False)
@click.option("--run-clients-yearly", type=bool, default=False)
@click.option("--run-attributed-clients", type=bool, default=False)
# TODO: column list as a parameter?
def main(
seed,
start_date,
end_date,
lookback,
run_replacement,
run_usage_history,
run_clients_daily,
run_clients_daily_with_search,
run_clients_yearly,
run_attributed_clients,
):
start_date, end_date = str(start_date.date()), str(end_date.date())
client = bigquery.Client(project="mozdata")
run_simulation(
client,
seed=seed,
start_date=start_date,
column_list=COLUMN_LIST,
end_date=end_date,
lookback=lookback,
run_replacement=run_replacement,
run_usage_history=run_usage_history,
run_clients_daily=run_clients_daily,
run_clients_daily_with_search=run_clients_daily_with_search,
run_clients_yearly=run_clients_yearly,
run_attributed_clients=run_attributed_clients,
)


if __name__ == "__main__":
main()