in dyno/app/api/control.py [0:0]
def _update_status(job: str, config: dict) -> None:
    """
    Mark a job as running and record its configuration.

    Helper function for updating the entry for ``job`` in the
    module-level ``JOB_STATUS`` mapping. This function can only be
    called directly and is not accessible via HTTP.

    Parameters
    ----------
    job : str
        The name of the job to update
    config : dict
        A configuration dictionary containing a valid
        configuration for the job.

        Required keys: ``duration``, ``delay``, ``scenario``,
        ``workers``, ``error_weight`` and ``port``.

        Optional keys: ``app_latency_weight``, ``app_latency_label``,
        ``app_latency_lower_bound`` and ``app_latency_upper_bound``.
        An optional key that is absent falls back to the built-in
        default for that key. A key that is present is stored as
        given, even if its value is ``None``.

    Returns
    -------
    None

    Raises
    ------
    KeyError
        If a required key is missing from ``config``. ``JOB_STATUS``
        is left untouched in that case.

    Examples
    --------
    >>> config = {'duration': '90', 'delay': '91',
    ...           'scenario': 'dyno', 'workers': '92',
    ...           'error_weight': '93', 'port': '95',
    ...           'app_latency_weight': '96',
    ...           'app_latency_label': 'my_label',
    ...           'app_latency_lower_bound': 1,
    ...           'app_latency_upper_bound': 1000}
    >>> _update_status('python', config)
    """
    # TODO refactor to single source of truth
    defaults = {
        'duration': "31536000",
        'delay': "0.600",
        "scenario": "molotov_scenarios",
        "workers": "3",
        "error_weight": "0",
        "app_latency_weight": "0",
        "app_latency_label": "dyno_app_latency",
        "app_latency_lower_bound": "1",
        "app_latency_upper_bound": "1000",
    }
    required_keys = (
        'duration',
        'delay',
        'scenario',
        'workers',
        'error_weight',
        'port',
    )
    optional_keys = (
        'app_latency_weight',
        'app_latency_label',
        'app_latency_lower_bound',
        'app_latency_upper_bound',
    )

    # Collect every new value before touching JOB_STATUS, so that a
    # missing required key raises KeyError without leaving behind a
    # half-initialised entry that is already flagged as running.
    new_values = {'running': True}
    new_values.update((key, config[key]) for key in required_keys)
    # Absent optional keys use the defaults instead of being stored
    # as None, which would otherwise wipe out the seeded defaults.
    new_values.update(
        (key, config.get(key, defaults[key])) for key in optional_keys
    )
    new_values['name'] = job

    if job not in JOB_STATUS:
        JOB_STATUS[job] = dict(defaults)
    JOB_STATUS[job].update(new_values)