backend/bms_app/scheduled_tasks/services.py (55 lines of code) (raw):
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from urllib.parse import urljoin
from marshmallow import ValidationError
from bms_app import settings
from bms_app.models import ScheduledTask, SourceDB, db
from bms_app.services.cloud_run import get_cloud_run_service_object
from bms_app.services.gcloud_tasks import create_task
from bms_app.services.operations.utils import is_restore_allowed
from bms_app.services.scheduled_tasks import get_planned_task
def get_webhook_url(domain_name, scheduled_task):
return urljoin(
domain_name,
f'/api/scheduled-tasks/{scheduled_task.id}/run'
)
def create_google_task(scheduled_task):
"""Create Google Cloud task."""
domain_name = getattr(settings, 'GCP_LB_URL', None)
oauth_client_id = getattr(settings, 'GCP_OAUTH_CLIENT_ID', None)
kwargs = {}
# if we have specified domain name and Oauth CloudId
# it means that IAP is configued
# so CloudTasks should make request with authentication
if domain_name and oauth_client_id:
url = get_webhook_url(domain_name, scheduled_task)
kwargs.update({
'service_account_email': settings.GCP_SERVICE_ACCOUNT,
'audience': oauth_client_id,
})
else:
cloud_run_service_obj = get_cloud_run_service_object(
settings.GCP_CLOUD_RUN_SERVICE_NAME
)
url = get_webhook_url(cloud_run_service_obj.uri, scheduled_task)
google_task = create_task(
settings.GCP_CLOUD_TASKS_QUEUE,
scheduled_task.schedule_time,
url,
**kwargs
)
return google_task
def validate_source_db(db_id):
"""Check whether new task can be created."""
source_db = SourceDB.query.get(db_id)
if not source_db:
raise ValidationError({'db_id': ['incorrect db_id']})
# only one incomplete task is allowed
if get_planned_task(db_id):
raise ValidationError(
{'db_id': ['The database already has a scheduled task']}
)
# db should be in a status allowed to restore
if not is_restore_allowed(source_db):
raise ValidationError(
{'db_id': [f'wrong db status: {source_db.status}']}
)
def add_record_to_db(db_id, schedule_time):
scheduled_task = ScheduledTask(
schedule_time=schedule_time,
db_id=db_id,
)
db.session.add(scheduled_task)
db.session.flush()
return scheduled_task