in services/jobs/fetch_resources/transfer_controller.py [0:0]
def create_cloud_tasks(self, projects: list[str]):
    """
    Enqueue the initial batch of cloud tasks for the cloud task handler.

    The queue is created first if the existence check reports it missing.
    Then, for each project, six payloads are built with today's date --
    four for TAG_TEMPLATE resources and two for ENTRY_GROUP resources --
    and one task is submitted per payload.

    NOTE(review): the boolean flags handed to ``build_cloud_task_payload``
    appear to mean "transferred" (3rd argument) and "public" (5th argument,
    tag templates only); this is inferred from how the payloads were named
    by the caller -- confirm against ``build_cloud_task_payload``.

    Args:
        projects: Project identifiers to create tasks for.
    """
    if not self.cloud_task_client.check_queue_exists():
        self.cloud_task_client.create_queue()

    run_date = datetime.date.today()

    # (transferred, public) combinations, listed in the order the tag
    # template payloads are built.
    tag_build_order = (
        (True, True),
        (False, True),
        (True, False),
        (False, False),
    )

    for project in projects:
        # Phase 1: build every payload before submitting anything, so an
        # error raised while building leaves no task submitted for this
        # project.
        tag_payloads = {
            (transferred, public): self.build_cloud_task_payload(
                project,
                DatacatalogApiAdapter.ResourceType.TAG_TEMPLATE,
                transferred,
                run_date,
                public,
            )
            for transferred, public in tag_build_order
        }
        # Entry group payloads are built without the fifth argument.
        entry_payloads = [
            self.build_cloud_task_payload(
                project,
                DatacatalogApiAdapter.ResourceType.ENTRY_GROUP,
                transferred,
                run_date,
            )
            for transferred in (True, False)
        ]

        # Phase 2: submit. Tag template tasks are submitted grouped by the
        # transferred flag (public before private within each group), which
        # differs from the build order above; entry group tasks come last.
        submission_queue = [
            tag_payloads[(transferred, public)]
            for transferred in (True, False)
            for public in (True, False)
        ]
        submission_queue.extend(entry_payloads)

        for payload in submission_queue:
            self.cloud_task_client.create_task(
                payload,
                self.handler_name,
                self.project,
                self.location,
            )