tools/deploy/app.py (705 lines of code) (raw):

from __future__ import absolute_import import base64 import os import os.path from enum import Enum from time import sleep, time from aurora.api.constants import AURORA_EXECUTOR_NAME, ACTIVE_STATES from aurora.api.ttypes import ( JobKey, TaskConfig, JobConfiguration, ExecutorConfig, Container, DockerContainer, DockerParameter, Identity, ResponseCode, ScheduleStatus, TaskQuery, JobUpdateRequest, JobUpdateSettings, JobUpdateQuery, JobUpdateStatus, Range, Constraint, TaskConstraint, LimitConstraint, Resource, Metadata, ) from aurora.schema.thermos.schema_base import ( Task as ThermosTask, Process as ThermosProcess, Resources, Logger as ThermosLogger, LoggerMode, RotatePolicy, ) from aurora.schema.aurora.base import ( Announcer, MesosJob as ThermosJob, HealthCheckConfig, HealthCheckerConfig, HttpHealthChecker, DisableLifecycle, ) KB = 1024 MB = 1024 * KB GB = 1024 * MB AURORA_ROLE = "peloton" AURORA_ENVIRONMENT = "production" AURORA_USER = "peloton" MAX_WAIT_TIME_SECONDS = 600 WAIT_INTERVAL_SECONDS = 10 def combine_messages(response): """ Combines the message found in the details of a response. :param response: response to extract messages from. :return: Messages from the details in the response, or an empty string if there were no messages. """ return ", ".join( [d.message or "Unknown error" for d in (response.details or [])] ) class Role(Enum): """ The role of a Peloton app instance """ UNKNOWN = 1 LEADER = 2 FOLLOWER = 3 ALL = 4 class Instance(object): """ Representation of an instance of a Peloton application """ def __init__(self, task, role): assert task.assignedTask self.instance_id = task.assignedTask.instanceId self.host = task.assignedTask.slaveHost self.state = task.status # TODO: query Peloton app endpoint to determine leader/follower role self.role = role class App(object): """ Representation of a Peloton application with multiple instances """ def __init__(self, **kwargs): """ Initializes a Peloton application """ # Default attributes self.enable_debug_logging = False self.enable_secrets = False self.cpu_limit = 4.0 self.mem_limit = 16 * GB self.disk_limit = 16 * GB self.scarce_resource_types = None self.slack_resource_types = None self.enable_revocable_resources = None self.bin_packing = None self.qos_advisor_service_address = None self.stream_only_mode = True self.task_preemption_period = "60s" self.enable_sla_tracking = False self.enable_preemption = False self.respool_path = None self.gpu_respool_path = None self.auth_type = "NOOP" self.auth_config_file = "" self.enable_inplace_update = False self.use_host_pool = False self.http_port = None self.grpc_port = None for k, v in kwargs.iteritems(): setattr(self, k, v) self.client = self.cluster.client self.prod_yaml_location = self.cluster.cfg_file if self.num_instances < 1: raise Exception("App %s has no instances" % self.name) self.job_key = JobKey( role=AURORA_ROLE, environment=AURORA_ENVIRONMENT, name=self.name ) # Generate the new job config for this app self.desired_job_config = self.get_desired_job_config() # Save current job config so that we can rollback to later self.current_job_config = self.get_current_job_config() def get_docker_params(self): """ Returns the docker params for a given Peloton application """ mesos_zk_path = "zk://%s/%s" % ( ",".join(self.cluster.zookeeper), self.cluster.mesos_zk_path, ) peloton_zk_endpoints = "\n".join(self.cluster.zookeeper) # add common variables env_vars = { "ENVIRONMENT": "production", "CONFIG_DIR": "./config", "APP": self.name, # TODO: fix Peloton code to only take self.cluster.mesos_zk_path "MESOS_ZK_PATH": mesos_zk_path, "ENABLE_DEBUG_LOGGING": self.enable_debug_logging, "ELECTION_ZK_SERVERS": peloton_zk_endpoints, "USE_CASSANDRA": self.cluster.cassandra_contact_points is not None, "CASSANDRA_HOSTS": "\n".join( self.cluster.cassandra_contact_points ), "CASSANDRA_STORE": self.cluster.cassandra_keyspace, "CASSANDRA_PORT": self.cluster.cassandra_port, "CLUSTER": self.cluster.name, "DATACENTER": getattr(self.cluster, "datacenter", ""), "MESOS_AGENT_WORK_DIR": self.cluster.mesos_agent_work_dir, "AUTO_MIGRATE": self.cluster.auto_migrate, "ENABLE_SENTRY_LOGGING": self.cluster.enable_sentry_logging, "SECRET_CONFIG_DIR": getattr( self.cluster, "secret_config_dir", "" ), "MESOS_SECRET_FILE": getattr( self.cluster, "mesos_secret_file", "" ), "PELOTON_SECRET_FILE": getattr( self.cluster, "peloton_secret_file", "" ), "ENABLE_SECRETS": self.enable_secrets, "AUTH_TYPE": self.auth_type, "AUTH_CONFIG_FILE": self.auth_config_file, } self.add_app_specific_vars(env_vars) name = self.name if name.startswith("placement_"): name = "placement" # base64 encode the prod config and add it to the PRODUCTION_CONFIG # environment variable inside the container prod_config_path = self.get_app_path().format(name) with open(prod_config_path, "rb") as config_file: env_vars["PRODUCTION_CONFIG"] = base64.b64encode( config_file.read() ) params = [ DockerParameter(name="env", value="%s=%s" % (key, val)) for key, val in env_vars.iteritems() ] volumes = [("/var/log/peloton", "/var/log/peloton", "rw")] # Mount langley secrets if secret dir is specified. if getattr(self.cluster, "secret_config_dir", ""): volumes.append( ( self.cluster.secret_config_dir, self.cluster.secret_config_dir, "ro", ) ) params.extend( DockerParameter(name="volume", value="%s:%s:%s" % (src, dst, mode)) for src, dst, mode in volumes ) return params def add_app_specific_vars(self, env_vars): """ Adds env variables specific to the application """ if self.name == "placement_stateful": env_vars["TASK_TYPE"] = "STATEFUL" env_vars["APP"] = "placement" env_vars["APP_TYPE"] = "placement_stateful" if self.cluster.use_host_pool: env_vars["USE_HOST_POOL"] = self.cluster.use_host_pool if self.name == "placement_stateless": env_vars["TASK_TYPE"] = "STATELESS" env_vars["APP"] = "placement" env_vars["APP_TYPE"] = "placement_stateless" if self.cluster.use_host_pool: env_vars["USE_HOST_POOL"] = self.cluster.use_host_pool if self.name == "placement": if getattr(self, "task_dequeue_limit", ""): env_vars["TASK_DEQUEUE_LIMIT"] = self.task_dequeue_limit if getattr(self, "task_dequeue_period", ""): env_vars["TASK_DEQUEUE_PERIOD"] = self.task_dequeue_period if self.cluster.use_host_pool: env_vars["USE_HOST_POOL"] = self.cluster.use_host_pool if self.name == "jobmgr": env_vars["JOB_TYPE"] = getattr(self, "job_type", "BATCH") env_vars["JOB_RUNTIME_CALCULATION_VIA_CACHE"] = getattr( self, "job_runtime_calculation_via_cache", False ) env_vars["TASK_KILL_RATE_LIMIT"] = getattr( self, "task_kill_rate_limit", 0.0 ) env_vars["TASK_KILL_BURST_LIMIT"] = getattr( self, "task_kill_burst_limit", 0 ) env_vars["TASK_LAUNCH_TIMEOUT"] = getattr( self, "task_launch_timeout", "0" ) env_vars["TASK_START_TIMEOUT"] = getattr( self, "task_start_timeout", "0" ) env_vars["EXECUTOR_SHUTDOWN_RATE_LIMIT"] = getattr( self, "executor_shutdown_rate_limit", 0.0 ) env_vars["EXECUTOR_SHUTDOWN_BURST_LIMIT"] = getattr( self, "executor_shutdown_burst_limit", 0 ) if self.name == "archiver": env_vars["ENABLE_ARCHIVER"] = self.enable_archiver env_vars["STREAM_ONLY_MODE"] = self.stream_only_mode env_vars["POD_EVENTS_CLEANUP"] = self.pod_events_cleanup env_vars["ARCHIVE_AGE"] = self.archive_age env_vars["ARCHIVE_INTERVAL"] = self.archive_interval env_vars["ARCHIVE_STEP_SIZE"] = self.archive_step_size env_vars["KAFKA_TOPIC"] = self.kafka_topic if self.name == "resmgr": env_vars["TASK_PREEMPTION_PERIOD"] = getattr( self, "task_preemption_period", "60s" ) env_vars["ENABLE_SLA_TRACKING"] = getattr( self, "enable_sla_tracking", False ) env_vars["ENABLE_PREEMPTION"] = getattr( self, "enable_preemption", False ) if self.name == "hostmgr": if self.scarce_resource_types: env_vars["SCARCE_RESOURCE_TYPES"] = ",".join( self.scarce_resource_types ) if self.slack_resource_types: env_vars["SLACK_RESOURCE_TYPES"] = ",".join( self.slack_resource_types ) if self.enable_revocable_resources: env_vars[ "ENABLE_REVOCABLE_RESOURCES" ] = self.enable_revocable_resources if self.bin_packing: env_vars["BIN_PACKING"] = self.bin_packing if self.qos_advisor_service_address: env_vars["QOS_ADVISOR_SERVICE_ADDRESS"] \ = self.qos_advisor_service_address if self.cluster.use_host_pool: env_vars["ENABLE_HOST_POOL"] = self.cluster.use_host_pool if self.name == "aurorabridge": if self.respool_path: env_vars["RESPOOL_PATH"] = self.respool_path if self.gpu_respool_path: env_vars["GPU_RESPOOL_PATH"] = self.gpu_respool_path if self.enable_inplace_update: env_vars["ENABLE_INPLACE_UPDATE"] = "true" def get_app_path(self): """ Returns the formatted path for app config """ dirname = os.path.dirname(self.prod_yaml_location) path = os.path.join( dirname, "../..", "config", "{}", "production.yaml" ) return path def get_docker_image(self): """ Returns the docker image path for a Peloton app """ return "%s/%s:%s" % ( self.cluster.docker_registry, self.cluster.docker_repository, self.cluster.version, ) def get_executor_config(self): """ Returns the Thermos executor config for a Peloton app """ host_logdir = "/var/log/peloton/%s" % self.name sandbox_logdir = "$MESOS_DIRECTORY/sandbox/.logs/%s/0" % self.name cmdline = " && ".join( [ "rm -rf %s" % host_logdir, "ln -s %s %s" % (sandbox_logdir, host_logdir), "/bin/entrypoint.sh", ] ) # Override executor command for api server to inject # ports environment variable if self.name == "apiserver" and \ self.http_port is None and \ self.grpc_port is None: cmdline = " && ".join( [ "rm -rf %s" % host_logdir, "ln -s %s %s" % (sandbox_logdir, host_logdir), "exec env HTTP_PORT={{thermos.ports[http]}} " "GRPC_PORT={{thermos.ports[grpc]}} " "/bin/entrypoint.sh", ] ) entrypoint_process = ThermosProcess( name=self.name, cmdline=cmdline, logger=ThermosLogger( mode=LoggerMode("rotate"), rotate=RotatePolicy(log_size=1 * GB, backups=10), ), ) thermos_task = ThermosTask( name=self.name, processes=[entrypoint_process], resources=Resources( cpu=self.cpu_limit, ram=self.mem_limit, disk=self.disk_limit ), ) health_check_config = HealthCheckConfig( health_checker=HealthCheckerConfig( http=HttpHealthChecker( endpoint="/health", expected_response="OK", expected_response_code=200, ) ), initial_interval_secs=15, interval_secs=10, max_consecutive_failures=4, timeout_secs=1, ) announce = Announcer() if self.http_port is not None: announce = Announcer(portmap={"health": self.http_port}) elif self.name == "apiserver": # Use assigned http port for health check if it is api server # and static http port is not configured announce = Announcer(portmap={"health": "{{thermos.ports[http]}}"}) thermos_job = ThermosJob( name=self.name, role=AURORA_ROLE, cluster=self.cluster.name, environment=AURORA_ENVIRONMENT, task=thermos_task, production=False, service=True, health_check_config=health_check_config, announce=announce, ) executor_config = ExecutorConfig( name=AURORA_EXECUTOR_NAME, data=thermos_job.json_dumps() ) return executor_config def get_desired_job_config(self): """ Return the Aurora job configuration defined in Thrift API so that we can create a job via Aurora API. """ # Add docker container container = Container( mesos=None, docker=DockerContainer( image=self.get_docker_image(), parameters=self.get_docker_params(), ), ) host_limit = Constraint( name=self.cluster.constraint, constraint=TaskConstraint(limit=LimitConstraint(limit=1)), ) # Set task metadata if presents in config if hasattr(self, 'metadata'): taskMetaData = set( [Metadata(key=k, value=v) for k, v in self.metadata.items()]) else: taskMetaData = set() # Set task resources resources = set( [ Resource(numCpus=self.cpu_limit), Resource(ramMb=self.mem_limit / MB), Resource(diskMb=self.disk_limit / MB), ] ) if self.name == "apiserver": if self.http_port is None: resources.add(Resource(namedPort="http")) if self.grpc_port is None: resources.add(Resource(namedPort="grpc")) task_config = TaskConfig( job=self.job_key, owner=Identity(user=AURORA_USER), isService=True, numCpus=self.cpu_limit, ramMb=self.mem_limit / MB, diskMb=self.disk_limit / MB, priority=0, maxTaskFailures=0, production=False, tier="preemptible", resources=resources, contactEmail="peloton-oncall-group@uber.com", executorConfig=self.get_executor_config(), container=container, constraints=set([host_limit]), requestedPorts=set(), mesosFetcherUris=set(), taskLinks={}, metadata=taskMetaData, ) job_config = JobConfiguration( key=self.job_key, owner=Identity(user=AURORA_USER), taskConfig=task_config, instanceCount=self.num_instances, ) return job_config def get_current_job_config(self): """ Return the current job config by querying the Aurora API """ resp = self.client.getJobSummary(AURORA_ROLE) if resp.responseCode != ResponseCode.OK: raise Exception(combine_messages(resp)) job_config = None for s in resp.result.jobSummaryResult.summaries: if s.job.key == self.job_key: job_config = s.job break if job_config: instances = self.get_instances() job_config.instanceCount = len(instances.get(Role.ALL, [])) return job_config def get_instances(self): """ Returns all instances grouped by role if exist by querying Aurora API """ # Setup task query for task status of the Aurora job task_query = TaskQuery( role=AURORA_ROLE, environment=AURORA_ENVIRONMENT, jobName=self.name, statuses=ACTIVE_STATES, ) resp = self.client.getTasksWithoutConfigs(task_query) if resp.responseCode != ResponseCode.OK: raise Exception(combine_messages(resp)) instances = {} leader = True for t in resp.result.scheduleStatusResult.tasks: if t.status not in ACTIVE_STATES: # Ignore tasks that are not in active states continue # Temporarily hack to set leader/follower roles # TODO: query Peloton app endpoint to find role role = Role.LEADER if leader else Role.FOLLOWER if leader: leader = False inst = Instance(t, role) instances.setdefault(inst.role, []).append(inst) instances.setdefault(Role.ALL, []).append(inst) return instances def wait_for_running(self, role): """ Wait for the app instances of a given role running """ num_instances = { Role.LEADER: 1, Role.FOLLOWER: self.num_instances - 1, Role.ALL: self.num_instances, }[role] start_time = time() while time() < start_time + MAX_WAIT_TIME_SECONDS: instances = self.get_instances().get(role, []) all_running = True for i in instances: if i.state != ScheduleStatus.RUNNING: all_running = False break print( "Wait for %s %s instances running: %d / %d" % (self.name, role.name, all_running, len(instances)) ) if all_running and len(instances) == num_instances: return True sleep(WAIT_INTERVAL_SECONDS) return False def update_instances(self, instances, job_config): """ Update the task config of the given app instances """ instance_ids = [i.instance_id for i in instances] req = JobUpdateRequest( taskConfig=job_config.taskConfig, instanceCount=self.num_instances, settings=JobUpdateSettings( updateGroupSize=1, maxPerInstanceFailures=3 ), ) if instance_ids: req.settings.updateOnlyTheseInstances = set( Range(i, i) for i in instance_ids ) resp = self.client.startJobUpdate( req, "Update %s instances for %s" % (len(instances), self.name) ) if resp.responseCode == ResponseCode.INVALID_REQUEST: if resp.result is None: raise Exception(combine_messages(resp)) update_key = resp.result.startJobUpdateResult.key update_summary = resp.result.startJobUpdateResult.updateSummary status = update_summary.state.status if status == JobUpdateStatus.ROLLING_FORWARD: # Abort the current update print( "Aborting the update for %s (id=%s)" % (self.name, update_key.id) ) self.client.abortJobUpdate( update_key, "Abort by a new deploy session" ) self.wait_for_update_done(update_key) # Restart the job update resp = self.client.startJobUpdate( req, "Update %s instances for %s" % (len(instances), self.name), ) else: raise Exception( "Invalid Request for job update (status=%s)" % (status, JobUpdateStatus._VALUES_TO_NAMES[status]) ) if resp.responseCode != ResponseCode.OK: raise Exception(combine_messages(resp)) if resp.result is None: # No change for the job update print(resp.details[0].message) return True update_key = resp.result.startJobUpdateResult.key return self.wait_for_update_done(update_key, instance_ids) def wait_for_update_done(self, update_key, instance_ids=[]): """ Wait for the job update to finish """ query = JobUpdateQuery( role=AURORA_ROLE, key=update_key, jobKey=self.job_key ) start_time = time() while time() < start_time + MAX_WAIT_TIME_SECONDS: resp = self.client.getJobUpdateSummaries(query) if resp.responseCode != ResponseCode.OK: print(combine_messages(resp)) sleep(WAIT_INTERVAL_SECONDS) continue result = resp.result.getJobUpdateSummariesResult if len(result.updateSummaries) != 1: raise Exception( "Got multiple update summaries: %s" % str(result.updateSummaries) ) if result.updateSummaries[0].key != update_key: raise Exception( "Mismatch update key, expect %s, received %s" % (update_key, result.updateSummaries[0].key) ) status = result.updateSummaries[0].state.status print( "Updating %s instances %s (status=%s)" % ( self.name, instance_ids, JobUpdateStatus._VALUES_TO_NAMES[status], ) ) if status == JobUpdateStatus.ROLLED_FORWARD: return True elif status in [ JobUpdateStatus.ROLLED_BACK, JobUpdateStatus.ABORTED, JobUpdateStatus.ERROR, JobUpdateStatus.FAILED, ]: return False else: # Wait for 5 seconds sleep(WAIT_INTERVAL_SECONDS) return False def update_or_create_job(self, callback): """ Update the current job for a Peloton app. Create a new job if the job does not exist yet. """ # TODO: find the leader/follower role of each app instance if self.current_job_config is None: # Create the new Job in Aurora and check the response code print( "Creating new job for %s with %s instances" % (self.name, self.num_instances) ) resp = self.client.createJob(self.desired_job_config) if resp.responseCode != ResponseCode.OK: raise Exception(combine_messages(resp)) # Wait for all instances are running retval = self.wait_for_running(Role.ALL) if retval: callback(self) return retval # Get current leader and follower instances cur_instances = self.get_instances() # First updade all existing instances, followers first then leader for role in [Role.FOLLOWER, Role.LEADER]: instances = cur_instances.get(role, []) if role == Role.LEADER and len(instances) > 1: raise Exception( "Found %d leaders for %s" % (len(instances), self.name) ) if len(instances) == 0: print("No %s %s instances to update" % (self.name, role.name)) continue print( "Start updating %d %s %s instances" % (len(instances), self.name, role.name) ) retval = self.update_instances(instances, self.desired_job_config) print( "Finish updating %d %s %s instances -- %s" % ( len(instances), self.name, role.name, "SUCCEED" if retval else "FAILED", ) ) if not retval or not callback(self): # Rollback the update by the caller return False # Then add any missing instances if needed cur_total = len(cur_instances.get(role.ALL, [])) new_total = self.num_instances > cur_total if new_total > 0: print("Start adding %d new %s instances" % (new_total, self.name)) retval = self.update_instances([], self.desired_job_config) print( "Finish adding %d new %s instances -- %s" % (new_total, self.name, "SUCCEED" if retval else "FAILED") ) if not retval or not callback(self): # Rollback the update by the caller return False return True def rollback_job(self): """ Rollback the job config of a Peloton app in case of failures """ if self.current_job_config is None: # Nothing to do if the job doesn't exist before return self.update_instances([], self.current_job_config) class CronApp(App): """ Representation of a CRON job """ def __init__(self, **kwargs): """ Initializes Cron job """ # Default attributes self.enable_debug_logging = False self.enable_secrets = False self.working_dir = None self.cmdline = None self.cron_schedule = "\*/5 * * * *" self.num_instances = 1 super(CronApp, self).__init__(**kwargs) self.cpu_limit = 1.0 self.mem_limit = 1 * GB self.disk_limit = 16 * GB def get_executor_config(self): """ Returns the Thermos executor config for a Peloton app """ if self.working_dir is not None: cmd = " && ".join(["cd " + self.working_dir, self.cmdline]) else: cmd = self.cmdline entrypoint_process = ThermosProcess(name=self.name, cmdline=cmd) thermos_task = ThermosTask( name=self.name, processes=[entrypoint_process], resources=Resources( cpu=self.cpu_limit, ram=self.mem_limit, disk=self.disk_limit ), ) thermos_job = ThermosJob( name=self.name, role=AURORA_ROLE, cluster=self.cluster.name, environment=AURORA_ENVIRONMENT, task=thermos_task, production=False, lifecycle=DisableLifecycle, cron_schedule=self.cron_schedule[1:], ) executor_config = ExecutorConfig( name=AURORA_EXECUTOR_NAME, data=thermos_job.json_dumps() ) return executor_config def get_desired_job_config(self): """ Return the Aurora job configuration defined in Thrift API so that we can create a job via Aurora API. """ # Add docker container container = Container( mesos=None, docker=DockerContainer( image=self.get_docker_image(), parameters=self.get_docker_params(), ), ) host_limit = Constraint( name=self.cluster.constraint, constraint=TaskConstraint(limit=LimitConstraint(limit=1)), ) task_config = TaskConfig( job=self.job_key, owner=Identity(user=AURORA_USER), isService=False, numCpus=self.cpu_limit, ramMb=self.mem_limit / MB, diskMb=self.disk_limit / MB, priority=0, maxTaskFailures=0, production=False, tier="preemptible", resources=set( [ Resource(numCpus=self.cpu_limit), Resource(ramMb=self.mem_limit / MB), Resource(diskMb=self.disk_limit / MB), ] ), contactEmail="peloton-oncall-group@uber.com", executorConfig=self.get_executor_config(), container=container, constraints=set([host_limit]), requestedPorts=set(), mesosFetcherUris=set(), taskLinks={}, metadata=set(), ) job_config = JobConfiguration( key=self.job_key, owner=Identity(user=AURORA_USER), taskConfig=task_config, instanceCount=self.num_instances, cronSchedule=self.cron_schedule[1:], ) return job_config def get_current_job_config(self): """ Return the current job config by querying the Aurora API """ resp = self.client.getJobSummary(AURORA_ROLE) if resp.responseCode != ResponseCode.OK: raise Exception(combine_messages(resp)) job_config = None for s in resp.result.jobSummaryResult.summaries: if s.job.key == self.job_key: job_config = s.job break return job_config def update_or_create_job(self, callback): """ Update the current cron-job for a Peloton app. Create a new cron-job if the job does not exist yet. """ resp = self.client.scheduleCronJob(self.desired_job_config) if resp.responseCode != ResponseCode.OK: raise Exception(combine_messages(resp)) return True def rollback_job(self): """ Rollback the job config of a cron-job in case of failures """ if self.current_job_config is None: # Nothing to do if the job doesn't exist before return resp = self.client.scheduleCronJob(self.current_job_config) if resp.responseCode != ResponseCode.OK: print("Failed to rollback watchdog configuration (%s)" % (resp))