tools/sts-job-manager/sts_job_manager.py (426 lines of code) (raw):

#!/usr/bin/env python3 # Copyright 2020 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ The STS Job Manager. This tool creates STS Jobs and records each job's state. """ import argparse import json import logging import os import time from datetime import datetime from typing import Dict, List, Optional from google.cloud import bigquery, monitoring_v3 from constants import schemas from constants.status import (KNOWN_STATUSES, STATUS, sts_operation_status_to_table_status) from lib.options import STSJobManagerOptions from lib.services import Services from lib.table_util import get_table_identifier, get_table_ref logging.basicConfig() logger = logging.getLogger(__name__) logger.setLevel(os.environ.get("LOGLEVEL", "INFO").upper()) class Job: def __init__(self, data): self.prefix: str = data.prefix self.status: str = data.status self.job_name: str = data.job_name self.last_updated: datetime = data.last_updated self.operation_results: Optional[dict] = getattr( data, 'operation_results', None) def run_query(query: str, params: Optional[List], services: Services, options: STSJobManagerOptions): """ Runs a given query with optional params. """ job_config = bigquery.QueryJobConfig() job_config.query_parameters = params if params else [] return services.bigquery.query( query, location=options.bigquery_options.dataset_location, job_config=job_config ) def get_jobs_by_prefix(services: Services, options: STSJobManagerOptions) \ -> Dict[str, Job]: """ Retrieves jobs from the database and returns them in a key-value format where the `key` is the prefix and the value is a `Job` object. """ table = get_table_identifier( services, options.bigquery_options, options.bigquery_options.table_name['job']) # API does not support table names for preparameterized queries # https://cloud.google.com/bigquery/docs/parameterized-queries query = f""" SELECT * FROM `{table}` """ # nosec results = run_query(query, None, services, options) prefixToStatus: Dict[str, Job] = {} for row in results: prefixToStatus[row.prefix] = Job(row) return prefixToStatus def set_prefixes_to_status(prefixes: List[str], status: str, services: Services, options: STSJobManagerOptions): """ Sets a list of prefixes to a given status in the database. """ logger.info(f'Updating {len(prefixes)} prefixes to `{status}` status') table = get_table_identifier( services, options.bigquery_options, options.bigquery_options.table_name['job']) # API does not support table names for preparameterized queries # https://cloud.google.com/bigquery/docs/parameterized-queries query = f""" UPDATE `{table}` SET status = @status, last_updated = CURRENT_TIMESTAMP() WHERE prefix IN UNNEST(@prefixes) """ params = [ bigquery.ScalarQueryParameter("status", "STRING", status), bigquery.ArrayQueryParameter("prefixes", "STRING", prefixes) ] run_query(query, params, services, options).result() def set_job_name(prefix: str, job_name: str, services: Services, options: STSJobManagerOptions): """ Set's a prefix's transfer operation job name in the database. """ logger.info( f'Updating the prefix `{prefix}` with job name `{job_name}`...') table = get_table_identifier( services, options.bigquery_options, options.bigquery_options.table_name['job']) # API does not support table names for preparameterized queries # https://cloud.google.com/bigquery/docs/parameterized-queries query = f""" UPDATE `{table}` SET job_name = @job_name, last_updated = CURRENT_TIMESTAMP() WHERE prefix = @prefix """ params = [ bigquery.ScalarQueryParameter("prefix", "STRING", prefix), bigquery.ScalarQueryParameter("job_name", "STRING", job_name) ] run_query(query, params, services, options).result() logger.info( f'...updated the prefix `{prefix}` with job name `{job_name}`.') def insert_history(rows: List[object], services: Services, options: STSJobManagerOptions): """ Inserts a list of rows into the job history table. Each object provided in the list matches the `JOB_HISTORY` schema """ logger.info(f'Inserting {len(rows)} row(s) into the history table') table_ref = get_table_ref( services.bigquery, options.bigquery_options, options.bigquery_options.table_name['job_history']) errors = services.bigquery.insert_rows( table_ref, rows, selected_fields=schemas.JOB_HISTORY) if errors: logger.error('errors were found:') for row in errors: logger.error(row) raise Exception('Error inserting one or more rows') def get_latest_operation_by_prefix(services: Services, options: STSJobManagerOptions): """ Gets the latest transfer operation cooresponding to a prefix. Returns a key-value object where the key is a prefix and the value is a [TransferOperation](https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferOperations#resource-transferoperation). """ job_filter = json.dumps({"project_id": services.bigquery.project}) request = services.sts.transferOperations().list( name='transferOperations', filter=job_filter, pageSize=256) latest_operation_by_prefix: Dict[str, dict] = {} operation_to_prefix: Dict[str, str] = {} while request is not None: response = request.execute() if not response: break for operation in response['operations']: transfer_spec = operation['metadata']['transferSpec'] if 'objectConditions' not in transfer_spec: continue object_conditions = transfer_spec['objectConditions'] if 'includePrefixes' not in object_conditions: continue if 'gcsDataSource' not in operation['metadata']['transferSpec']: continue if 'gcsDataSink' not in operation['metadata']['transferSpec']: continue if options.source_bucket != operation['metadata']['transferSpec'][ 'gcsDataSource']['bucketName']: continue if options.destination_bucket != operation['metadata'][ 'transferSpec']['gcsDataSink']['bucketName']: continue for prefix in object_conditions['includePrefixes']: operation_to_set_for_prefix = None if prefix not in latest_operation_by_prefix: # The prefix does not have an operation, let's use this one operation_to_set_for_prefix = operation elif 'endTime' not in operation['metadata'] or \ 'endTime' not in latest_operation_by_prefix[prefix][ 'metadata']: # if end time is not available, use the start time if operation['metadata']['startTime'] > \ latest_operation_by_prefix[prefix]['metadata'][ 'startTime']: latest_operation_by_prefix[prefix] = operation elif operation['metadata']['endTime'] > \ latest_operation_by_prefix[prefix]['metadata'][ 'endTime']: # This operation is newer than the assigned operation operation_to_set_for_prefix = operation # Set the operation for the prefix if operation_to_set_for_prefix: # unreference existing operation to prefix, if exists operation_to_prefix.pop(operation['name'], None) latest_operation_by_prefix[prefix] = operation operation_to_prefix[operation['name']] = prefix request = services.sts.transferOperations().list_next( previous_request=request, previous_response=response) # If the latest transferOperation is from a deleted job, we should not # consider the operation for state management deleted_job_request = services.sts.transferJobs().list( filter=json.dumps({ "project_id": services.bigquery.project, "jobStatuses": ["DELETED"] }), pageSize=256) while deleted_job_request is not None: deleted_job_response = deleted_job_request.execute() if not deleted_job_response: break for transferJob in deleted_job_response['transferJobs']: if 'latestOperationName' not in transferJob: continue operation_to_remove = transferJob['latestOperationName'] prefix = operation_to_prefix.pop(operation_to_remove, None) if prefix: latest_operation_by_prefix.pop(prefix, None) deleted_job_request = services.sts.transferJobs().list_next( previous_request=deleted_job_request, previous_response=deleted_job_response) return latest_operation_by_prefix def manage_state(services: Services, options: STSJobManagerOptions): """ Gathers all prefix information from both STS and the database, then updates the corresponding rows where necessary. """ logger.info('Checking state...') # jobs from the database jobs = get_jobs_by_prefix(services, options) # transfer operations from STS latest_operation_by_prefix = get_latest_operation_by_prefix( services, options) history_rows: List[object] = [] job_status_to_update: Dict[str, List[str]] = { STATUS.DONE: [], STATUS.ERROR: [], STATUS.PAUSED: [], STATUS.RUNNING: [], STATUS.WAITING: [] } def append_history(job: Job, operation_results: object): history_rows.append({ 'prefix': job.prefix, 'status': job.status, 'job_name': job.job_name, 'operation_results': json.dumps(operation_results), 'timestamp': datetime.now() }) for prefix in jobs: if prefix in latest_operation_by_prefix: operation_status = \ latest_operation_by_prefix[prefix]['metadata']['status'] expected_status = jobs[prefix].status actual_status = sts_operation_status_to_table_status( operation_status) actual_job_name = latest_operation_by_prefix[prefix]['name'] if actual_status != expected_status: # Capture the history for running jobs logger.info( f'Status for prefix `{prefix}` has changed from \ `{expected_status}` to `{actual_status}`') jobs[prefix].status = actual_status job_status_to_update[actual_status].append(prefix) append_history( jobs[prefix], latest_operation_by_prefix[prefix]) elif actual_status == STATUS.RUNNING: # Capture the history for running jobs append_history( jobs[prefix], latest_operation_by_prefix[prefix]) if actual_job_name != jobs[prefix].job_name: set_job_name(prefix, actual_job_name, services, options) # sleep to avoid rate limiting # https://cloud.google.com/bigquery/quotas#standard_tables time.sleep(2) # Assign the latest `operation_results` jobs[prefix].operation_results = latest_operation_by_prefix[prefix] if history_rows: insert_history(history_rows, services, options) for status in job_status_to_update: if job_status_to_update[status]: set_prefixes_to_status( job_status_to_update[status], status, services, options) # sleep to avoid rate limiting # https://cloud.google.com/bigquery/quotas#standard_tables time.sleep(2) logger.info('...state is up to date.') return jobs def run_jobs(count: int, services: Services, options: STSJobManagerOptions): """ Pulls pending prefixes from the database and either create a new transfer operation or resume an existing one. The `manage_state` function will handle the updates in the job statuses; this keeps DML usage to a minimum """ table = get_table_identifier( services, options.bigquery_options, options.bigquery_options.table_name['job']) # API does not support table names for preparameterized queries # https://cloud.google.com/bigquery/docs/parameterized-queries query = f""" SELECT * FROM `{table}` WHERE status IN UNNEST(@statuses) LIMIT @count """ # nosec pending_statuses = [STATUS.WAITING, STATUS.PAUSED] tryable_statuses = [STATUS.WAITING, STATUS.PAUSED, STATUS.ERROR] statuses = pending_statuses if options.no_retry_on_job_error \ else tryable_statuses params = [ bigquery.ArrayQueryParameter("statuses", "STRING", statuses), bigquery.ScalarQueryParameter("count", "INT64", count), ] results = run_query(query, params, services, options) for row in results: job = Job(row) if job.status == STATUS.PAUSED: operation_request = services.sts.transferOperations().resume( name=job.job_name, body={}) operation_request.execute() logger.info(f'Resumed `{job.prefix}` (job name: {job.job_name}).') else: utc_now = datetime.utcnow() if job.status == STATUS.ERROR: logger.error( f'Retrying errored prefix `{job.prefix}`. \ Previous failed job: {job.job_name}') transfer_job_body = { 'description': f'Created via STS Job Manager - {job.prefix}', 'project_id': services.bigquery.project, 'transfer_spec': { 'object_conditions': { 'include_prefixes': [ job.prefix ] }, 'transfer_options': { 'overwrite_objects_already_existing_in_sink': options.overwrite_dest_objects }, 'gcs_data_source': { 'bucket_name': options.source_bucket }, 'gcs_data_sink': { 'bucket_name': options.destination_bucket } }, 'schedule': { "schedule_start_date": { "year": utc_now.year, "month": utc_now.month, "day": utc_now.day }, "schedule_end_date": { "year": utc_now.year, "month": utc_now.month, "day": utc_now.day } }, 'status': 'ENABLED' } request = services.sts.transferJobs().create( body=transfer_job_body) response = request.execute() logger.info( f'Created new transfer job for `{job.prefix}`: ({response}).') return True def determine_stalled_jobs(jobs: Dict[str, Job], last_jobs: Dict[str, Job]) \ -> List[Job]: stalled_jobs: List[Job] = [] for prefix in jobs: if prefix not in last_jobs: continue current_job = jobs[prefix] last_job = last_jobs[prefix] if current_job.status != STATUS.RUNNING or \ last_job.status != STATUS.RUNNING: continue if not current_job.operation_results or not last_job.operation_results: continue current_counters = \ current_job.operation_results['metadata']['counters'] last_counters = last_job.operation_results['metadata']['counters'] if current_counters and last_counters: has_changed = False for key in current_counters: if key not in last_counters or \ current_counters[key] != last_counters[key]: has_changed = True break if not has_changed: stalled_jobs.append(current_job) return stalled_jobs def manage_jobs(jobs: Dict[str, Job], last_jobs: Dict[str, Job], services: Services, options: STSJobManagerOptions): """ Determines the number of new operations to spin-up, then spins them up. """ def num_new_jobs_to_run(): pending_job_count = 0 current_running_jobs = 0 for prefix in jobs: if jobs[prefix].status == STATUS.RUNNING: current_running_jobs += 1 elif jobs[prefix].status == STATUS.WAITING or \ jobs[prefix].status == STATUS.PAUSED: pending_job_count += 1 elif not options.no_retry_on_job_error and \ jobs[prefix].status == STATUS.ERROR: pending_job_count += 1 if options.allow_new_jobs_when_stalled: stalled_count = len(determine_stalled_jobs(jobs, last_jobs)) current_running_jobs = max(0, current_running_jobs - stalled_count) max_number_jobs_available_to_run = \ options.max_concurrent_jobs - current_running_jobs double_current_job_count = current_running_jobs * 2 if not pending_job_count: logger.info('No jobs available to run') return 0 elif current_running_jobs > options.max_concurrent_jobs: logger.info(f'Will not create any new jobs - too many are running \ (current = {current_running_jobs}, \ max = {options.max_concurrent_jobs})') return 0 elif current_running_jobs == 0 and \ max_number_jobs_available_to_run > 0: logger.info( 'Will prepare initial job, as no other jobs are running') return 1 else: logger.info('Ramping up job count') return min(max_number_jobs_available_to_run, double_current_job_count) logger.info('Managing jobs...') count = num_new_jobs_to_run() if not count: logger.info('...no new jobs to run.') return logger.info(f'...spinning up to {count} new job(s)...') run_jobs(count, services, options) logger.info('...done running jobs.') def publish_heartbeat(jobs: Dict[str, Job], last_jobs: Dict[str, Job], services: Services, options: STSJobManagerOptions, monitoring_types=monitoring_v3.types): """ Publishes status heartbeats """ def publish_timeseries_heartbeat(name: str, value: int, services: Services, project_name: str, monitoring_types=monitoring_v3.types): logger.info(f'Preparing heartbeat for `{name}` (value: {value})...') series = monitoring_types.TimeSeries() series.metric.type = name point = series.points.add() point.value.int64_value = value point.interval.end_time.seconds = int(time.time()) services.monitoring.create_time_series(project_name, [series]) logger.info(f'...published heartbeat `{name}`.') p = options.stackdriver_project if options.stackdriver_project \ else services.bigquery.project monitoring_project_name = services.monitoring.project_path(p) logger.info(f'Preparing heartbeats for `{monitoring_project_name}`...') status_count: Dict[str, int] = {} stalled_count = 0 # Ensure known statuses are published, even if 0 for status in KNOWN_STATUSES: status_count[status] = 0 # Gather raw status counts for prefix in jobs: job = jobs[prefix] # status could be unknown if job.status not in status_count: status_count[job.status] = 0 status_count[job.status] += 1 for status in status_count: name = f'custom.googleapis.com/sts_job_manager/status/{status}' count = status_count[status] publish_timeseries_heartbeat( name, count, services, monitoring_project_name, monitoring_types) for job in determine_stalled_jobs(jobs, last_jobs): logger.warn(f'Job `{job.job_name}` appears to be stalled.') stalled_count += 1 # Publish stalled count stalled_metric = 'custom.googleapis.com/sts_job_manager/metrics/stalled' publish_timeseries_heartbeat( stalled_metric, stalled_count, services, monitoring_project_name, monitoring_types) logger.info('...done publishing heartbeats.') def interval(services: Services, options: STSJobManagerOptions): """ The main state and job running interval. This runs the main lifecycle of this application. """ interval_count = 0 last_state_check = 0.0 last_manage_jobs = 0.0 last_jobs: Dict[str, Job] = {} jobs: Dict[str, Job] = {} while True: logger.info(f'Running main interval #{interval_count}...') start = time.time() job_timeout = start - last_manage_jobs >= options.job_interval metrics_timeout = start - last_state_check >= options.metrics_interval if job_timeout or metrics_timeout: last_jobs = jobs jobs = manage_state(services, options) last_state_check = time.time() if job_timeout: manage_jobs(jobs, last_jobs, services, options) # Regather metrics jobs = manage_state(services, options) last_manage_jobs = time.time() if options.publish_heartbeat: try: publish_heartbeat(jobs, last_jobs, services, options) except Exception as e: logger.error('Failed to publish heartbeat:') logger.exception(e) delta = time.time() - start + options.sleep_timeout logger.info(f'...done running main interval #{interval_count}.\n') if delta > 0: time.sleep(delta) interval_count += 1 def main(options: STSJobManagerOptions): """ The main function. """ logger.info('Initializing STS Job Manager.') services = Services() interval(services, options) if __name__ == "__main__": parser = argparse.ArgumentParser(description=__doc__) options = STSJobManagerOptions() options.setup_arg_parser(parser) args = parser.parse_args() options.assign_from_parsed_args(args) main(options)