#!/usr/bin/env python3
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Workbench configuration and provisioning"""

import json
import logging
import shutil
import subprocess

from retry import retry

from . import utils

logger = logging.getLogger(__name__)


class WorkbenchInfo:
    """Workbench configuration and management"""

    def __init__(self, workbench):
        self.config = utils.load_config()
        self.workbench = workbench
        self.workbench_dir = (
            self.config["baseDir"]
            / "workbenches"
            / f"workbench_{self.workbench.id}"
        )
        # Subdirectory holding the cloud-specific Terraform configuration
        self.cloud_dir = "google"
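
    # On-disk layout, one directory per workbench (inferred from the methods
    # below):
    #   <baseDir>/workbenches/workbench_<id>/
    #       cloud_credentials       service-account JSON used by Terraform
    #       startup_script.sh       generated instance startup script
    #       terraform/google/       copied Terraform config + terraform.tfvars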

    def create_workbench_dir(self, credentials):
        """Create and populate the working directory for this workbench."""
        self.workbench_dir.mkdir(parents=True)
        self.set_credentials(credentials)
        self.copy_terraform()
        self.copy_startup_script()
        self.prepare_terraform_vars()

    def _get_credentials_file(self):
        return self.workbench_dir / "cloud_credentials"

    def set_credentials(self, creds=None):
        """Write the cloud credentials to disk, pulling from the DB if unset."""
        credfile = self._get_credentials_file()
        if not creds:
            # Fall back to the credential stored with the workbench record
            creds = self.workbench.cloud_credential.detail
        with credfile.open("w") as fp:
            fp.write(creds)

    def start(self):
        """Provision the workbench: init/plan, apply, then await the proxy URI."""
        try:
            self.workbench.cloud_state = "nm"
            self.workbench.status = "c"
            self.workbench.save()
            self._terraform_init()
            self.workbench.cloud_state = "m"
            self.workbench.status = "i"
            self.workbench.save()
            self._terraform_create()
            # Wait for the proxy URI to appear in the Terraform state
            self._get_proxy_uri()
            self.workbench.status = "r"
            self.workbench.save()
        except Exception as err:  # pylint: disable=broad-except
            logger.error(
                "Encountered error while deploying workbench %d-%s",
                self.workbench.id,
                self.workbench.name,
                exc_info=err,
            )

    def terminate(self):
        try:
            self._terraform_destroy()
        except Exception as err:  # pylint: disable=broad-except
            logger.error(
                "Encountered error while destroying workbench %d-%s",
                self.workbench.id,
                self.workbench.name,
                exc_info=err,
            )

    def copy_terraform(self):
        """Copy the stock workbench Terraform config into this workbench's dir."""
        terraform_dir = self.workbench_dir / "terraform"
        shutil.copytree(
            self.config["baseDir"]
            / "infrastructure_files"
            / "workbench_tf"
            / self.cloud_dir,
            terraform_dir / self.cloud_dir,
        )
        return terraform_dir

    def copy_startup_script(self):
        """Generate startup_script.sh for the workbench instance."""
        user = self.workbench.trusted_user
        # Resolve the trusted user's POSIX username from their OS Login profile
        # pylint: disable=line-too-long
        startup_script_vars = f"""
USER=$(curl -s http://metadata.google.internal/computeMetadata/v1/oslogin/users?pagesize=1024 \
-H 'Metadata-Flavor: Google' | \
jq '.[][] | \
select ( .name == "{user.socialaccount_set.first().uid}") | \
.posixAccounts | \
.[].username' 2>&- | tr -d '"')
"""
        slurm_config_segment = ""
        try:
            cid = self.workbench.attached_cluster.cloud_id
            slurm_config_segment = f"""\
# Fetch the munge key from the cluster controller so this node can authenticate
apt-get install -y munge libmunge-dev
mkdir -p /mnt/clustermunge
mkdir -p /etc/munge
mount slurm-{cid}-controller:/etc/munge /mnt/clustermunge
cp /mnt/clustermunge/munge.key /etc/munge/munge.key
chmod 400 /etc/munge/munge.key
chown munge:munge /etc/munge/munge.key
umount /mnt/clustermunge
rmdir /mnt/clustermunge
systemctl restart munge
useradd --system -u981 -U -m -d /var/lib/slurm -s /bin/bash slurm
echo "N" > /sys/module/nfs/parameters/nfs4_disable_idmapping
# Build and install the Slurm client from source
tmpdir=$(mktemp -d)
currdir=$PWD
cd $tmpdir
wget https://download.schedmd.com/slurm/slurm-21.08-latest.tar.bz2
tar xf slurm-21.08-latest.tar.bz2
cd slurm-21.08*/
#wget https://download.schedmd.com/slurm/slurm-22.05-latest.tar.bz2
#tar xf slurm-22.05-latest.tar.bz2
#cd slurm-22.05*/
./configure --prefix=/usr/local --sysconfdir=/etc/slurm
make -j $(nproc)
make install
# Abort the bringup if the Slurm install failed
if [ "$?" -ne "0" ]; then
    echo "BRINGUP FAILED"
    exit 1
fi
cd $currdir
rm -r $tmpdir
mkdir -p /etc/slurm
mount slurm-{cid}-controller:/usr/local/etc/slurm /etc/slurm
"""
        except AttributeError:
            # No cluster attached; skip the Slurm client setup
            pass
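
        # Note: the Slurm client above is built from the 21.08 release series;
        # this presumably needs to track the version running on the attached
        # cluster's controller, whose /etc/slurm and munge key are NFS-mounted
        # by the same segment.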
        startup_script = self.workbench_dir / "startup_script.sh"
        with startup_script.open("w") as f:
            f.write(
                f"""#!/bin/bash
echo "starting startup script at `date`" | tee -a /tmp/startup.log
echo "Getting username..." | tee -a /tmp/startup.log
{startup_script_vars}
echo "Setting up storage" | tee -a /tmp/startup.log
sudo apt-get -y update && sudo apt-get install -y nfs-common
{slurm_config_segment}
mkdir /tmp/jupyterhome
chown $USER:$USER /tmp/jupyterhome
mkdir /home/$USER
chown $USER:$USER /home/$USER
cp /home/jupyter/.jupyter /tmp/jupyterhome/.jupyter -R
chown $USER:$USER /tmp/jupyterhome/.jupyter -R
cat << EOF > /tmp/jupyterhome/DATA_LOSS_WARNING.txt
DATA LOSS WARNING:
The data on this workbench instance is not automatically saved unless it is
saved in a shared filesystem that has been mounted.
All mounted shared filesystems are listed below. If none are listed then
all data on this instance will be deleted.
MOUNTED FILESYSTEMS:
EOF
"""
            )
            for mp in self.workbench.mount_points.order_by("mount_order"):
                if (
                    self.workbench.id == mp.workbench.id
                    and mp.export.filesystem.hostname_or_ip
                ):
                    host = mp.export.filesystem.hostname_or_ip
                    export = mp.export.export_name
                    path = mp.mount_path
                    f.write(f"mkdir -p {path}\n")
                    f.write(f"mkdir -p /tmp/jupyterhome`dirname {path}`\n")
                    f.write(f"mount {host}:{export} {path}\n")
                    f.write(f"chmod 777 {path}\n")
                    f.write(f"ln -s {path} /tmp/jupyterhome`dirname {path}` \n")
                    f.write(
                        f'echo "{host}:{export} is mounted at {path}"'
                        " >> /tmp/jupyterhome/DATA_LOSS_WARNING.txt\n"
                    )
logger.debug("Writing workbench startup script")
with open(
self.config["baseDir"]
/ "infrastructure_files"
/ "gcs_bucket"
/ "workbench"
/ "startup_script_template.sh",
encoding="utf-8",
) as infile:
for line in infile:
print(line)
f.write(line)
f.write("\n")

    def prepare_terraform_vars(self):
        """Write terraform.tfvars for this workbench's Terraform config."""
        region = self.workbench.cloud_region
        zone = self.workbench.cloud_zone
        subnet = self.workbench.subnet.cloud_id
        # Cloud-specific Terraform changes
        project = json.loads(self.workbench.cloud_credential.detail)[
            "project_id"
        ]
        csp_info = f"""
region = "{region}"
zone = "{zone}"
project_name = "{project}"
subnet_name = "{subnet}"
machine_type = "{self.workbench.machine_type}"
boot_disk_type = "{self.workbench.boot_disk_type}"
boot_disk_size_gb = "{self.workbench.boot_disk_capacity}"
trusted_user = "{self.workbench.trusted_user.email}"
image_family = "{self.workbench.image_family}"
owner_id = ["{self.workbench.trusted_user.email}"]
wb_startup_script_name = "workbench/workbench_{self.workbench.id}_startup_script"
wb_startup_script_bucket = "{self.config["server"]["gcs_bucket"]}"
"""
        tfvars = (
            self.workbench_dir
            / "terraform"
            / self.cloud_dir
            / "terraform.tfvars"
        )
        with tfvars.open("w") as f:
            f.write(csp_info)
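
    # The resulting terraform.tfvars looks like (values illustrative):
    #   region       = "us-central1"
    #   zone         = "us-central1-a"
    #   project_name = "my-project"
    #   ...
    #   wb_startup_script_bucket = "<the OFE GCS bucket>"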

    def get_workbench_access_key(self):
        return self.workbench.get_access_key()

    def _terraform_init(self):
        terraform_dir = self.workbench_dir / "terraform"
        extra_env = {
            "GOOGLE_APPLICATION_CREDENTIALS": self._get_credentials_file()
        }
        try:
            self.workbench.cloud_state = "cm"
            self.workbench.save()
            utils.run_terraform(terraform_dir / self.cloud_dir, "init")
            utils.run_terraform(
                terraform_dir / self.cloud_dir, "validate", extra_env=extra_env
            )
            utils.run_terraform(
                terraform_dir / self.cloud_dir, "plan", extra_env=extra_env
            )
        except subprocess.CalledProcessError as cpe:
            logger.error("Terraform init/validate/plan failed", exc_info=cpe)
            if cpe.stdout:
                logger.info("TF stdout:\n%s\n", cpe.stdout.decode("utf-8"))
            if cpe.stderr:
                logger.info("TF stderr:\n%s\n", cpe.stderr.decode("utf-8"))
            self.workbench.status = "e"
            self.workbench.save()
            raise

    def _terraform_create(self):
        terraform_dir = self.workbench_dir / "terraform"
        extra_env = {
            "GOOGLE_APPLICATION_CREDENTIALS": self._get_credentials_file()
        }
        try:
            utils.run_terraform(
                terraform_dir / self.cloud_dir, "apply", extra_env=extra_env
            )
            # Pull the notebook instance name from the Terraform state
            tf_state_file = terraform_dir / self.cloud_dir / "terraform.tfstate"
            with tf_state_file.open("r") as statefp:
                outputs = json.load(statefp)["outputs"]
            wb_name = "UNKNOWN"
            try:
                wb_name = outputs["notebook_instance_name"]["value"]
            except KeyError:
                logger.warning(
                    "Failed to parse workbench instance name from TF state"
                )
                # Fall back to deriving the name from the deployment ID
                try:
                    deployment_id = outputs["deployment_id"]["value"]
                    wb_name = f"notebooks-instance-{deployment_id}-0"
                except KeyError:
                    logger.error(
                        "Failed to parse deployment ID from TF state"
                    )
            # Workbench is now being initialized
            self.workbench.internal_name = wb_name
            self.workbench.cloud_state = "m"
            self.workbench.status = "i"
            self.workbench.save()
            # Ansible is now running... Probably 15-30 minutes or so
        except subprocess.CalledProcessError as err:
            # We can error during provisioning, in which case Terraform
            # doesn't tear things down. Run a `destroy`, just in case.
            self.workbench.status = "e"
            self.workbench.cloud_state = "um"
            self.workbench.save()
            logger.error("Terraform apply failed", exc_info=err)
            if err.stdout:
                logger.info("TF stdout:\n%s\n", err.stdout.decode("utf-8"))
            if err.stderr:
                logger.info("TF stderr:\n%s\n", err.stderr.decode("utf-8"))
            logger.error("Attempting to clean up with Terraform destroy")
            self._terraform_destroy()
            raise

    def _terraform_destroy(self):
        terraform_dir = self.workbench_dir / "terraform"
        extra_env = {
            "GOOGLE_APPLICATION_CREDENTIALS": self._get_credentials_file()
        }
        self.workbench.status = "t"
        self.workbench.cloud_state = "dm"
        self.workbench.save()
        try:
            utils.run_terraform(
                terraform_dir / self.cloud_dir, "destroy", extra_env=extra_env
            )
        except subprocess.CalledProcessError as err:
            logger.error("Terraform destroy failed", exc_info=err)
            if err.stdout:
                logger.info("TF stdout:\n%s\n", err.stdout.decode("utf-8"))
            if err.stderr:
                logger.info("TF stderr:\n%s\n", err.stderr.decode("utf-8"))
            logger.error("Resources may still exist - check manually!")
            self.workbench.cloud_state = "um"
            self.workbench.status = "e"
            self.workbench.save()
            raise
        logger.info("Terraform destroy succeeded")
        self.workbench.status = "d"
        self.workbench.cloud_state = "xm"
        self.workbench.save()

    @retry(ValueError, tries=10, delay=30, logger=logger)
    def _get_proxy_uri(self):
        """Poll the Terraform state until the notebook proxy URI appears.

        Raises ValueError while the URI is still empty; the @retry decorator
        re-runs this up to 10 times at 30-second intervals.
        """
        terraform_dir = self.workbench_dir / "terraform"
        extra_env = {
            "GOOGLE_APPLICATION_CREDENTIALS": self._get_credentials_file()
        }
        try:
            # Refresh the state so new outputs (the proxy URI) are picked up
            utils.run_terraform(
                terraform_dir / self.cloud_dir,
                "apply",
                ["-refresh-only"],
                extra_env=extra_env,
            )
        except subprocess.CalledProcessError as err:
            logger.error("Terraform refresh failed", exc_info=err)
            if err.stdout:
                logger.info("TF stdout:\n%s\n", err.stdout.decode("utf-8"))
            if err.stderr:
                logger.info("TF stderr:\n%s\n", err.stderr.decode("utf-8"))
            raise
        tf_state_file = terraform_dir / self.cloud_dir / "terraform.tfstate"
        with tf_state_file.open("r") as statefp:
            outputs = json.load(statefp)["outputs"]
        try:
            wb_uri = outputs["notebook_proxy_uri"]["value"]
        except KeyError:
            logger.error("Failed to get workbench uri from TF output")
            raise
        if not wb_uri:
            logger.info('Awaiting workbench uri update (got "%s")', wb_uri)
            raise ValueError("Got empty URI")
        logger.info("Got workbench_uri: %s", wb_uri)
        self.workbench.proxy_uri = wb_uri
        self.workbench.save()
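

# A minimal usage sketch (illustrative only; `workbench` is a ghpcfe Workbench
# model instance with a cloud credential already attached):
#
#     info = WorkbenchInfo(workbench)
#     info.create_workbench_dir(credentials=None)  # None pulls creds from the DB
#     info.start()      # init/validate/plan, apply, then poll for the proxy URI
#     ...
#     info.terminate()  # terraform destroy; sets status "d" on success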