dyno/app/api/control.py (194 lines of code) (raw):
# -*- coding: utf-8 -*-
import os
import subprocess
import signal
from pathlib import Path
from . import bp
import socketio
from flask import request
# TODO Pull this from config object instead
DEBUG = os.environ.get('DYNO_DEBUG')
""" Public HTTP methods """
@bp.route('/start', methods=['POST'])
def start_job() -> dict:
    """
    Start a load-generation job

    Exposed via HTTP at /api/start
    Supported HTTP methods: POST

    Note
    ----
    Parameters are received via JSON in a Flask request object. They
    may not be passed directly to this function.

    Parameters
    ----------
    job : str
        The job to start. A leading `opbeans-` prefix, if present, is
        stripped before launch. (Required)
    port : str
        The port number to target
    scenario : str
        The scenario to launch. Defaults to `molotov_scenarios`
    duration : str
        The duration for the test to run in seconds. Default is 365 *days*.
    delay : str
        The average delay between requests. See the Molotov documentation
        for more information on this option.
        https://molotov.readthedocs.io/en/stable/cli/?highlight=delay#command-line-options
    workers : str
        The number of workers to start. See the Molotov documentation
        for more details on this option:
        https://molotov.readthedocs.io/en/stable/cli/?highlight=workers#command-line-options
    error_weight : str
        The relative "weight" of errors. Higher numbers result in the load
        generator choosing to hit pages known to produce errors a higher
        percentage of the time.
        This number is entirely arbitrary and is only relative to statically
        configured weights in the scenario file itself.
    app_latency_weight : str
        In the case of the `dyno` scenario, an app_latency_weight
        parameter can be passed which increases the rate at which a given
        label is accessed.
        Does NOT work with scenarios other than `dyno`!
    app_latency_label : str
        Used in conjunction with `app_latency_weight` to specify a
        label which should be hit at a higher or lower rate.
    app_latency_lower_bound : int
        The lower bound of latency which should be applied to requests which
        hit the delayed endpoint
    app_latency_upper_bound : int
        The upper bound of latency which should be applied to requests which
        hit the delayed endpoint

    Examples
    --------
    Sample JSON payload to send to this endpoint:
    > {"job":"opbeans-python","port":"8000"}
    """
    r = request.get_json() or {}
    job = r.get('job')
    if job is None:
        # Mirror /api/update: reject a missing job name instead of
        # crashing with an AttributeError on `job.replace` below.
        return "Must supply job", 400
    config = {
        'port': r.get('port'),
        'scenario': r.get('scenario', "molotov_scenarios"),
        'duration': r.get('duration', "31536000"),
        'delay': r.get('delay', "0.600"),
        'workers': r.get('workers', "3"),
        'error_weight': r.get('error_weight', "0"),
        'app_latency_weight': r.get('app_latency_weight', "0"),
        'app_latency_label': r.get('app_latency_label', 'dyno_latency'),
        'app_latency_lower_bound': r.get('app_latency_lower_bound', 1),
        'app_latency_upper_bound': r.get('app_latency_upper_bound', 1000),
    }
    # Jobs are tracked internally without the `opbeans-` prefix
    job = job.replace('opbeans-', '')
    if config['scenario']:
        # Scenario names map to files in the scenarios/ folder
        config['scenario'] = "scenarios/" + config['scenario'] + ".py"
    _launch_job(job, config)
    return {}
@bp.route('/list', methods=['GET'])
def get_list() -> dict:
    """
    Report the current status of every configured job.

    Exposed via HTTP at /api/list
    Supported HTTP methods: GET

    Returns
    -------
    dict
        The job-status dictionary, keyed by job name. HTTP clients
        receive it serialized as JSON.

    Examples
    --------
    ❯ curl -s http://localhost:8999/api/list|jq
    {
      "python": {
        "delay": "0.600",
        "duration": "31536000",
        "error_weight": "0",
        "app_latency_label": "dyno_delay",
        "app_latency_weight": "2",
        "app_latency_lower_bound": "1",
        "app_latency_upper_bound": "1000",
        "name": "python",
        "port": "8000",
        "running": false,
        "scenario": "scenarios/molotov_scenarios.py",
        "workers": "3"
      }
    }
    """
    # The module-level registry already has the exact shape we expose.
    return JOB_STATUS
