def main()

in retail/interactive-tutorials/events/import_user_events_big_query.py [0:0]


def main(project_id, dataset_id, table_id):
    # TODO: Set project_id to your Google Cloud Platform project ID.
    # project_id = "my-project"

    # TODO: Set dataset_id
    # dataset_id = "user_events"

    # TODO: Set table_id
    # table_id = "events"

    # Import products into a catalog from big query table using Retail API
    import time

    from google.cloud.retail import (
        BigQuerySource,
        ImportUserEventsRequest,
        UserEventInputConfig,
        UserEventServiceClient,
    )

    default_catalog = f"projects/{project_id}/locations/global/catalogs/default_catalog"

    # TO CHECK ERROR HANDLING USE THE TABLE OF INVALID USER EVENTS:
    # table_id = "events_some_invalid"

    # get import user events from big query request
    def get_import_events_big_query_request():
        # TO CHECK ERROR HANDLING PASTE THE INVALID CATALOG NAME HERE:
        # default_catalog = "invalid_catalog_name"
        big_query_source = BigQuerySource()
        big_query_source.project_id = project_id
        big_query_source.dataset_id = dataset_id
        big_query_source.table_id = table_id
        big_query_source.data_schema = "user_event"

        input_config = UserEventInputConfig()
        input_config.big_query_source = big_query_source

        import_request = ImportUserEventsRequest()
        import_request.parent = default_catalog
        import_request.input_config = input_config

        print("---import user events from BigQuery source request---")
        print(import_request)

        return import_request

    # call the Retail API to import user events
    def import_user_events_from_big_query():
        import_big_query_request = get_import_events_big_query_request()
        big_query_operation = UserEventServiceClient().import_user_events(
            import_big_query_request
        )

        print("---the operation was started:----")
        print(big_query_operation.operation.name)

        while not big_query_operation.done():
            print("---please wait till operation is done---")
            time.sleep(30)
        print("---import user events operation is done---")

        if big_query_operation.metadata is not None:
            print("---number of successfully imported events---")
            print(big_query_operation.metadata.success_count)
            print("---number of failures during the importing---")
            print(big_query_operation.metadata.failure_count)
        else:
            print("---operation.metadata is empty---")

        if big_query_operation.result is not None:
            print("---operation result:---")
            print(big_query_operation.result())
        else:
            print("---operation.result is empty---")

    import_user_events_from_big_query()