retail/interactive-tutorials/events/setup_events/setup_cleanup.py

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import json
import re
import shlex
import subprocess

from google.api_core.exceptions import NotFound
import google.auth
from google.cloud import bigquery
from google.cloud import storage
from google.cloud.retail import (
    ProductDetail,
    PurgeUserEventsRequest,
    UserEvent,
    UserEventServiceClient,
    WriteUserEventRequest,
)
from google.cloud.retail_v2 import Product
from google.protobuf.timestamp_pb2 import Timestamp

project_id = google.auth.default()[1]
default_catalog = f"projects/{project_id}/locations/global/catalogs/default_catalog"


# get user event
def get_user_event(visitor_id):
    timestamp = Timestamp()
    timestamp.seconds = int(datetime.datetime.now().timestamp())

    product = Product()
    product.id = "test_id"

    product_detail = ProductDetail()
    product_detail.product = product

    user_event = UserEvent()
    user_event.event_type = "detail-page-view"
    user_event.visitor_id = visitor_id
    user_event.event_time = timestamp
    user_event.product_details = [product_detail]

    print(user_event)
    return user_event


# write user event
def write_user_event(visitor_id):
    write_user_event_request = WriteUserEventRequest()
    write_user_event_request.user_event = get_user_event(visitor_id)
    write_user_event_request.parent = default_catalog

    user_event = UserEventServiceClient().write_user_event(write_user_event_request)

    print("---the user event is written---")
    print(user_event)
    return user_event


# purge user event
def purge_user_event(visitor_id):
    purge_user_event_request = PurgeUserEventsRequest()
    purge_user_event_request.filter = f'visitorId="{visitor_id}"'
    purge_user_event_request.parent = default_catalog
    purge_user_event_request.force = True

    purge_operation = UserEventServiceClient().purge_user_events(
        purge_user_event_request
    )

    print("---the purge operation was started:----")
    print(purge_operation.operation.name)


def get_project_id():
    get_project_command = "gcloud config get-value project --format json"
    config = subprocess.check_output(shlex.split(get_project_command))
    project_id = re.search('"(.*?)"', str(config)).group(1)
    return project_id


def create_bucket(bucket_name: str):
    """Create a new bucket in Cloud Storage"""
    print("Creating new bucket:" + bucket_name)
    buckets_in_your_project = list_buckets()
    if bucket_name in buckets_in_your_project:
        print(f"Bucket {bucket_name} already exists")
    else:
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        bucket.storage_class = "STANDARD"
        new_bucket = storage_client.create_bucket(bucket, location="us")
        print(
            f"Created bucket {new_bucket.name} in {new_bucket.location} with storage class {new_bucket.storage_class}"
        )
        return new_bucket


def delete_bucket(bucket_name: str):
    """Delete a bucket from Cloud Storage"""
    storage_client = storage.Client()
    print("Deleting bucket:" + bucket_name)
    buckets_in_your_project = list_buckets()
    if bucket_name in buckets_in_your_project:
        blobs = storage_client.list_blobs(bucket_name)
        for blob in blobs:
            blob.delete()
        bucket = storage_client.get_bucket(bucket_name)
        bucket.delete()
        print(f"Bucket {bucket.name} is deleted")
    else:
        print(f"Bucket {bucket_name} is not found")


def list_buckets():
    """Lists all buckets"""
    bucket_list = []
    storage_client = storage.Client()
    buckets = storage_client.list_buckets()
    for bucket in buckets:
        bucket_list.append(bucket.name)
    return bucket_list


def upload_blob(bucket_name, source_file_name):
    """Uploads a file to the bucket."""
    # The path to your file to upload
    # source_file_name = "local/path/to/file"
    print(f"Uploading data from {source_file_name} to the bucket {bucket_name}")
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    object_name = re.search("resources/(.*?)$", source_file_name).group(1)
    blob = bucket.blob(object_name)
    blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {object_name}.")


def create_bq_dataset(dataset_name):
    """Create a BigQuery dataset"""
    full_dataset_id = f"{project_id}.{dataset_name}"
    bq = bigquery.Client()
    print(f"Creating dataset {full_dataset_id}")
    try:
        bq.get_dataset(full_dataset_id)
        print(f"dataset {full_dataset_id} already exists")
    except NotFound:
        # Construct a Dataset object to send to the API.
        dataset = bigquery.Dataset(full_dataset_id)
        dataset.location = "US"
        bq.create_dataset(dataset)
        print("dataset is created")


def create_bq_table(dataset, table_name, schema_file_path):
    """Create a BigQuery table"""
    full_table_id = f"{project_id}.{dataset}.{table_name}"
    bq = bigquery.Client()
    print(f"Check if BQ table {full_table_id} exists")
    try:
        bq.get_table(full_table_id)
        print(f"table {full_table_id} exists and will be deleted")
        delete_bq_table(dataset, table_name)
    except NotFound:
        print(f"table {full_table_id} does not exist")
    # Construct a Table object to send to the API.
    with open(schema_file_path, "rb") as schema:
        schema_dict = json.load(schema)
        table = bigquery.Table(full_table_id, schema=schema_dict)
    bq.create_table(table)
    print(f"table {full_table_id} is created")


def delete_bq_table(dataset, table_name):
    full_table_id = f"{project_id}.{dataset}.{table_name}"
    bq = bigquery.Client()
    bq.delete_table(full_table_id, not_found_ok=True)
    print(f"Table '{full_table_id}' is deleted.")


def upload_data_to_bq_table(dataset, table_name, source, schema_file_path):
    """Upload data to the table from specified source file"""
    full_table_id = f"{project_id}.{dataset}.{table_name}"
    bq = bigquery.Client()
    print(f"Uploading data from {source} to the table {full_table_id}")
    with open(schema_file_path, "rb") as schema:
        schema_dict = json.load(schema)
        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            schema=schema_dict,
        )
    with open(source, "rb") as source_file:
        job = bq.load_table_from_file(
            source_file, full_table_id, job_config=job_config
        )
    job.result()  # Waits for the job to complete.
    print("data was uploaded")
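

# Example usage sketch. The visitor id, bucket, dataset, table, and file
# names below are hypothetical placeholders; the tutorial scripts that import
# this module pass in their own values. Note that upload_blob() derives the
# object name from the part of the path after "resources/", so the source
# file is assumed to live under a resources/ directory.
if __name__ == "__main__":
    # Write a test user event for a made-up visitor, then purge it.
    write_user_event("test_visitor_id")
    purge_user_event("test_visitor_id")

    # Stage user-event data in Cloud Storage.
    create_bucket("my_events_bucket")
    upload_blob("my_events_bucket", "resources/user_events.json")

    # Stage the same data in BigQuery using a JSON schema file.
    create_bq_dataset("user_events_dataset")
    create_bq_table("user_events_dataset", "events", "resources/events_schema.json")
    upload_data_to_bq_table(
        "user_events_dataset",
        "events",
        "resources/user_events.json",
        "resources/events_schema.json",
    )

    # Clean up once the tutorial is finished.
    delete_bq_table("user_events_dataset", "events")
    delete_bucket("my_events_bucket")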