in services/jobs/fetch_resources/transfer_controller.py [0:0]
def __init__(self, app_config: dict):
"""
Initializes the TransferController with the specified project.
"""
self.project = app_config["project_name"]
self.location = app_config["service_location"]
self.handler_name = app_config["handler_name"]
self.queue_name = app_config["queue"]
self.api_client = DatacatalogApiAdapter()
self.big_query_client = BigQueryAdapter(app_config)
self.cloud_task_client = CloudTaskPublisher(
self.project,
self.location,
self.queue_name
)