in 5-app-infra/4-data-governance/static_data/cloud_functions/bytes_transferred/function/main.py [0:0]
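# NOTE: hedged sketch, not part of the original main.py. run() below depends on
# module-level names defined elsewhere in the file; the values here are assumed
# placeholders so this excerpt reads as self-contained Python. calculate_egress()
# (used only in 'cost' mode) is also defined elsewhere in main.py and is not stubbed here.
from google.cloud import bigquery

bq_client = bigquery.Client()

# assumed placeholder: fully-qualified table fed by the data transfer log sink
DATA_TRANSFER_LOG_TABLE = 'my-project.logging.bq_data_transfer_logs'
# assumed placeholder: SQL expression interpolated into the DATE(...) >= {} filters below
LAST_30_DAYS = 'DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)'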
def run(mode, project, dataset, table):
    print('Running in {} mode'.format(mode))
    physical_bytes_sum = 0
    # get the region of the dataset where the destination table resides
    location = bq_client.get_dataset(project + '.' + dataset).location
    print('location:', location)
    # check whether any data transfer jobs have written into this table
    jobs_exist_sql = (' SELECT resource.labels.region,'
                      ' SPLIT(SPLIT(jsonPayload.message, ",")[SAFE_ORDINAL(1)], ": ")[SAFE_ORDINAL(2)] AS project_dataset_table,'
                      ' SUM(CAST(TRIM(SPLIT(SPLIT(jsonPayload.message, ",")[SAFE_ORDINAL(2)], ": ")[SAFE_ORDINAL(2)]) AS INT64)) AS total_bytes_inserted'
                      ' FROM `{}`'
                      ' WHERE DATE(timestamp) >= {}'
                      ' AND jsonPayload.message LIKE "Table: %{}%"'
                      ' GROUP BY 1, 2'.format(DATA_TRANSFER_LOG_TABLE, LAST_30_DAYS, table))
    log_rows = list(bq_client.query(jobs_exist_sql).result())
    if len(log_rows) == 0:
        print('Table {} has had no data inserted into it'.format(table))
        return 0
    # data has been inserted into the table
    for log_row in log_rows:
        src_region = list(log_row)[0]
        # inserts within the same region -> no egress
        if src_region == location:
            print('Inserts within same region, no egress charges')
        else:
            # cross-region inserts: use the table's input bytes to estimate egress
            phys_bytes_sql = ('SELECT total_input_bytes FROM `{}`.`region-{}`.INFORMATION_SCHEMA.STREAMING_TIMELINE_BY_PROJECT '
                              'WHERE dataset_id = "{}" AND table_id = "{}" AND DATE(start_timestamp) >= {}').format(
                                  project, src_region, dataset, table, LAST_30_DAYS)
            tbl_rows = list(bq_client.query(phys_bytes_sql).result())
            if len(tbl_rows) == 0:
                print('Error: missing physical bytes')
                return -1
            for tbl_row in tbl_rows:
                physical_bytes_sum += list(tbl_row)[0]
    print('{} bytes were transferred'.format(physical_bytes_sum))
    if mode == 'bytes':
        return physical_bytes_sum
    elif mode == 'cost':
        return calculate_egress(location, src_region, physical_bytes_sum)
    else:
        print('Error: invalid mode')
        return -1
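

# Hedged usage sketch (project/dataset/table names are placeholders, not from the
# original file): report bytes inserted cross-region into one table over the window.
if __name__ == '__main__':
    transferred = run('bytes', 'my-project', 'my_dataset', 'my_table')
    print('run() returned:', transferred)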