in backend/bms_app/scheduled_tasks/services.py [0:0]
def create_google_task(scheduled_task):
    """Create a Google Cloud task that calls the scheduled task's webhook.

    The webhook URL is built from ``settings.GCP_LB_URL`` when both that
    setting and ``settings.GCP_OAUTH_CLIENT_ID`` are set; otherwise it is
    built from the ``uri`` of the Cloud Run service named by
    ``settings.GCP_CLOUD_RUN_SERVICE_NAME``.

    Args:
        scheduled_task: object providing ``schedule_time``; it is also
            passed to ``get_webhook_url`` to build the target URL.

    Returns:
        Whatever ``create_task`` returns for the newly created task.
    """
    lb_url = getattr(settings, 'GCP_LB_URL', None)
    client_id = getattr(settings, 'GCP_OAUTH_CLIENT_ID', None)

    if lb_url and client_id:
        # A configured domain name together with an OAuth client id means
        # IAP is configured, so Cloud Tasks has to send an authenticated
        # request.
        webhook_url = get_webhook_url(lb_url, scheduled_task)
        auth_options = {
            'service_account_email': settings.GCP_SERVICE_ACCOUNT,
            'audience': client_id,
        }
    else:
        # No IAP: target the Cloud Run service directly, without any
        # extra authentication arguments.
        service = get_cloud_run_service_object(
            settings.GCP_CLOUD_RUN_SERVICE_NAME
        )
        webhook_url = get_webhook_url(service.uri, scheduled_task)
        auth_options = {}

    return create_task(
        settings.GCP_CLOUD_TASKS_QUEUE,
        scheduled_task.schedule_time,
        webhook_url,
        **auth_options
    )