@bp.route('/update', methods=['POST'])
def update_job() -> dict:
    """
    Updates a job with a new configuration.

    We try to reconstruct the existing job by querying
    the status dictionary and then we update as necessary.
    Then we kill the job and start it again with the new
    values.

    Exposed via HTTP at /api/update
    Supported HTTP methods: POST

    Parameters
    ----------
    job : str
        The name of the job to modify. (Required)
    workers : str
        The number of workers the load generator should use. (Optional)
    error_weight : str
        The relative "weight" of errors. Higher numbers result in the load
        generator choosing to hit pages known to produce errors a higher
        percentage of the time.
        This number is entirely arbitrary and is only relative to statically
        configured weights in the scenario file itself. (Optional)
    app_latency_weight : str
        In the case of the `dyno` scenario, an `app_latency_weight`
        parameter can be passed which increases the rate at which a given
        label is accessed.
        Does NOT work with scenarios other than `dyno`! (Optional)
    app_latency_label : str
        Used in conjunction with `app_latency_weight` to specify a label
        which should be hit at a higher or lower rate, which is controlled
        by the `app_latency_weight` parameter.
    app_latency_lower_bound : int
        The lower bound of latency which should be applied to requests which
        hit the delayed endpoint
    app_latency_upper_bound : int
        The upper bound of latency which should be applied to requests which
        hit the delayed endpoint

    Returns
    -------
    An empty dictionary on success

    Examples
    --------
    Sample JSON payload to send to this endpoint:
    > {"job":"python","workers":2.1}

    Note
    ----
    Parameters are received via JSON in a Flask request object. They
    may not be passed directly to this function.
    """
    r = request.get_json() or {}
    job = r.get('job')
    if job is None:
        return "Must supply job", 400
    if job not in JOB_STATUS:
        # Unknown job: record a default configuration seeded from the
        # request and return without relaunching anything.
        # TODO refactor to single source of truth
        JOB_STATUS[job] = {
            'duration': "31536000",
            'delay': "0.600",
            "scenario": "molotov_scenarios",
            "workers": r.get('workers', "3"),
            "error_weight": r.get('error_weight', "0"),
            "app_latency_weight": r.get('app_latency_weight', 0),
            "app_latency_label": r.get("app_latency_label", "dyno_app_latency"),  # noqa
            "app_latency_lower_bound": r.get("app_latency_lower_bound", 1),
            "app_latency_upper_bound": r.get("app_latency_upper_bound", 1000)  # noqa
        }
        return {}
    config = JOB_STATUS[job]
    # Overlay only the keys the caller actually supplied.
    for key in (
            'workers',
            'error_weight',
            'app_latency_weight',
            'app_latency_label',
            'app_latency_lower_bound',
            'app_latency_upper_bound',
    ):
        if key in r:
            config[key] = r[key]
    # Restart the job so the new configuration takes effect.
    _stop_job(job)
    if DEBUG:
        print('Relaunching job: ', config)
    _launch_job(job, config)
    _update_status(job, config)
    return {}
@bp.route('/stop', methods=['GET'])
def stop_job() -> dict:
    """
    Stop a load-generation job

    Exposed via HTTP at /api/stop
    Supported HTTP methods: GET

    Note
    ----
    Parameters are received as query arguments in a Flask request object.
    They may not be passed directly to this function.

    Parameters
    ----------
    job : str
        The job to stop. A leading `opbeans-` prefix, if present,
        is stripped. (Required)

    Examples
    --------
    > curl http://localhost:8999/api/stop?job=opbeans-python
    """
    job = request.args.get('job')
    if job is None:
        # Without this guard a missing query arg raised an
        # AttributeError (HTTP 500) on `job.replace` below.
        return "Must supply job", 400
    job = job.replace('opbeans-', '')
    _stop_job(job)
    return {}
@bp.route('/splays', methods=['GET'])
def get_splays() -> dict:
    """
    Fetch a list of splays

    Exposed via HTTP at /api/splays
    Supported HTTP methods: GET

    Returns
    -------
    dict
        A dictionary containing a list of possible splays. A splay
        is a property across which delayed requests are distributed.

    Examples
    --------
    ❯ curl -s http://localhost:8999/api/splays|jq
    {
      "splays": [
        "User-agent: Safari",
        "IP addresses: 10.0.0.0/8"
      ]
    }
    """
    # Hard-coded for now; no dynamic splay discovery yet.
    return {'splays': ['User-Agent: Safari']}
