in backend/bms_app/services/gcloud_tasks.py [0:0]
def create_task(queue_path, schedule_dt, url,
                service_account_email=None, audience=None):
    """Create a scheduled Google Task (HTTP Target).

    Params:
    - queue_path: full GoogleTasks queue name.
    - schedule_dt: datetime in the future when it should be executed.
    - url: full url that will be requested with POST method.
    - service_account_email: optional service account used to generate
      an OIDC token that is attached to the request. When it is not
      provided the request is sent without an OIDC token.
    - audience: optional audience for the OIDC token. It is only used
      together with service_account_email; when omitted, Cloud Tasks
      falls back to the target url as the audience.

    Returns the response of CloudTasksClient.create_task.
    """
    # Convert the schedule datetime into the protobuf timestamp
    # expected by the Cloud Tasks API.
    timestamp = timestamp_pb2.Timestamp()
    timestamp.FromDatetime(schedule_dt)

    # Specify the type of request and where it will be sent to.
    http_request = {
        'http_method': tasks_v2.HttpMethod.POST,
        'url': url,  # The full url path that the task will be sent to.
    }

    # The service account is what enables OIDC authentication; the
    # audience is optional, so a missing audience must not silently
    # drop the token and produce an unauthenticated request.
    if service_account_email:
        oidc_token = {'service_account_email': service_account_email}
        if audience:
            oidc_token['audience'] = audience
        http_request['oidc_token'] = oidc_token

    # Construct the request body.
    task = {
        'http_request': http_request,
        'schedule_time': timestamp,
    }

    # Use the client to build and send the task.
    client = tasks_v2.CloudTasksClient()
    return client.create_task(
        request={
            'parent': queue_path,
            'task': task,
        }
    )