in retail/interactive-tutorials/product/import_products_big_query_table.py [0:0]
def main(project_id: str, dataset_id: str, table_id: str) -> None:
    """Import products into a Retail catalog branch from a BigQuery table.

    Builds an ``ImportProductsRequest`` whose input is the BigQuery table
    ``project_id.dataset_id.table_id``, submits it with
    ``ProductServiceClient.import_products``, polls the returned long-running
    operation until it reports done, and prints the operation's metadata
    counters and result.

    Args:
        project_id: Google Cloud project ID; used both in the catalog branch
            resource name and as the project of the BigQuery source.
        dataset_id: BigQuery dataset that holds the products table.
        table_id: BigQuery table to import products from.

    Raises:
        Whatever ``import_products`` or the operation's ``result()`` raise;
        no exception is caught here.
    """
    # TODO: Set project_id to your Google Cloud Platform project ID.
    # project_id = "my-project"

    # TODO: Set dataset_id
    # dataset_id = "products"

    # TODO: Set table_id
    # table_id = "products"

    # Import products into a catalog from big query table using Retail API
    import time

    from google.cloud.retail import (
        BigQuerySource,
        ImportProductsRequest,
        ProductInputConfig,
        ProductServiceClient,
    )

    # Resource name of the branch that receives the imported products.
    default_catalog = (
        f"projects/{project_id}/locations/global/catalogs/default_catalog"
        "/branches/default_branch"
    )

    # TO CHECK ERROR HANDLING USE THE TABLE WITH INVALID PRODUCTS:
    # table_id = "products_some_invalid"

    # get import products from big query request
    def get_import_products_big_query_request(reconciliation_mode):
        """Build, print and return the import request for the given mode."""
        # TO CHECK ERROR HANDLING PASTE THE INVALID CATALOG NAME HERE:
        # default_catalog = "invalid_catalog_name"
        big_query_source = BigQuerySource()
        big_query_source.project_id = project_id
        big_query_source.dataset_id = dataset_id
        big_query_source.table_id = table_id
        big_query_source.data_schema = "product"

        input_config = ProductInputConfig()
        input_config.big_query_source = big_query_source

        import_request = ImportProductsRequest()
        import_request.parent = default_catalog
        import_request.reconciliation_mode = reconciliation_mode
        import_request.input_config = input_config

        print("---import products from big query table request---")
        print(import_request)

        return import_request

    # call the Retail API to import products
    def import_products_from_big_query():
        """Start the import, wait for the operation to finish, print the outcome."""
        # TRY THE FULL RECONCILIATION MODE HERE:
        reconciliation_mode = ImportProductsRequest.ReconciliationMode.INCREMENTAL

        import_big_query_request = get_import_products_big_query_request(
            reconciliation_mode
        )
        big_query_operation = ProductServiceClient().import_products(
            import_big_query_request
        )

        print("---the operation was started:----")
        print(big_query_operation.operation.name)

        # Poll every 30 seconds until the operation reports completion.
        while not big_query_operation.done():
            print("---please wait till operation is done---")
            time.sleep(30)

        print("---import products operation is done---")

        metadata = big_query_operation.metadata
        if metadata is not None:
            print("---number of successfully imported products---")
            print(metadata.success_count)
            print("---number of failures during the importing---")
            print(metadata.failure_count)
        else:
            print("---operation.metadata is empty---")

        # `result` is a method, so it must be called before comparing with
        # None; testing the bound method itself is always true and made the
        # "empty" branch unreachable. Call it once and reuse the value.
        operation_result = big_query_operation.result()
        if operation_result is not None:
            print("---operation result:---")
            print(operation_result)
        else:
            print("---operation.result is empty---")

    import_products_from_big_query()