in services/jobs/fetch_projects/transfer_controller.py [0:0]
def create_cloud_tasks(self, projects: list[Project]):
"""
Create cloud tasks for further processing
"""
if not self.cloud_task_client.check_queue_exists():
self.cloud_task_client.create_queue()
today = datetime.datetime.today().isoformat()
for project in projects:
payload = project.to_dict()
payload["created_at"] = (
today # TODO: move "created_at to constants.py"
)
self.cloud_task_client.create_task(
payload,
self.handler_name,
self.project,
self.location,
)