# google-datacatalog-hive-connector/src/google/datacatalog_connectors/hive/scrape/metadata_database_scraper.py
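
# Module-level imports the method below relies on (a sketch; the full
# file's import list may differ slightly): logging for progress messages,
# SQLAlchemy's exc and subqueryload, and the connector's entity mappings.
import logging

from sqlalchemy import exc
from sqlalchemy.orm import subqueryload

from google.datacatalog_connectors.hive import entities
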
def get_database_metadata(self):
    try:
        databases = []
        paginated_query_conf = {
            'execute': True,
            'rows_per_page': self.DATABASES_PER_PAGE,
            'page_number': self.INITIAL_PAGE_NUMBER
        }

        # Since Hive databases can contain thousands of tables,
        # we paginate to avoid holding a session open for too long.
        # Pagination is done at the top level: the databases.
        while paginated_query_conf['execute']:
            # Use a context manager to make sure the session is removed
            # (see the session_scope sketch after this method).
            with self.session_scope() as session:
                logging.info('[Scrape] fetching page: %s.',
                             paginated_query_conf['page_number'])
                rows_per_page = paginated_query_conf['rows_per_page']

                # Use subqueryload to eagerly load the related tables,
                # table params, storages and columns within this session,
                # so the objects remain usable after the session closes.
                query = session.query(entities.Database).options(
                    subqueryload(entities.Database.tables).subqueryload(
                        entities.Table.table_params),
                    subqueryload(entities.Database.tables).subqueryload(
                        entities.Table.table_storages).subqueryload(
                            entities.TableStorage.columns))

                # Add the pagination clause: e.g. with 100 rows per page,
                # page 3 reads rows 201-300 (offset (3 - 1) * 100 = 200).
                query = query.limit(rows_per_page).offset(
                    (paginated_query_conf['page_number'] - 1) *
                    rows_per_page)

                results = query.all()
                databases.extend(results)

                # Move to the next page.
                paginated_query_conf['page_number'] += 1

                # An empty page means there are no more results.
                if not results:
                    logging.info(
                        '[Scrape] finished execution at page: %s.',
                        paginated_query_conf['page_number'])
                    paginated_query_conf['execute'] = False
        return {'databases': databases}
    except exc.OperationalError:
        logging.error('Unable to connect to the metadata database.')
        raise
    finally:
        # Make sure all connections in the connection pool are closed.
        self.__engine.dispose()
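

# The method above references class members defined elsewhere in the file.
# Below is a minimal sketch of that scaffolding, with hypothetical names
# and values, shown only to make the excerpt self-contained: the paging
# constants, a private SQLAlchemy engine (self.__engine name-mangles to
# _MetadataDatabaseScraper__engine), and the session_scope() context
# manager in its conventional commit/rollback/close form. None of this is
# necessarily the connector's verbatim code.
from contextlib import contextmanager

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker


class MetadataDatabaseScraper:
    DATABASES_PER_PAGE = 100  # hypothetical page size
    INITIAL_PAGE_NUMBER = 1

    def __init__(self, connection_url):
        self.__engine = create_engine(connection_url)
        # expire_on_commit=False keeps the eagerly loaded objects readable
        # after their session is closed (an assumption; the real connector
        # may configure this differently).
        self.__sessionmaker = sessionmaker(bind=self.__engine,
                                           expire_on_commit=False)

    @contextmanager
    def session_scope(self):
        """Provide a transactional scope around a series of operations."""
        session = self.__sessionmaker()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

    # get_database_metadata (above) would live here as a method.


# Usage sketch (hypothetical metastore connection URL):
#   scraper = MetadataDatabaseScraper(
#       'postgresql+psycopg2://hive:hive@localhost:5432/metastore')
#   metadata = scraper.get_database_metadata()
#   logging.info('Scraped %s databases.', len(metadata['databases']))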