def execute()

in gcpdiag/runbook/vertex/generalized_steps.py [0:0]


  def execute(self):
    """Check if the Workbench Instance is using the latest environment version"""
    # NOTE(review): docstring kept verbatim in case the runbook framework
    # surfaces it as the step description -- confirm before rewording.

    # A truthy attribute set on the step wins; otherwise the value is looked
    # up through op.get().
    project_id = self.project_id or op.get(flags.PROJECT_ID)
    instance_name = self.instance_name or op.get(flags.INSTANCE_NAME)
    zone = self.zone or op.get(flags.ZONE)

    instance = notebooks.get_workbench_instance(project_id=project_id,
                                                zone=zone,
                                                instance_name=instance_name)
    upgradability = notebooks.workbench_instance_check_upgradability(
        project_id=project_id, workbench_instance_name=instance.name)

    # Keys absent from the upgradability response fall back to
    # "not upgradeable" and empty strings.
    upgradeable = upgradability.get('upgradeable', False)
    upgrade_version = upgradability.get('upgradeVersion', '').upper()
    upgrade_info = upgradability.get('upgradeInfo', '')
    upgrade_image = upgradability.get('upgradeImage', '')

    op.info(f'instance is upgradeable: {upgradeable}')
    op.info(f'instance upgrade info: {upgrade_info}')

    # An available upgrade is reported as a failure, together with the
    # version and image the instance can be upgraded to.
    if upgradeable:
      op.add_failed(
          resource=instance,
          reason=op.prep_msg(op.FAILURE_REASON,
                             instance_name=instance_name,
                             environment_version=instance.environment_version),
          remediation=op.prep_msg(op.FAILURE_REMEDIATION,
                                  upgrade_version=upgrade_version,
                                  upgrade_image=upgrade_image))
      return

    # Not upgradeable and the upgrade info carries the "current" marker:
    # report success.
    if constants.WORKBENCH_INSTANCES_UPGRADABILITY_CURRENT in upgrade_info:
      op.add_ok(
          resource=instance,
          reason=op.prep_msg(op.SUCCESS_REASON,
                             instance_name=instance_name,
                             environment_version=instance.environment_version))
      return

    # Every remaining outcome is "uncertain"; only the message templates vary.
    invalid_state_marker = (
        constants.WORKBENCH_INSTANCES_UPGRADABILITY_INVALID_STATE_INFO)
    if (invalid_state_marker in upgrade_info and
        instance.state != notebooks.StateEnum.STOPPED):
      reason_key, remediation_key = (op.UNCERTAIN_REASON,
                                     op.UNCERTAIN_REMEDIATION)
    elif not instance.environment_version:
      # Environment version is 0 and upgradability False when the instance
      # is new
      reason_key, remediation_key = (op.UNCERTAIN_REASON_ALT1,
                                     op.UNCERTAIN_REMEDIATION_ALT1)
    else:
      reason_key, remediation_key = (op.UNCERTAIN_REASON_ALT2,
                                     op.UNCERTAIN_REMEDIATION_ALT2)

    op.add_uncertain(resource=instance,
                     reason=op.prep_msg(reason_key,
                                        instance_name=instance_name,
                                        upgrade_info=upgrade_info),
                     remediation=op.prep_msg(remediation_key))