in gcpdiag/runbook/vertex/generalized_steps.py [0:0]
def execute(self):
    """Report whether the Workbench Instance has an environment upgrade pending.

    The project, instance name and zone come from the step's own attributes,
    falling back to the operator flags when an attribute is falsy. The
    upgradability response for the instance then selects exactly one outcome:

    * failed    - the response marks the instance as upgradeable.
    * ok        - the upgrade info contains the "current" marker constant.
    * uncertain - anything else, using one of three message variants
      (invalid-state info while the instance is not STOPPED, falsy
      environment version, or the generic fallback).
    """
    project: str = self.project_id or op.get(flags.PROJECT_ID)
    name: str = self.instance_name or op.get(flags.INSTANCE_NAME)
    location: str = self.zone or op.get(flags.ZONE)

    instance: notebooks.WorkbenchInstance = notebooks.get_workbench_instance(
        project_id=project, zone=location, instance_name=name)
    upgradability: dict = notebooks.workbench_instance_check_upgradability(
        project_id=project, workbench_instance_name=instance.name)

    # Missing keys fall back to "not upgradeable" / empty strings.
    can_upgrade: bool = upgradability.get('upgradeable', False)
    target_version: str = upgradability.get('upgradeVersion', '').upper()
    upgrade_info: str = upgradability.get('upgradeInfo', '')
    target_image: str = upgradability.get('upgradeImage', '')

    op.info(f'instance is upgradeable: {can_upgrade}')
    op.info(f'instance upgrade info: {upgrade_info}')

    # An upgrade is available, so the instance is behind.
    if can_upgrade:
        reason = op.prep_msg(
            op.FAILURE_REASON,
            instance_name=name,
            environment_version=instance.environment_version)
        remediation = op.prep_msg(
            op.FAILURE_REMEDIATION,
            upgrade_version=target_version,
            upgrade_image=target_image)
        op.add_failed(resource=instance, reason=reason, remediation=remediation)
        return

    # Not upgradeable and the upgrade info carries the "current" marker.
    if constants.WORKBENCH_INSTANCES_UPGRADABILITY_CURRENT in upgrade_info:
        reason = op.prep_msg(
            op.SUCCESS_REASON,
            instance_name=name,
            environment_version=instance.environment_version)
        op.add_ok(resource=instance, reason=reason)
        return

    # Upgrade info reports an invalid state and the instance is not STOPPED.
    if (constants.WORKBENCH_INSTANCES_UPGRADABILITY_INVALID_STATE_INFO
            in upgrade_info and
            instance.state != notebooks.StateEnum.STOPPED):
        reason = op.prep_msg(
            op.UNCERTAIN_REASON,
            instance_name=name,
            upgrade_info=upgrade_info)
        remediation = op.prep_msg(op.UNCERTAIN_REMEDIATION)
        op.add_uncertain(resource=instance,
                         reason=reason,
                         remediation=remediation)
        return

    # A new instance reports environment version 0 with upgradability False.
    if not instance.environment_version:
        reason = op.prep_msg(
            op.UNCERTAIN_REASON_ALT1,
            instance_name=name,
            upgrade_info=upgrade_info)
        remediation = op.prep_msg(op.UNCERTAIN_REMEDIATION_ALT1)
        op.add_uncertain(resource=instance,
                         reason=reason,
                         remediation=remediation)
        return

    # None of the recognised situations matched; use the generic message.
    reason = op.prep_msg(
        op.UNCERTAIN_REASON_ALT2,
        instance_name=name,
        upgrade_info=upgrade_info)
    remediation = op.prep_msg(op.UNCERTAIN_REMEDIATION_ALT2)
    op.add_uncertain(resource=instance,
                     reason=reason,
                     remediation=remediation)