#!/usr/bin/env python3
# Upload processed crash ping data to BigQuery.
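#
# Usage (hypothetical file names):
#   python3 upload.py crash-config.jsonl crash-pings.jsonl
#   cat crash-pings.jsonl | python3 upload.py --production crash-config.jsonl -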
from datetime import date
from google.cloud import bigquery
from google.api_core.exceptions import BadRequest
from pathlib import Path
import json
import random
import sys
import time
import typing
PROJECT = 'moz-fx-data-shared-prod'
STAGE_PROJECT = 'moz-fx-data-shar-nonprod-efed'
DATASET = 'crash_ping_ingest_external'
# NOTE: This script only handles adding or removing columns. Changes to
# existing columns (such as column types) won't be caught by
# `update_table_schema`.
TABLE_NAME = 'ingest_output'
TABLE_SCHEMA = [
bigquery.SchemaField('document_id', 'STRING',
description = "Identifier matching the document_id of a submitted ping.",
mode = 'REQUIRED',
),
bigquery.SchemaField('submission_timestamp', 'TIMESTAMP',
description = "The submission timestamp matching that of the submitted ping (used for partitioning).",
mode = 'REQUIRED',
),
bigquery.SchemaField('config_id', 'INTEGER',
description = "The id of the configuration partition that this ping was in.",
),
bigquery.SchemaField('crash_type', 'STRING',
description = "The crash type, extracted from the ping stack trace data for convenience."
),
bigquery.SchemaField('signature', 'STRING',
description = "The crash signature generated from the symbolicated stack frames."
),
bigquery.SchemaField('stack', 'RECORD',
description = "Symbolicated stack frames for the crash ping.",
mode = 'REPEATED',
fields = [
bigquery.SchemaField('function', 'STRING',
description = "The function symbol corresponding to the stack frame.",
),
bigquery.SchemaField('function_offset', 'STRING',
description = "The offset into the function, as a hex string.",
),
bigquery.SchemaField('file', 'STRING',
description = "The source file corresponding to the stack frame.",
),
bigquery.SchemaField('line', 'INTEGER',
description = "The source line corresponding to the stack frame.",
),
bigquery.SchemaField('module', 'STRING',
description = "The module corresponding to the stack frame.",
),
bigquery.SchemaField('module_offset', 'STRING',
description = "The offset into the module, as a hex string.",
),
bigquery.SchemaField('omitted', 'INTEGER',
description = "Whether frames were omitted. If this field is present, no other fields will be present.",
),
bigquery.SchemaField('error', 'STRING',
description = "An error message when generating the stack frame. If this field is present, no other fields will be present.",
),
],
),
]
CONFIG_TABLE_NAME = 'ingest_config'
CONFIG_TABLE_SCHEMA = [
bigquery.SchemaField('id', 'INTEGER',
        description = "Integer id, unique within a given 'date'.",
mode = 'REQUIRED',
),
bigquery.SchemaField('date', 'DATE',
description = "The date for which the configuration applies.",
mode = 'REQUIRED',
),
bigquery.SchemaField('version', 'INTEGER',
        description = "The configured Firefox version.",
mode = 'REQUIRED',
),
bigquery.SchemaField('target_sample_count', 'INTEGER',
description = "The configured target sample count. Note that this may not match the actual sample count due to random sampling.",
mode = 'REQUIRED',
),
bigquery.SchemaField('os', 'STRING',
        description = "The configured OS.",
mode = 'REQUIRED',
),
bigquery.SchemaField('channel', 'STRING',
description = "The configured channel.",
mode = 'REQUIRED',
),
bigquery.SchemaField('process_type', 'STRING',
description = "The configured process type.",
mode = 'REQUIRED',
),
bigquery.SchemaField('utility_actor', 'STRING',
description = "The configured utility actor type. Only applicable when 'process_type' is 'utility'.",
),
bigquery.SchemaField('count', 'INTEGER',
description = "The count of pings matching the configuration.",
mode = 'REQUIRED',
),
]
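# Type variable for the job config objects (QueryJobConfig / LoadJobConfig)
# passed through SessionConfigurator.config.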
T = typing.TypeVar('T', bound=typing.Any)
class SessionConfigurator:
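    """Applies an open BigQuery session to job configs so the configured jobs
    run inside that session (and its transaction)."""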
def __init__(self, session_id: str):
self._session_id = session_id
def config(self, cfg: T) -> T:
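        """Return cfg modified to reuse the open session rather than creating a new one."""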
cfg.create_session = False
props = cfg.connection_properties
props.append(bigquery.query.ConnectionProperty(key="session_id", value=self._session_id))
cfg.connection_properties = props
return cfg
# Adapted from https://dev.to/stack-labs/bigquery-transactions-over-multiple-queries-with-sessions-2ll5
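# All jobs in try_upload (BEGIN/COMMIT, the DELETEs, and the loads) are
# attached to the same session so that they form a single multi-statement
# transaction.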
class BigquerySession:
"""ContextManager wrapping a bigquery session."""
def __init__(self, bqclient: bigquery.Client):
"""Construct instance."""
self._bigquery_client = bqclient
        self._session_cfg: typing.Optional[SessionConfigurator] = None
    def __enter__(self) -> SessionConfigurator:
        """Initiate a BigQuery session and return a configurator bound to it."""
job = self._bigquery_client.query(
"SELECT 1;", # a query that won't fail
job_config=bigquery.QueryJobConfig(create_session=True),
)
assert job.session_info.session_id is not None
self._session_cfg = SessionConfigurator(job.session_info.session_id)
        job.result() # wait for job completion
return self._session_cfg
def __exit__(self, exc_type, exc_value, traceback):
"""Abort the opened session."""
if self._session_cfg:
# abort the session in any case to have a clean state at the end
# (sometimes in case of script failure, the table is locked in
# the session)
job = self._bigquery_client.query(
"CALL BQ.ABORT_SESSION();",
job_config=self._session_cfg.config(bigquery.QueryJobConfig())
)
job.result()
def create_table(client: bigquery.Client):
table_ref = client.dataset(DATASET).table(TABLE_NAME)
table = bigquery.Table(table_ref, schema = TABLE_SCHEMA)
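    # Set the primary key through the raw table properties; this assumes the
    # client library in use doesn't expose a public setter for table
    # constraints.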
table._properties["tableConstraints"] = {}
table._properties["tableConstraints"]["primaryKey"] = {"columns": ["document_id"]}
# We can't add a foreign key because the ids may pertain to firefox_desktop or fenix tables.
table.time_partitioning = bigquery.TimePartitioning(
type_ = bigquery.TimePartitioningType.DAY,
field = "submission_timestamp",
# 775 days, matching the crash table
expiration_ms = 1000 * 60 * 60 * 24 * 775,
require_partition_filter = True,
)
client.create_table(table)
def schema_field_names(schema: list[bigquery.SchemaField]) -> set[str]:
return set(f.name for f in schema)
def update_table_schema(client: bigquery.Client, table_name: str, schema: list[bigquery.SchemaField]):
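    # Only top-level column names are compared: added or removed columns
    # trigger a schema update, but type or nested-field changes are not
    # detected (see the NOTE near the top of this file).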
table_ref = client.dataset(DATASET).table(table_name)
table = client.get_table(table_ref)
if schema_field_names(table.schema) != schema_field_names(schema):
table.schema = schema
client.update_table(table, ["schema"])
def create_config_table(client: bigquery.Client):
table_ref = client.dataset(DATASET).table(CONFIG_TABLE_NAME)
table = bigquery.Table(table_ref, schema = CONFIG_TABLE_SCHEMA)
    # We don't expect much data here; however, partitioning the table allows
    # DML statements (like DELETE) to run without causing concurrent-update
    # errors.
table.time_partitioning = bigquery.TimePartitioning(
type_ = bigquery.TimePartitioningType.DAY,
field = "date",
# 775 days, matching the other tables
expiration_ms = 1000 * 60 * 60 * 24 * 775,
)
client.create_table(table)
def project_id(prod: bool = False) -> str:
return PROJECT if prod else STAGE_PROJECT
def make_client(prod: bool = False) -> bigquery.Client:
return bigquery.Client(project = project_id(prod))
def try_upload(client: bigquery.Client,
data_date: date,
ping_ndjson: tuple[typing.IO[bytes], typing.Optional[int]],
config_ndjson: tuple[typing.IO[bytes], int]) -> bool:
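    """Transactionally replace all rows for data_date in both tables.

    Returns True on success, or False if the transaction was aborted due to a
    concurrent update (the caller may retry).
    """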
try:
with BigquerySession(client) as session:
client.query_and_wait("BEGIN TRANSACTION;", job_config=session.config(bigquery.QueryJobConfig()))
# Clear out rows already associated with the date. We want all rows for
# a particular date to be the result of a single upload.
client.query_and_wait((
f"DELETE FROM {DATASET}.{CONFIG_TABLE_NAME} where date = @date;"
f"DELETE FROM {DATASET}.{TABLE_NAME} where DATE(submission_timestamp) = @date;"
),
job_config = session.config(bigquery.QueryJobConfig(
query_parameters = [
bigquery.ScalarQueryParameter("date", bigquery.SqlParameterScalarTypes.DATE, data_date),
],
)),
)
            # Upload the new data. rewind=True reseeks each file to the start,
            # so retries (and the config peek in __main__) are safe.
client.load_table_from_file(
ping_ndjson[0],
client.dataset(DATASET).table(TABLE_NAME),
size = ping_ndjson[1],
rewind = True,
job_config = session.config(bigquery.LoadJobConfig(source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON))
).result()
client.load_table_from_file(
config_ndjson[0],
client.dataset(DATASET).table(CONFIG_TABLE_NAME),
size = config_ndjson[1],
rewind = True,
job_config = session.config(bigquery.LoadJobConfig(source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON))
).result()
client.query("COMMIT TRANSACTION;", job_config=session.config(bigquery.QueryJobConfig())).result()
return True
except BadRequest as e:
if "aborted due to concurrent update" in str(e):
return False
raise
def upload(prod: bool,
data_date: date,
ping_ndjson: tuple[typing.IO[bytes], typing.Optional[int]],
config_ndjson: tuple[typing.IO[bytes], int]):
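    """Create or update the tables as needed, then upload the data for
    data_date, retrying while concurrent updates abort the transaction."""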
client = make_client(prod)
tables = list(client.list_tables(DATASET))
if not any(x.table_id == TABLE_NAME for x in tables):
create_table(client)
if not any(x.table_id == CONFIG_TABLE_NAME for x in tables):
create_config_table(client)
update_table_schema(client, TABLE_NAME, TABLE_SCHEMA)
update_table_schema(client, CONFIG_TABLE_NAME, CONFIG_TABLE_SCHEMA)
# We may fail due to concurrent updates by other tasks (if a batch of tasks is started); just keep retrying for a while.
retries = 30
while not try_upload(client, data_date, ping_ndjson, config_ndjson):
time.sleep(random.randint(1, 5) * 30)
retries -= 1
if retries == 0:
raise RuntimeError("aborted due to concurrent update; retries exhausted")
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--production', action='store_true', help="upload to production tables")
parser.add_argument('config', help="the config table jsonl file to read")
    parser.add_argument('filename', help="the processed ping jsonl file to read; use '-' for stdin")
args = parser.parse_args()
size = None
data = None
if args.filename == '-':
data = sys.stdin.buffer
else:
path = Path(args.filename)
size = path.stat().st_size
data = path.open('rb')
config_path = Path(args.config)
config_path_size = config_path.stat().st_size
config = config_path.open('rb')
# Peek at the config to get the date
data_date = date.fromisoformat(json.loads(config.readline())["date"])
upload(args.production, data_date, (data, size), (config, config_path_size))