in google-datacatalog-hive-connector/src/google/datacatalog_connectors/hive/sync/datacatalog_synchronizer.py [0:0]
def run(self):
    """Run one hive-to-datacatalog synchronization and return the task id.

    The run goes through three logged phases:

    1. Scrape: when a metadata sync event is set, the databases metadata,
       the sync event type and the host name are all taken from that
       event. Otherwise the metadata is read through a
       ``MetadataDatabaseScraper`` built from the configured hive
       metastore db host, user, password, name and type, and the run is
       treated as a ``MANUAL_DATABASE_SYNC``.
    2. Prepare: the scraped metadata is turned into assembled entries.
    3. Ingest: on a manual database sync, obsolete metadata is deleted
       first. Then the entries are ingested, unless the sync event is
       one of the clean up events, in which case ``DROP_DATABASE`` and
       ``DROP_TABLE`` are handed to their matching clean up routine.

    Returns:
        The value of ``self.__task_id``.
    """
    logging.info('\n==============Start hive-to-datacatalog============')

    logging.info('\n\n==============Scrape metadata===============')
    event = self.__metadata_sync_event
    if not event:
        # No event supplied: scrape the metastore database directly.
        host_name = self.__hive_metastore_db_host
        scraper = scrape.MetadataDatabaseScraper(
            host_name,
            self.__hive_metastore_db_user,
            self.__hive_metastore_db_pass,
            self.__hive_metastore_db_name,
            self.__hive_metastore_db_type,
        )
        databases_metadata = scraper.get_database_metadata()
        sync_event = entities.SyncEvent.MANUAL_DATABASE_SYNC
    else:
        event_scraper = scrape.MetadataSyncEventScraper
        databases_metadata = event_scraper.get_database_metadata(event)
        sync_event = entities.SyncEvent[event['event']]
        # The entries must point at the host that created the event,
        # so the host name comes from the event itself.
        host_name = event['hostName']

    logging.info('\n--> {}'.format(sync_event))
    database_count = len(databases_metadata['databases'])
    logging.info('\n{}'.format(database_count) +
                 ' databases ready to be ingested...')

    logging.info('\n\n==============Prepare metadata===============')
    logging.info('\nPreparing the metadata...')
    entry_factory = assembled_entry_factory.AssembledEntryFactory(
        self.__project_id,
        self.__location_id,
        host_name,
        self.__entry_group_id,
    )
    prepared_entries = entry_factory.make_entries_from_database_metadata(
        databases_metadata)

    self.__log_metadata(databases_metadata)
    self.__log_entries(prepared_entries)

    logging.info('\n==============Ingest metadata===============')
    cleaner = datacatalog_metadata_cleaner.DataCatalogMetadataCleaner(
        self.__project_id, self.__location_id, self.__entry_group_id)

    # Search can't be relied on to return the entries ingested by this
    # run, so the obsolete entries are removed before ingesting.
    if sync_event == entities.SyncEvent.MANUAL_DATABASE_SYNC:
        # Flatten each (database entry, table related entries) pair
        # into a single list.
        flattened_entries = [
            entry
            for database_entry, table_related_entries in prepared_entries
            for entry in (database_entry, *table_related_entries)
        ]
        search_query = 'system={}'.format(self.__entry_group_id)
        cleaner.delete_obsolete_metadata(flattened_entries, search_query)
        # The flattened list is only needed for the clean up above.
        del flattened_entries

    logging.info('\nStarting to ingest custom metadata...')
    if sync_event in self.__CLEAN_UP_EVENTS:
        if sync_event == entities.SyncEvent.DROP_DATABASE:
            self.__cleanup_deleted_databases(cleaner, prepared_entries)
        elif sync_event == entities.SyncEvent.DROP_TABLE:
            self.__cleanup_deleted_tables(cleaner, prepared_entries)
    else:
        self.__ingest_created_or_updated(prepared_entries)

    logging.info('\n==============End hive-to-datacatalog===============')
    self.__after_run()

    return self.__task_id