@bp.route('/scenarios', methods=['GET'])
def get_scenarios() -> dict:
    """
    Fetch a list of scenarios.

    Exposed via HTTP at /api/scenarios
    Supported HTTP methods: GET

    Returns
    -------
    dict
        A dictionary containing a list of scenarios under the `scenarios`
        key. HTTP clients will receive the return as JSON.

    Note
    ----
    To add a new scenario to the application, it must be added to the
    scenarios/ folder before it appears in this list.

    Examples
    --------
    ❯ curl -s http://localhost:8999/api/scenarios|jq
    {
      "scenarios": [
        "dyno",
        "molotov_scenarios",
        "high_error_rates"
      ]
    }
    """
    # The scenarios/ folder lives three levels above this module.
    here = os.path.dirname(os.path.realpath(__file__))
    scenario_dir = os.path.join(here, "../../../scenarios")
    names = [
        Path(entry).stem
        for entry in os.listdir(scenario_dir)
        if not entry.startswith('__')  # skip __init__.py / __pycache__
    ]
    return {'scenarios': names}
""" Private helper functions """
def _construct_toxi_env(
job: str,
port: str,
scenario: str,
error_weight: int,
app_latency_weight=None,
app_latency_label=None,
app_latency_lower_bound=None,
app_latency_upper_bound=None,
) -> dict:
"""
Construct a dictionary representing an Opbeans environment
which is fronted by a Toxiproxy instance.
Note
----
The `label_weight` and `label_name` parameters are currently used
only in the context of the `dyno` scenario. Otherwise, they are
ignored.
Parameters
----------
job : str
The name of the job. Should be passed without including the `opbeans-`
prefix.
port : str
The port for the environment. Can also be passed as a int type.
scenario : str
Thie scenario for use with this instance. Should be passed as simply
the name and NOT as a filename.
error_weight : int
The relative "weight" of errors. Higher number result in the load
generator choosing to hit pages known to produce errors a higher
percentage of the time.
This number is entirely arbitrary and is only relative to statically
configured weights in the scenario file itself.
app_latency_weight : int
In the case of the `dyno` scenario, the app_latency_weight parameter
can be passed which increases the rate at which an endpoint which
artifically introduces latency is hit.
app_latency_label : str
Used in conjunction with `app_latency_weight` to specify a label which
should be applied to latent requests.
app_latency_lower_bound : int
Used in conjunction with `app_latency_weight`, this parameter
specifies the lower bound for latency. Requests will never be
less latent than this value.
app_latency_upper_bound : int
Used in conjunction with `app_latency_weight`, this parameter
specifies the upper bound for latency. Requests will never by (much)
more latent than this value.
app_latency_user_agent : str
Used in conjunction with `app_latency_weight`, this parameter
specifies a user agent for the latent requests.
Returns
-------
dict
Dictionary containing the environment keys and values
Examples
--------
Sample call showing just the required parameters
>>> _construct_toxi_env('python', 9999, 'my_great-scenario', 99)
{
'OPBEANS_BASE_URL': 'http://toxi:9999',
'OPBEANS_NAME': 'opbeans-python',
'ERROR_WEIGHT': '99'
}
"""
toxi_env = os.environ.copy()
toxi_env['OPBEANS_BASE_URL'] = "http://toxi:{}".format(port)
toxi_env['OPBEANS_NAME'] = "opbeans-" + job
toxi_env['ERROR_WEIGHT'] = str(error_weight)
if app_latency_weight:
toxi_env['APP_LATENCY_WEIGHT'] = str(app_latency_weight)
if app_latency_label:
toxi_env['APP_LATENCY_LABEL'] = app_latency_label
if app_latency_lower_bound:
toxi_env['APP_LATENCY_LOWER_BOUND'] = str(app_latency_lower_bound)
if app_latency_upper_bound:
toxi_env['APP_LATENCY_UPPER_BOUND'] = str(app_latency_upper_bound)
return toxi_env
def _update_status(job: str, config: dict) -> None:
    """
    Helper function for updating the status of a job

    This function can only be called directly and is
    not accessible via HTTP.

    Parameters
    ----------
    job : str
        The name of the job to update
    config : dict
        A configuration dictionary containing a valid
        configuration for the job.

    Returns
    -------
    None

    Note
    ----
    See implementation for a list of required keys in the
    configuration dictionary.

    Examples
    --------
    >>> config = {'duration': '90', 'delay': '91', \
        'scenario': 'dyno', 'workers': '92',\
        'error_weight': '93', 'port': '95', \
        'app_latency_weight': '96','app_latency_label': 'my_label', \
        'app_latency_lower_bound': 1,'app_latency_upper_bound': 1000}
    >>> _update_status('python', config)
    """
    if job not in JOB_STATUS:
        # TODO refactor to single source of truth
        JOB_STATUS[job] = {
            'duration': "31536000",
            'delay': "0.600",
            "scenario": "molotov_scenarios",
            "workers": "3",
            "error_weight": "0",
            "app_latency_weight": "0",
            "app_latency_label": "dyno_app_latency",
            "app_latency_lower_bound": "1",
            "app_latency_upper_bound": "1000"
        }
    status = JOB_STATUS[job]
    status['running'] = True
    status['name'] = job
    # Required keys: a missing one is a programming error, so the
    # KeyError is allowed to propagate (matches original behavior).
    for key in ('duration', 'delay', 'scenario',
                'workers', 'error_weight', 'port'):
        status[key] = config[key]
    # Optional keys: default to None for older clients that omit them.
    for key in ('app_latency_weight', 'app_latency_label',
                'app_latency_lower_bound', 'app_latency_upper_bound'):
        status[key] = config.get(key)
