scripts/copy_bigquery_data.py (114 lines of code) (raw):

import sys from typing import Union import pandas as pd def get_project_id(): import urllib.request url = "http://metadata.google.internal/computeMetadata/v1/project/project-id" req = urllib.request.Request(url) req.add_header("Metadata-Flavor", "Google") project_id = urllib.request.urlopen(req).read().decode() if not project_id: try: #try to retrieve PROJECT ID from config.json in gs://{PROJECT}/datagen/config.json config = get_local_config() project_id = config["project_id"] except: try: import subprocess project_id=subprocess.check_output(["gcloud config get-value project"], shell=True).decode("utf-8").replace("\n","") except: raise ValueError("Could not get a value for PROJECT_ID") return project_id def run_bq_query(sql: str) -> Union[str, pd.DataFrame]: """ Input: SQL query, as a string, to execute in BigQuery Returns the query results as a pandas DataFrame, or error, if any """ from google.cloud import bigquery bq_client = bigquery.Client() # Try dry run before executing query to catch any errors job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False) bq_client.query(sql, job_config=job_config) # If dry run succeeds without errors, proceed to run query job_config = bigquery.QueryJobConfig() client_result = bq_client.query(sql, job_config=job_config) job_id = client_result.job_id # Wait for query/job to finish running. then get & return data frame df = client_result.result().to_arrow().to_pandas() return df def copy_blob( bucket_name, blob_name, destination_bucket_name, destination_blob_name ): """Copies a blob from one bucket to another with a new name.""" # bucket_name = "your-bucket-name" # blob_name = "your-object-name" # destination_bucket_name = "destination-bucket-name" # destination_blob_name = "destination-object-name" from google.cloud import storage storage_client = storage.Client() source_bucket = storage_client.bucket(bucket_name) source_blob = source_bucket.blob(blob_name) destination_bucket = storage_client.bucket(destination_bucket_name) blob_copy = source_bucket.copy_blob( source_blob, destination_bucket, destination_blob_name ) if destination_bucket_name == "cymbal-fraudfinder": # make file public only if this script is being run within the cymbal-fraudfinder project blob_copy.make_public() print(f"File copied from gs://{source_bucket.name}/{source_blob.name} \n\t\t to gs://{destination_bucket.name}/{blob_copy.name}") def get_batch_data_gcs(BUCKET_NAME): ''' Copy necessary files for datagen streaming ''' copy_blob( bucket_name="cymbal-fraudfinder", blob_name="datagen/hacked_customers_history.txt", destination_bucket_name=BUCKET_NAME, destination_blob_name="datagen/hacked_customers_history.txt" ) copy_blob( bucket_name="cymbal-fraudfinder", blob_name="datagen/hacked_terminals_history.txt", destination_bucket_name=BUCKET_NAME, destination_blob_name="datagen/hacked_terminals_history.txt" ) copy_blob( bucket_name="cymbal-fraudfinder", blob_name="datagen/demographics/customer_profiles.csv", destination_bucket_name=BUCKET_NAME, destination_blob_name="datagen/demographics/customer_profiles.csv" ) copy_blob( bucket_name="cymbal-fraudfinder", blob_name="datagen/demographics/terminal_profiles.csv", destination_bucket_name=BUCKET_NAME, destination_blob_name="datagen/demographics/terminal_profiles.csv" ) copy_blob( bucket_name="cymbal-fraudfinder", blob_name="datagen/demographics/customer_with_terminal_profiles.csv", destination_bucket_name=BUCKET_NAME, destination_blob_name="datagen/demographics/customer_with_terminal_profiles.csv" ) return "Done get_batch_data_gcs" def get_batch_data_bq(PROJECT): ''' Creates the following tables in your project by copying from public tables: {YOUR PROJECT} |-`tx` (dataset) |-`tx` (table: transactions without labels) |-`txlabels` (table: transactions with fraud labels (1 or 0)) |-demographics |-`customers` (table: profiles of customers) |-`terminals` (table: profiles of terminals) |-`customersterminals` (table: profiles of customers and terminals within their radius) ''' run_bq_query(f"CREATE SCHEMA IF NOT EXISTS `{PROJECT}`.tx OPTIONS(location='us-central1');") run_bq_query(f"CREATE SCHEMA IF NOT EXISTS `{PROJECT}`.demographics OPTIONS(location='us-central1');") run_bq_query(f""" CREATE OR REPLACE TABLE `{PROJECT}`.tx.tx PARTITION BY DATE(TX_TS) AS ( SELECT TX_ID, TX_TS, CUSTOMER_ID, TERMINAL_ID, TX_AMOUNT FROM `cymbal-fraudfinder`.txbackup.all ); """) print(f"BigQuery table created: `{PROJECT}`.tx.tx") run_bq_query(f""" CREATE OR REPLACE TABLE `{PROJECT}`.tx.txlabels AS ( SELECT TX_ID, TX_FRAUD FROM `cymbal-fraudfinder`.txbackup.all ); """) print(f"BigQuery table created: `{PROJECT}`.tx.txlabels") run_bq_query(f""" CREATE OR REPLACE TABLE `{PROJECT}`.demographics.customers AS ( SELECT * FROM `cymbal-fraudfinder`.demographics.customers ); """) print(f"BigQuery table created: `{PROJECT}`.demographics.customers") run_bq_query(f""" CREATE OR REPLACE TABLE `{PROJECT}`.demographics.terminals AS ( SELECT * FROM `cymbal-fraudfinder`.demographics.terminals ); """) print(f"BigQuery table created: `{PROJECT}`.demographics.terminals") run_bq_query(f""" CREATE OR REPLACE TABLE `{PROJECT}`.demographics.customersterminals AS ( SELECT * FROM `cymbal-fraudfinder`.demographics.customersterminals ); """) print(f"BigQuery table created: `{PROJECT}`.demographics.customersterminals") return "Done get_batch_data_bq" if __name__ == "__main__": PROJECT = get_project_id() BUCKET_NAME = sys.argv[1] get_batch_data_gcs(BUCKET_NAME) get_batch_data_bq(PROJECT)