# cv.py (excerpt)
def optimize(config_pth, module, basedate, iters, array_parallelism, resume):
    """Run an Ax-driven hyperparameter search over `module`'s train config.

    Repeatedly asks the Ax client for candidate ``train`` parameters, launches
    each candidate as a SLURM cross-validation run from a worker thread (which
    polls for the run's ``metrics.csv``), and feeds the resulting mean MAE back
    to Ax.  Progress artifacts (``best_params.json``, ``ax_state.json``,
    ``trials.csv``) are rewritten under the run directory after every
    scheduled trial so a crashed search can be resumed.

    Args:
        config_pth: Path to the config file understood by ``load_config``.
        module: Config key naming the model/module being tuned.
        basedate: Forecast base date, forwarded unchanged to ``run_cv``.
        iters: Number of optimization trials to schedule.
        array_parallelism: SLURM array width; Ax's max parallelism is capped
            at one fifth of this.
        resume: Optional path to a previous run directory whose finished
            trials are attached to the new experiment (warm start), or None.
    """
    cfg = load_config(config_pth)
    ax_client = AxClient(enforce_sequential_optimization=False)
    ax_client.create_experiment(
        name="covid optimize",
        parameters=cfg[module]["optimize"],
        objective_name="mae",
        choose_generation_strategy_kwargs={
            # Keep Ax's parallelism well under the SLURM array width.
            "max_parallelism_override": int(array_parallelism / 5)
        },
        minimize=True,
    )

    region = cfg["region"]
    cfg["this_module"] = module
    now = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
    user = cluster.USER

    if resume is not None:
        # Warm start: replay every finished trial from the previous run.
        # (was: json.load(open(...)) which leaked the file handle)
        with open(os.path.join(resume, "best_params.json")) as fin:
            params_used = list(json.load(fin).keys())
        for metrics_pth in glob(os.path.join(resume, "*", "metrics.csv")):
            mets = pd.read_csv(metrics_pth, index_col="Measure")
            mae = mets.loc["MAE"].mean()
            cfg_ = load_config(os.path.join(os.path.dirname(metrics_pth), "bar.yml"))
            params = {k: cfg_["train"][k] for k in params_used}
            try:
                _, idx = ax_client.attach_trial(params)
                ax_client.complete_trial(idx, {"mae": mae})
            except ValueError as e:
                if "valid value for parameter" in str(e):
                    continue  # this trial isn't valid for this grid, skip it...
                raise  # bare raise preserves the original traceback

    basedir = f"{cluster.FS}/{user}/covid19/forecasts/{region}/{now}"
    extra = cfg[module].get("resources", {})
    executor = mk_executor(
        f"cv_{region}",
        basedir + "/%j",
        {**extra, "array_parallelism": array_parallelism},
    )
    db_pth = executor.db_pth

    def optimize_run(q, trial_id, current_cfg, params):
        """Submit one CV run to SLURM, wait for its metrics, report via `q`.

        Runs in a worker thread.  `trial_id` and `params` are passed in
        explicitly (instead of closing over the enclosing loop's variables)
        so a slow thread cannot observe a *later* iteration's trial values.
        """
        executor = SlurmPoolExecutor(folder=basedir + "/%j", db_pth=db_pth)
        executor.update_parameters(
            job_name=f"cv_{region}",
            partition=cluster.PARTITION,
            gpus_per_node=extra.get("gpus", 0),
            cpus_per_task=extra.get("cpus", 3),
            mem=f'{cluster.MEM_GB(extra.get("memgb", 20))}GB',
            array_parallelism=extra.get("array_parallelism", 100),
            time=extra.get("timeout", 12 * 60),
        )
        job = executor.submit(
            run_cv,
            module=module,
            basedir=basedir + "/%j",
            cfg=current_cfg,
            basedate=basedate,
            executor=executor,
            test_run=True,
        )
        result_pth = os.path.join(
            os.path.dirname(str(job.paths.result_pickle)), "metrics.csv"
        )
        # Poll for the metrics file; the SLURM job writes it when it finishes.
        while not os.path.exists(result_pth):
            time.sleep(5)
        metrics = pd.read_csv(result_pth, index_col="Measure")
        mae = metrics.loc["MAE"].mean()
        q.put({"id": trial_id, "parameters": params, "mae": mae})
        return {"mae": mae}

    q = queue.Queue()
    waiting_for = 0
    launched = False
    for _ in range(iters):
        while True:
            try:
                parameters, trial_idx = ax_client.get_next_trial()
                break
            except MaxParallelismReachedException:
                # Ax won't hand out more trials until some complete; drain
                # one finished result (blocking) and try again.
                if not launched:
                    executor.launch(
                        os.path.join(basedir, "workers"), workers=array_parallelism
                    )
                    launched = True
                if waiting_for == 0 and q.qsize() == 0:
                    # Nothing in flight and nothing queued: no result can
                    # ever arrive, so bail out rather than block forever.
                    # NOTE(review): waiting_for is never decremented after
                    # q.get(), so this guard is dead once anything has been
                    # submitted — confirm intended before "fixing".
                    break
                item = q.get()
                ax_client.complete_trial(
                    trial_index=item["id"], raw_data={"mae": item["mae"]}
                )
        best_parameters, values = ax_client.get_best_parameters()
        trials_df = ax_client.generation_strategy.trials_as_df
        # Persist search state after every scheduled trial (crash-resumable).
        with open(os.path.join(basedir, "best_params.json"), "w") as fout:
            print(json.dumps(best_parameters), file=fout)
        with open(os.path.join(basedir, "ax_state.json"), "w") as fout:
            print(json.dumps(ax_client.to_json_snapshot()), file=fout)
        trials_df.to_csv(os.path.join(basedir, "trials.csv"), index=False)

        current_cfg = copy.deepcopy(cfg)
        current_cfg[module]["train"] = {**cfg[module]["train"], **parameters}
        # Grid values arrive as single-element lists; unwrap them for training.
        current_cfg[module]["train"] = {
            k: v[0] if isinstance(v, list) else v
            for k, v in current_cfg[module]["train"].items()
        }
        # Pass parameters/trial_idx by value — see optimize_run's docstring.
        threading.Thread(
            target=optimize_run, args=(q, trial_idx, current_cfg, parameters)
        ).start()
        waiting_for += 1