def _launch_job(job: str, config: dict) -> None:
    """
    Spawn a new load-generation job

    This function can only be called directly and is
    not accessible via HTTP.

    Parameters
    ----------
    job : str
        The name of the job to launch (without the `opbeans-` prefix).
    config : dict
        A configuration dictionary containing a valid configuration
        for the job. See implementation for required keys.

    Returns
    -------
    None

    Examples
    --------
    >>> config = {'duration': '90', 'delay': '91', \
        'scenario': 'dyno', 'workers': '92',\
        'error_weight': '93', 'port': '95'}
    >>> _launch_job('python', config)
    """
    if DEBUG:
        print(
            'Job launch received: ', config
        )
    if DEBUG:
        # Debug mode substitutes a harmless placeholder process so the
        # launch/track/stop machinery can be exercised without molotov.
        cmd = ['sleep', '10']
    else:
        cmd = [
            "/app/venv/bin/python",
            "/app/venv/bin/molotov",
            "-v",
            "--duration",
            str(config['duration']),
            "--delay",
            str(config['delay']),
            "--uvloop",
            "--workers",
            str(int(config['workers'])),
            "--statsd",
            "--statsd-address",
            "udp://stats-d:8125",
            config['scenario']
        ]
    # NOTE(review): this client is never .connect()ed before emit() —
    # confirm the 'service_state' event actually reaches the server.
    s = socketio.Client()
    s.emit('service_state', {'data': {job: 'start'}})
    if DEBUG:
        print('Launching with: ', cmd)
    toxi_env = _construct_toxi_env(
        job,
        config['port'],
        config['scenario'],
        config['error_weight'],
        # TODO These are optional so that we don't
        # break backward compatability with older clients.
        # Eventually the lookup fallbacks can be removed.
        config.get('app_latency_weight'),
        config.get('app_latency_label'),
        config.get('app_latency_lower_bound'),
        config.get('app_latency_upper_bound'),
    )
    _update_status(job, config)
    # “I may not have gone where I intended to go, but I think I have ended up
    # where I needed to be.”
    # ― Douglas Adams, The Long Dark Tea-Time of the Soul
    # setsid puts the child in its own process group so _stop_job can
    # terminate the whole group (workers included) via os.killpg.
    p = subprocess.Popen(cmd, cwd="../", preexec_fn=os.setsid, env=toxi_env)
    JOB_MANAGER[job] = p
def _stop_job(job: str) -> None:
    """
    Helper function for stopping a job

    Note
    ----
    This function can only be called directly and is
    not accessible via HTTP.

    Parameters
    ----------
    job : str
        The job to stop

    Returns
    -------
    None

    Examples
    --------
    >>> _stop_job('opbeans-python')
    """
    s = socketio.Client()
    s.emit('service_state', {'data': {job: 'stop'}})
    # The entry is set to None (not removed) after a stop, so guard on
    # the value — the original `if job in JOB_MANAGER` check raised an
    # AttributeError (None.pid) when a job was stopped twice.
    p = JOB_MANAGER.get(job)
    if p is not None:
        # Kill the whole process group created via os.setsid at launch.
        os.killpg(os.getpgid(p.pid), signal.SIGTERM)
        JOB_MANAGER[job] = None
    if job in JOB_STATUS:
        JOB_STATUS[job]['running'] = False
# Simple structures for tracking status which is
# Thread-Safe-Enough (tm) for our needs because we
# limit ourselves to a single webserver proc and this
# is a private server
# Maps job name -> status/config dict (shape built by _update_status).
JOB_STATUS: dict = {}
# Maps job name -> subprocess.Popen handle (set to None once stopped).
JOB_MANAGER: dict = {}