in upload.py [0:0]
def try_upload(client: bigquery.Client,
               data_date: date,
               ping_ndjson: tuple[typing.IO[bytes], typing.Optional[int]],
               config_ndjson: tuple[typing.IO[bytes], int]) -> bool:
    """Atomically replace all rows for *data_date* in the ping and config tables.

    Runs one BigQuery session transaction that first deletes every existing
    row for the date, then bulk-loads the new NDJSON payloads, so that all
    rows for a given date are always the product of a single upload.

    Args:
        client: BigQuery client used for all queries and load jobs.
        data_date: The date whose rows are being replaced.
        ping_ndjson: ``(file_object, size_in_bytes)`` for the ping table
            payload; the size may be ``None`` when unknown.
        config_ndjson: ``(file_object, size_in_bytes)`` for the config table
            payload.

    Returns:
        True on success; False when BigQuery aborted the transaction due to a
        concurrent update (the caller may retry the upload).

    Raises:
        BadRequest: Any BigQuery error other than a concurrent-update abort.
    """
    try:
        with BigquerySession(client) as session:
            client.query_and_wait(
                "BEGIN TRANSACTION;",
                job_config=session.config(bigquery.QueryJobConfig()),
            )
            # Clear out rows already associated with the date. We want all rows
            # for a particular date to be the result of a single upload.
            client.query_and_wait(
                (
                    f"DELETE FROM {DATASET}.{CONFIG_TABLE_NAME} where date = @date;"
                    f"DELETE FROM {DATASET}.{TABLE_NAME} where DATE(submission_timestamp) = @date;"
                ),
                job_config=session.config(bigquery.QueryJobConfig(
                    query_parameters=[
                        bigquery.ScalarQueryParameter("date", bigquery.SqlParameterScalarTypes.DATE, data_date),
                    ],
                )),
            )
            # Upload new data. rewind=True seeks each stream back to the start
            # so previously-read handles upload their full contents.
            client.load_table_from_file(
                ping_ndjson[0],
                client.dataset(DATASET).table(TABLE_NAME),
                size=ping_ndjson[1],
                rewind=True,
                job_config=session.config(bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON)),
            ).result()
            client.load_table_from_file(
                config_ndjson[0],
                client.dataset(DATASET).table(CONFIG_TABLE_NAME),
                size=config_ndjson[1],
                rewind=True,
                job_config=session.config(bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON)),
            ).result()
            # Use query_and_wait here too, consistent with BEGIN/DELETE above.
            # NOTE(review): on failure no explicit ROLLBACK is issued — presumably
            # ending the session aborts the open transaction; verify against the
            # BigquerySession context manager.
            client.query_and_wait(
                "COMMIT TRANSACTION;",
                job_config=session.config(bigquery.QueryJobConfig()),
            )
            return True
    except BadRequest as e:
        # BigQuery signals serialization conflicts with this message; treat it
        # as a retryable condition rather than an error.
        if "aborted due to concurrent update" in str(e):
            return False
        raise