appengine/flexible/tasks/snippets.py (132 lines of code) (raw):

# Copyright 2019 Google LLC. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from google.cloud import tasks def create_queue(project, location, queue_blue_name, queue_red_name): # [START cloud_tasks_taskqueues_using_yaml] client = tasks.CloudTasksClient() # TODO(developer): Uncomment these lines and replace with your values. # project = 'my-project-id' # location = 'us- central1' # queue_blue_name = 'queue-blue' # queue_red_name = 'queue-red' parent = f"projects/{project}/locations/{location}" queue_blue = { "name": client.queue_path(project, location, queue_blue_name), "rate_limits": {"max_dispatches_per_second": 5}, "app_engine_routing_override": {"version": "v2", "service": "task-module"}, } queue_red = { "name": client.queue_path(project, location, queue_red_name), "rate_limits": {"max_dispatches_per_second": 1}, } queues = [queue_blue, queue_red] for queue in queues: response = client.create_queue(parent=parent, queue=queue) print(response) # [END cloud_tasks_taskqueues_using_yaml] return response def update_queue(project, location, queue): # [START cloud_tasks_taskqueues_processing_rate] client = tasks.CloudTasksClient() # TODO(developer): Uncomment these lines and replace with your values. # project = 'my-project-id' # location = 'us- central1' # queue = 'queue-blue' # Get queue object queue_path = client.queue_path(project, location, queue) queue = client.get_queue(name=queue_path) # Update queue object queue.rate_limits.max_dispatches_per_second = 20 queue.rate_limits.max_concurrent_dispatches = 10 response = client.update_queue(queue=queue) print(response) # [END cloud_tasks_taskqueues_processing_rate] return response def create_task(project, location, queue): # [START cloud_tasks_taskqueues_new_task] client = tasks.CloudTasksClient() # TODO(developer): Uncomment these lines and replace with your values. # project = 'my-project-id' # location = 'us- central1' # queue = 'default' amount = 10 parent = client.queue_path(project, location, queue) task = { "app_engine_http_request": { "http_method": tasks.HttpMethod.POST, "relative_uri": "/update_counter", "app_engine_routing": {"service": "worker"}, "body": str(amount).encode(), } } response = client.create_task(parent=parent, task=task) eta = response.schedule_time.strftime("%m/%d/%Y, %H:%M:%S") print(f"Task {response.name} enqueued, ETA {eta}.") # [END cloud_tasks_taskqueues_new_task] return response def create_tasks_with_data(project, location, queue): # [START cloud_tasks_taskqueues_passing_data] import json client = tasks.CloudTasksClient() # TODO(developer): Uncomment these lines and replace with your values. # project = 'my-project-id' # location = 'us- central1' # queue = 'default' parent = client.queue_path(project, location, queue) task1 = { "app_engine_http_request": { "http_method": tasks.HttpMethod.POST, "relative_uri": "/update_counter?key=blue", "app_engine_routing": {"service": "worker"}, } } task2 = { "app_engine_http_request": { "http_method": tasks.HttpMethod.POST, "relative_uri": "/update_counter", "app_engine_routing": {"service": "worker"}, "headers": {"Content-Type": "application/json"}, "body": json.dumps({"key": "blue"}).encode(), } } response = client.create_task(parent=parent, task=task1) print(response) response = client.create_task(parent=parent, task=task2) print(response) # [END cloud_tasks_taskqueues_passing_data] return response def create_task_with_name(project, location, queue, task_name): # [START cloud_tasks_taskqueues_naming_tasks] client = tasks.CloudTasksClient() # TODO(developer): Uncomment these lines and replace with your values. # project = 'my-project-id' # location = 'us- central1' # queue = 'default' # task_name = 'first-try' parent = client.queue_path(project, location, queue) task = { "name": client.task_path(project, location, queue, task_name), "app_engine_http_request": { "http_method": tasks.HttpMethod.GET, "relative_uri": "/url/path", }, } response = client.create_task(parent=parent, task=task) print(response) # [END cloud_tasks_taskqueues_naming_tasks] return response def delete_task(project, location, queue): # [START cloud_tasks_taskqueues_deleting_tasks] client = tasks.CloudTasksClient() # TODO(developer): Uncomment these lines and replace with your values. # project = 'my-project-id' # location = 'us- central1' # queue = 'queue1' task_path = client.task_path(project, location, queue, "foo") response = client.delete_task(name=task_path) # [END cloud_tasks_taskqueues_deleting_tasks] return response def purge_queue(project, location, queue): # [START cloud_tasks_taskqueues_purging_tasks] client = tasks.CloudTasksClient() # TODO(developer): Uncomment these lines and replace with your values. # project = 'my-project-id' # location = 'us- central1' # queue = 'queue1' queue_path = client.queue_path(project, location, queue) response = client.purge_queue(name=queue_path) # [END cloud_tasks_taskqueues_purging_tasks] return response def pause_queue(project, location, queue): # [START cloud_tasks_taskqueues_pause_queue] client = tasks.CloudTasksClient() # TODO(developer): Uncomment these lines and replace with your values. # project = 'my-project-id' # location = 'us- central1' # queue = 'queue1' queue_path = client.queue_path(project, location, queue) response = client.pause_queue(name=queue_path) # [END cloud_tasks_taskqueues_pause_queue] return response def delete_queue(project, location, queue): # [START cloud_tasks_taskqueues_deleting_queues] client = tasks.CloudTasksClient() # TODO(developer): Uncomment these lines and replace with your values. # project = 'my-project-id' # location = 'us- central1' # queue = 'queue1' queue_path = client.queue_path(project, location, queue) response = client.delete_queue(name=queue_path) # [END cloud_tasks_taskqueues_deleting_queues] return response def retry_task(project, location, fooqueue, barqueue, bazqueue): # [START cloud_tasks_taskqueues_retrying_tasks] from google.protobuf import duration_pb2 client = tasks.CloudTasksClient() # TODO(developer): Uncomment these lines and replace with your values. # project = 'my-project-id' # location = 'us- central1' # fooqueue = 'fooqueue' # barqueue = 'barqueue' # bazqueue = 'bazqueue' parent = f"projects/{project}/locations/{location}" max_retry = duration_pb2.Duration() max_retry.seconds = 2 * 60 * 60 * 24 foo = { "name": client.queue_path(project, location, fooqueue), "rate_limits": {"max_dispatches_per_second": 1}, "retry_config": {"max_attempts": 7, "max_retry_duration": max_retry}, } min = duration_pb2.Duration() min.seconds = 10 max = duration_pb2.Duration() max.seconds = 200 bar = { "name": client.queue_path(project, location, barqueue), "rate_limits": {"max_dispatches_per_second": 1}, "retry_config": {"min_backoff": min, "max_backoff": max, "max_doublings": 0}, } max.seconds = 300 baz = { "name": client.queue_path(project, location, bazqueue), "rate_limits": {"max_dispatches_per_second": 1}, "retry_config": {"min_backoff": min, "max_backoff": max, "max_doublings": 3}, } queues = [foo, bar, baz] for queue in queues: response = client.create_queue(parent=parent, queue=queue) print(response) # [END cloud_tasks_taskqueues_retrying_tasks] return response