def run()

in 5-app-infra/4-data-governance/static_data/cloud_functions/bytes_transferred/function/main.py [0:0]


def run(mode, project, dataset, table):
    """Estimate cross-region data transferred into a BigQuery table.

    Queries DATA_TRANSFER_LOG_TABLE for log entries mentioning ``table``
    (filtered by the module-level LAST_30_DAYS value) and, for every entry
    whose source region differs from the dataset's location, adds up
    ``total_input_bytes`` from that source region's
    INFORMATION_SCHEMA.STREAMING_TIMELINE_BY_PROJECT view.

    Args:
        mode: ``'bytes'`` to return the byte total, or ``'cost'`` to return
            the result of ``calculate_egress`` for that total.
        project: Project id that owns the dataset.
        dataset: Dataset id containing the destination table.
        table: Destination table id.

    Returns:
        ``-1`` when ``mode`` is not ``'bytes'``/``'cost'`` or when a
        cross-region source has no streaming-timeline rows; ``0`` when the
        log query returns no rows for the table; otherwise the byte total
        (``'bytes'``) or the value returned by ``calculate_egress``
        (``'cost'``).
    """
    print('Running in {} mode'.format(mode))

    # Reject an unknown mode before issuing any BigQuery calls, so a bad
    # argument is reported consistently and does not cost query time.
    if mode not in ('bytes', 'cost'):
        print('Error: invalid mode')
        return -1

    # get the region for the dataset where the destination table resides
    location = bq_client.get_dataset(project + '.' + dataset).location
    print('location:', location)

    # are there any data transfer jobs that have written into this table
    # NOTE(review): the LIKE pattern is a substring match on the table name,
    # so tables whose names merely contain `table` also match, and the
    # project/dataset are not checked -- confirm this is intended.
    # NOTE(review): the SQL is assembled with str.format; make sure `table`
    # (and `project`/`dataset` below) never come from untrusted input.
    jobs_exist_sql = (' SELECT resource.labels.region,'
                      ' SPLIT(SPLIT(jsonPayload.message, ",")[SAFE_ORDINAL(1)], ": ")[SAFE_ORDINAL(2)] AS project_dataset_table,'
                      ' SUM(CAST(TRIM(SPLIT(SPLIT(jsonPayload.message, ",")[SAFE_ORDINAL(2)], ": ")[SAFE_ORDINAL(2)]) AS INT64)) as total_bytes_inserted'
                      ' from `{}`'
                      ' where date(timestamp) >= {}'
                      ' and jsonPayload.message like "Table: %{}%" '
                      ' GROUP BY 1, 2').format(DATA_TRANSFER_LOG_TABLE, LAST_30_DAYS, table)

    log_rows = list(bq_client.query(jobs_exist_sql).result())

    if not log_rows:
        print('Table {} has no data inserted into'.format(table))
        return 0

    # data has been inserted into table
    physical_bytes_sum = 0
    src_region = None
    egress_region = None  # last source region that actually differed from `location`

    for log_row in log_rows:
        src_region = log_row[0]

        # inserts within same region -> no egress
        if src_region == location:
            print('Inserts within same region, no egress charges')
            continue

        egress_region = src_region

        # cross-region inserts, use input bytes of table to estimate egress
        # NOTE(review): the view is read from `region-<src_region>`; confirm
        # that this (rather than the dataset's own location) is the region
        # whose streaming timeline holds this table's rows.
        phys_bytes_sql = ('select total_input_bytes from `{}`.`region-{}`.INFORMATION_SCHEMA.STREAMING_TIMELINE_BY_PROJECT '
                          'where dataset_id = "{}" and table_id = "{}" and date(start_timestamp) >= {}').format(project, src_region, dataset, table, LAST_30_DAYS)

        tbl_rows = list(bq_client.query(phys_bytes_sql).result())
        if not tbl_rows:
            print('Error: missing physical bytes')
            return -1

        # A NULL total_input_bytes comes back as None; count it as zero
        # instead of raising TypeError on the addition.
        physical_bytes_sum += sum(tbl_row[0] or 0 for tbl_row in tbl_rows)

        print('{} bytes were transferred'.format(physical_bytes_sum))

    if mode == 'bytes':
        return physical_bytes_sum

    # mode == 'cost'
    # Price the transfer against a region that really was cross-region. The
    # log query has no ORDER BY, so the last row seen may be a same-region
    # one; using it would price the egress as an intra-region transfer.
    # When every row was same-region, fall back to the last region seen
    # (byte total is 0 in that case).
    # NOTE(review): with several distinct source regions, all bytes are still
    # priced with a single source region (the last cross-region one).
    if egress_region is None:
        egress_region = src_region
    return calculate_egress(location, egress_region, physical_bytes_sum)