def execute()

in gcpdiag/runbook/dataflow/job_permissions.py [0:0]


  def execute(self):
    """Checking dataflow worker service account permissions."""
    # NOTE(review): the docstring above is intentionally left as the original
    # one-liner in case the runbook framework surfaces `execute.__doc__` to the
    # user -- confirm before expanding it.
    #
    # Flow:
    #   * Worker service account found and no cross project supplied:
    #     queue one IAM check for DATAFLOW_WORKER_ROLE on the job project.
    #   * Cross project supplied and the service account is found through it:
    #     queue an org-policy check plus IAM checks for the worker service
    #     account, the Dataflow service agent and the Compute service agent,
    #     all against the cross project.
    #   * Otherwise: report a failure on the job project.

    def build_iam_check(target_project, principal_email, template, roles):
      # Returns an IamPolicyCheck child step requiring *all* of `roles` for
      # `principal_email` on `target_project`.
      check = iam_gs.IamPolicyCheck()
      check.project = target_project
      check.principal = f'serviceAccount:{principal_email}'
      check.template = template
      check.require_all = True
      # Copy so that no two child steps ever share one mutable list.
      check.roles = list(roles)
      return check

    # Read every flag once instead of re-querying the operator each time.
    sa_email = op.get(flags.WORKER_SERVICE_ACCOUNT)
    project_id = op.get(flags.PROJECT_ID)
    cross_project_id = op.get(flags.CROSS_PROJECT_ID)

    project = crm.get_project(project_id)
    op.info(sa_email)

    sa_exists = iam.is_service_account_existing(email=sa_email,
                                                billing_project_id=project_id)
    # Only consult the cross project when one was actually supplied. Without
    # this guard the lookup was issued with billing_project_id=None and, if it
    # returned a truthy result, every cross-project check below would have been
    # configured against a project of None.
    sa_exists_cross_project = (
        cross_project_id is not None and
        iam.is_service_account_existing(email=sa_email,
                                        billing_project_id=cross_project_id))

    if sa_exists and cross_project_id is None:
      op.info('Service Account associated with Dataflow Job was found in the'
              ' same project')
      op.info('Checking permissions.')
      # Check for Service Account permissions
      self.add_child(child=build_iam_check(
          project_id,
          sa_email,
          'gcpdiag.runbook.dataflow::permissions::dataflow_worker_service_account',  # pylint: disable=line-too-long
          [dataflow_constants.DATAFLOW_WORKER_ROLE],
      ))
    elif sa_exists_cross_project:
      cross_project_template = 'gcpdiag.runbook.dataflow::permissions::dataflow_cross_project_worker_service_account'  # pylint: disable=line-too-long
      # Both service agents are checked for the same pair of roles.
      agent_roles = (
          dataflow_constants.DATAFLOW_IAM_SERVICE_ACCOUNT_USER,
          'roles/iam.serviceAccountTokenCreator',
      )

      op.info('Service Account associated with Dataflow Job was found in cross '
              'project')
      # Check if constraint is enforced
      op.info('Checking constraints on service account project.')
      orgpolicy_constraint_check = crm_gs.OrgPolicyCheck()
      orgpolicy_constraint_check.project = cross_project_id
      orgpolicy_constraint_check.constraint = (
          'constraints/iam.disableCrossProjectServiceAccountUsage')
      orgpolicy_constraint_check.is_enforced = False
      self.add_child(orgpolicy_constraint_check)

      # Check Service Account roles
      op.info('Checking roles in service account project.')
      self.add_child(child=build_iam_check(
          cross_project_id,
          sa_email,
          cross_project_template,
          [dataflow_constants.DATAFLOW_WORKER_ROLE],
      ))

      # Check Service Agent Service Account roles. The agent address is built
      # from the job project's number, not the cross project's.
      op.info('Checking service agent service account roles on service account '
              'project.')
      service_agent_sa = (
          f'service-{project.number}'
          '@dataflow-service-producer-prod.iam.gserviceaccount.com')
      self.add_child(child=build_iam_check(
          cross_project_id,
          service_agent_sa,
          cross_project_template,
          agent_roles,
      ))

      # Check Compute Agent Service Account (also derived from the job
      # project's number).
      op.info('Checking compute agent service account roles on service account '
              'project.')
      compute_agent_sa = (
          f'service-{project.number}@compute-system.iam.gserviceaccount.com')
      self.add_child(child=build_iam_check(
          cross_project_id,
          compute_agent_sa,
          cross_project_template,
          agent_roles,
      ))
    else:
      # The service account was not found through the job project nor through
      # a supplied cross project.
      op.add_failed(project,
                    reason=op.prep_msg(op.FAILURE_REASON,
                                       service_account=sa_email,
                                       project_id=project_id),
                    remediation=op.prep_msg(op.FAILURE_REMEDIATION))