# def main() in retail/interactive-tutorials/events/import_user_events_gcs.py [0:0]


def main(bucket_name):
    """Import user events from a Cloud Storage bucket using the Retail API.

    Builds an ``ImportUserEventsRequest`` pointing at ``user_events.json`` in
    the bucket, starts the import against the project's ``default_catalog``,
    polls the long-running operation every 30 seconds until it is done, and
    prints the operation's metadata counters and result.

    Args:
        bucket_name: Name of the bucket (without the ``gs://`` prefix) holding
            the events file. When falsy, the ``EVENTS_BUCKET_NAME``
            environment variable is used instead.

    Raises:
        ValueError: If no bucket name is passed and ``EVENTS_BUCKET_NAME`` is
            not set.
    """
    # Import user events into a catalog from GCS using Retail API

    import os
    import time

    # Honor the caller's argument; fall back to the environment variable only
    # when no bucket name was passed in. Resolved before loading the client
    # library so a misconfiguration fails fast instead of producing a
    # "gs://None" URI that is rejected later by the API.
    bucket_name = bucket_name or os.getenv("EVENTS_BUCKET_NAME")
    if not bucket_name:
        raise ValueError(
            "bucket_name was not provided and the EVENTS_BUCKET_NAME "
            "environment variable is not set"
        )

    # TODO: Developer set the bucket_name
    # bucket_name = 'user_events_bucket'

    import google.auth
    from google.cloud.retail import (
        GcsSource,
        ImportErrorsConfig,
        ImportUserEventsRequest,
        UserEventInputConfig,
        UserEventServiceClient,
    )

    # google.auth.default() returns (credentials, project_id); the second
    # element is the project ID resolved from the environment.
    project_id = google.auth.default()[1]

    default_catalog = f"projects/{project_id}/locations/global/catalogs/default_catalog"

    gcs_bucket = f"gs://{bucket_name}"
    gcs_errors_bucket = f"{gcs_bucket}/error"
    gcs_events_object = "user_events.json"

    # TO CHECK ERROR HANDLING USE THE JSON WITH INVALID PRODUCT
    # gcs_events_object = "user_events_some_invalid.json"

    # get import user events from gcs request
    def get_import_events_gcs_request(gcs_object_name: str):
        """Build the import request for ``gcs_object_name`` in the bucket."""
        # TO CHECK ERROR HANDLING PASTE THE INVALID CATALOG NAME HERE:
        # default_catalog = "invalid_catalog_name"
        gcs_source = GcsSource()
        gcs_source.input_uris = [f"{gcs_bucket}/{gcs_object_name}"]

        input_config = UserEventInputConfig()
        input_config.gcs_source = gcs_source

        # Import errors are written under the "error" prefix of the same bucket.
        errors_config = ImportErrorsConfig()
        errors_config.gcs_prefix = gcs_errors_bucket

        import_request = ImportUserEventsRequest()
        import_request.parent = default_catalog
        import_request.input_config = input_config
        import_request.errors_config = errors_config

        print("---import user events from google cloud source request---")
        print(import_request)

        return import_request

    # call the Retail API to import user events
    def import_user_events_from_gcs():
        """Start the import, wait for completion, and print the outcome."""
        import_gcs_request = get_import_events_gcs_request(gcs_events_object)
        gcs_operation = UserEventServiceClient().import_user_events(import_gcs_request)

        print("---the operation was started:----")
        print(gcs_operation.operation.name)

        while not gcs_operation.done():
            print("---please wait till operation is done---")
            time.sleep(30)

        print("---import user events operation is done---")

        operation_metadata = gcs_operation.metadata
        if operation_metadata is not None:
            print("---number of successfully imported events---")
            print(operation_metadata.success_count)
            print("---number of failures during the importing---")
            print(operation_metadata.failure_count)
        else:
            print("---operation.metadata is empty---")

        # `result` is a method: comparing the bound method itself to None is
        # always true, so call it once and test the returned value instead.
        operation_result = gcs_operation.result()
        if operation_result is not None:
            print("---operation result:---")
            print(operation_result)
        else:
            print("---operation.result is empty---")

    import_user_events_from_gcs()