bigquery_etl/util/client_queue.py (34 lines of code) (raw):

"""Queue for balancing jobs across billing projects.""" import asyncio from contextlib import contextmanager from queue import Queue from google.cloud import bigquery from requests import adapters class ClientQueue: """Queue for balancing jobs across billing projects. Also provides default_client for use in operations that do not need to be distributed across projects and that may be in excess of parallelism, such as copying results from a subset of queries before all queries have finished. """ def __init__(self, billing_projects, parallelism, connection_pool_max_size=None): """Initialize. connection_pool_max_size sets the pool size in the HTTP adapter of each client in the queue. This allows more concurrent requests when a client is shared across threads. See https://cloud.google.com/bigquery/docs/python-libraries#troubleshooting_connection_pool_errors Increasing connection_pool_max_size will also increase memory usage. """ clients = [bigquery.Client(project) for project in billing_projects] if connection_pool_max_size is not None: for client in clients: adapter = adapters.HTTPAdapter( pool_connections=connection_pool_max_size, pool_maxsize=connection_pool_max_size, ) client._http.mount("https://", adapter) client._http._auth_request.session.mount("https://", adapter) self.default_client = clients[0] self._q = Queue(parallelism) for i in range(parallelism): self._q.put(clients[i % len(clients)]) @contextmanager def client(self): """Context manager for using a client from the queue.""" client = self._q.get_nowait() try: yield client finally: self._q.put_nowait(client) def with_client(self, func, *args): """Run func with a client from the queue.""" with self.client() as client: return func(client, *args) async def async_with_client(self, executor, func, *args): """Run func asynchronously in executor.""" return await asyncio.get_running_loop().run_in_executor( executor, self.with_client, func, *args )