in gridengine/src/gridengine/cli.py [0:0]
def shell(config: Dict) -> None:
"""
Provides read only interactive shell. type gehelp()
in the shell for more information
"""
ctx = DefaultContextHandler("[interactive-readonly]")
ge_env = environment.from_qconf(config)
ge_driver = autoscaler.new_driver(config, ge_env)
config = ge_driver.preprocess_config(config)
demand_calc = autoscaler.new_demand_calculator(config, ge_env, ge_driver, ctx)
queues = ge_env.queues
def gehelp() -> None:
print("config - dict representing autoscale configuration.")
print("dbconn - Read-only SQLite conn to node history")
print("demand_calc - DemandCalculator")
print("ge_driver - GEDriver object.")
print("jobs - List[Job] from ge_driver")
print("node_mgr - NodeManager")
print("logging - HPCLogging module")
print("queues - GridEngineQueue objects")
shell_locals = {
"config": config,
"ctx": ctx,
"ge_driver": ge_driver,
"demand_calc": demand_calc,
"node_mgr": demand_calc.node_mgr,
"jobs": ge_env.jobs,
"dbconn": demand_calc.node_history.conn,
"gehelp": gehelp,
"queues": queues,
"ge_env": ge_env,
}
banner = "\nCycleCloud GE Autoscale Shell"
interpreter = ReraiseAssertionInterpreter(locals=shell_locals)
try:
__import__("readline")
# some magic - create a completer that is bound to the locals in this interpreter and not
# the __main__ interpreter.
interpreter.push("import readline, rlcompleter")
interpreter.push('readline.parse_and_bind("tab: complete")')
interpreter.push("_completer = rlcompleter.Completer(locals())")
interpreter.push("def _complete_helper(text, state):")
interpreter.push(" ret = _completer.complete(text, state)")
interpreter.push(' ret = ret + ")" if ret[-1] == "(" else ret')
interpreter.push(" return ret")
interpreter.push("")
interpreter.push("readline.set_completer(_complete_helper)")
for item in interpreter.history_lines:
try:
if '"""' in item:
interpreter.push(
"readline.add_history('''%s''')" % item.rstrip("\n")
)
else:
interpreter.push(
'readline.add_history("""%s""")' % item.rstrip("\n")
)
except Exception:
pass
interpreter.push("from hpc.autoscale.job.job import Job\n")
interpreter.push("from hpc.autoscale import hpclogging as logging\n")
except ImportError:
banner += (
"\nWARNING: `readline` is not installed, so autocomplete will not work."
)
interpreter.interact(banner=banner)