in retail/interactive-tutorials/product/import_products_gcs.py [0:0]
def main(bucket_name: str) -> None:
    """Import products from a Cloud Storage bucket using the Retail API.

    Builds an ``ImportProductsRequest`` that points at
    ``gs://<bucket_name>/products.json``, starts the import, waits for the
    long-running operation to finish and prints its metadata and result.

    Args:
        bucket_name: Name of the Cloud Storage bucket (without the ``gs://``
            prefix) that holds the products file. Import errors are written
            under the ``error`` prefix of the same bucket.
    """
    import time

    import google.auth
    from google.cloud.retail import (
        GcsSource,
        ImportErrorsConfig,
        ImportProductsRequest,
        ProductInputConfig,
        ProductServiceClient,
    )

    # google.auth.default() returns (credentials, project_id); only the
    # project id is needed here.
    project_id = google.auth.default()[1]

    # TODO: Developer set the bucket_name
    # bucket_name = os.environ["BUCKET_NAME"]

    # You can change the branch here. The "default_branch" is set to point to the branch "0"
    default_catalog = f"projects/{project_id}/locations/global/catalogs/default_catalog/branches/default_branch"

    gcs_bucket = f"gs://{bucket_name}"
    gcs_errors_bucket = f"{gcs_bucket}/error"
    gcs_products_object = "products.json"

    # TO CHECK ERROR HANDLING USE THE JSON WITH INVALID PRODUCT
    # gcs_products_object = "products_some_invalid.json"

    def get_import_products_gcs_request(gcs_object_name: str) -> ImportProductsRequest:
        """Build the import request for ``gcs_object_name`` in the bucket.

        Args:
            gcs_object_name: Object name, relative to the bucket root, of the
                file to import.

        Returns:
            The populated request; it is also printed for the reader.
        """
        # TO CHECK ERROR HANDLING PASTE THE INVALID CATALOG NAME HERE:
        # default_catalog = "invalid_catalog_name"
        gcs_source = GcsSource()
        gcs_source.input_uris = [f"{gcs_bucket}/{gcs_object_name}"]

        input_config = ProductInputConfig()
        input_config.gcs_source = gcs_source
        print("GRS source:")
        print(gcs_source.input_uris)

        errors_config = ImportErrorsConfig()
        errors_config.gcs_prefix = gcs_errors_bucket

        import_request = ImportProductsRequest()
        import_request.parent = default_catalog
        import_request.reconciliation_mode = (
            ImportProductsRequest.ReconciliationMode.INCREMENTAL
        )
        import_request.input_config = input_config
        import_request.errors_config = errors_config

        print("---import products from google cloud source request---")
        print(import_request)
        return import_request

    # call the Retail API to import products
    def import_products_from_gcs() -> None:
        """Start the import, wait until it is done and print the outcome."""
        import_gcs_request = get_import_products_gcs_request(gcs_products_object)
        gcs_operation = ProductServiceClient().import_products(import_gcs_request)

        print("---the operation was started:----")
        print(gcs_operation.operation.name)

        # Check the operation every 30 seconds until it reports completion.
        while not gcs_operation.done():
            print("---please wait till operation is done---")
            time.sleep(30)

        print("---import products operation is done---")

        if gcs_operation.metadata is not None:
            print("---number of successfully imported products---")
            print(gcs_operation.metadata.success_count)
            print("---number of failures during the importing---")
            print(gcs_operation.metadata.failure_count)
        else:
            print("---operation.metadata is empty---")

        # `result` is a method, so it has to be called before testing for
        # None; comparing the bound method itself is always true and would
        # make the "empty" branch unreachable.
        operation_result = gcs_operation.result()
        if operation_result is not None:
            print("---operation result:---")
            print(operation_result)
        else:
            print("---operation.result is empty---")

        # The imported products needs to be indexed in the catalog before they become available for search.
        print(
            "Wait 2-5 minutes till products become indexed in the catalog, after that they will be available for search"
        )

    import_products_from_gcs()