gcpdiag/runbook/dataflow/job_permissions.py (180 lines of code) (raw):

# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Module containing Dataflow Jobs permissions check diagnostic tree and custom steps.""" from gcpdiag import runbook from gcpdiag.queries import crm, iam, logs from gcpdiag.runbook import StartStep, op from gcpdiag.runbook.crm import generalized_steps as crm_gs from gcpdiag.runbook.dataflow import constants as dataflow_constants from gcpdiag.runbook.dataflow import flags from gcpdiag.runbook.iam import generalized_steps as iam_gs PRODUCT_FLAG = 'dataflow' def local_realtime_query(filter_str): result = logs.realtime_query( project_id=op.get(flags.PROJECT_ID), start_time=op.get(flags.START_TIME), end_time=op.get(flags.END_TIME), filter_str=filter_str, ) return result class JobPermissions(runbook.DiagnosticTree): """Analysis and Resolution of Dataflow Jobs Permissions issues. This runbook investigates Dataflow permissions and recommends remediation steps. Areas Examined: - Dataflow User Account Permissions: Verify that individual Dataflow users have the necessary permissions to access and manage Dataflow jobs (e.g., create,update,cancel). - Dataflow Service Account Permissions: Verify that the Dataflow Service Account has the required permissions to execute and manage the Dataflow jobs - Dataflow Worker Service Account: Verify that the Dataflow Worker Service Account has the necessary permissions for worker instances within a Dataflow job to access input and output resources during job execution. - Dataflow Resource Permissions: Verify that Dataflow resources (e.g., Cloud Storage buckets, BigQuery datasets) have the necessary permissions to be accessed and used by Dataflow jobs. By ensuring that Dataflow resources have the necessary permissions, you can prevent errors and ensure that your jobs run smoothly. """ parameters = { flags.PROJECT_ID: { 'type': str, 'help': 'The Project ID of the resource under investigation', 'required': True, }, flags.PRINCIPAL: { 'type': str, 'help': ('The authenticated user account email. This is the ' 'user account that is used to authenticate the user to the ' 'console or the gcloud CLI.'), 'required': True, }, flags.WORKER_SERVICE_ACCOUNT: { 'type': str, 'help': ('Dataflow Worker Service Account used for Dataflow Job Creation' 'and execution'), 'required': True, }, flags.CROSS_PROJECT_ID: { 'type': str, 'help': ('Cross Project ID, where service account is located if it is not' ' in the same project as the Dataflow Job'), }, } def build_tree(self): """Construct the diagnostic tree with appropriate steps.""" # Instantiate your step classes start = StartStep() user_account_permissions_check = DataflowUserAccountPermissions() worker_service_account_check = DataflowWorkerServiceAccountPermissions() dataflow_resource_permissions_check = DataflowResourcePermissions() self.add_start(start) self.add_step(parent=start, child=user_account_permissions_check) project = crm.get_project(op.get(flags.PROJECT_ID)) service_agent_check = iam_gs.IamPolicyCheck() service_agent_check.project = op.get(flags.PROJECT_ID) service_agent_check.roles = [dataflow_constants.DATAFLOW_SERVICE_AGENT_ROLE] service_agent_check.principal = f'serviceAccount:service-{project.number}@dataflow-service-producer-prod.iam.gserviceaccount.com' # pylint: disable=line-too-long service_agent_check.template = 'gcpdiag.runbook.dataflow::permissions::dataflow_service_account' # pylint: disable=line-too-long service_agent_check.require_all = False self.add_step(parent=start, child=service_agent_check) self.add_step(parent=start, child=worker_service_account_check) self.add_step(parent=start, child=dataflow_resource_permissions_check) self.add_end(DataflowPermissionsEnd()) class DataflowUserAccountPermissions(runbook.Step): """Check the User account permissions. "Dataflow Viewer" role allows the user to view/list the Dataflow jobs. But, cannot submit, update, drain, stop, or cancel the jobs. "Dataflow Developer" role does allows the user to create and modify (view, update, cancel etc) the dataflow jobs, but does not provide machine type, storage bucket configuration access. "Dataflow Admin" role provides complete access for creating and modifying the jobs along with the machine type and storage bucket configuration access. """ def execute(self): """Check the Authenticated User account permissions.""" dataflow_developer_role_check = iam_gs.IamPolicyCheck() dataflow_developer_role_check.project = op.get(flags.PROJECT_ID) dataflow_developer_role_check.roles = [ dataflow_constants.DATAFLOW_DEVELOPER_ROLE, dataflow_constants.DATAFLOW_IAM_SERVICE_ACCOUNT_USER, ] dataflow_developer_role_check.principal = f'user:{op.get(flags.PRINCIPAL)}' dataflow_developer_role_check.require_all = True self.add_child(dataflow_developer_role_check) class DataflowWorkerServiceAccountPermissions(runbook.Gateway): """Check the Dataflow Worker account permissions. Worker instances use the worker service account to access input and output resources after you submit your job. For the worker service account to be able to run a job, it must have the roles/dataflow.worker role. """ template = 'permissions::projectcheck' def execute(self): """Checking dataflow worker service account permissions.""" sa_email = op.get(flags.WORKER_SERVICE_ACCOUNT) project = crm.get_project(op.get(flags.PROJECT_ID)) op.info(op.get(flags.WORKER_SERVICE_ACCOUNT)) sa_exists = iam.is_service_account_existing(email=sa_email, billing_project_id=op.get( flags.PROJECT_ID)) sa_exists_cross_project = iam.is_service_account_existing( email=sa_email, billing_project_id=op.get(flags.CROSS_PROJECT_ID)) if sa_exists and op.get(flags.CROSS_PROJECT_ID) is None: op.info('Service Account associated with Dataflow Job was found in the' ' same project') op.info('Checking permissions.') # Check for Service Account permissions sa_permission_check = iam_gs.IamPolicyCheck() sa_permission_check.project = op.get(flags.PROJECT_ID) sa_permission_check.principal = ( f'serviceAccount:{op.get(flags.WORKER_SERVICE_ACCOUNT)}') sa_permission_check.template = 'gcpdiag.runbook.dataflow::permissions::dataflow_worker_service_account' # pylint: disable=line-too-long sa_permission_check.require_all = True sa_permission_check.roles = [dataflow_constants.DATAFLOW_WORKER_ROLE] self.add_child(child=sa_permission_check) elif sa_exists_cross_project: op.info('Service Account associated with Dataflow Job was found in cross ' 'project') # Check if constraint is enforced op.info('Checking constraints on service account project.') orgpolicy_constraint_check = crm_gs.OrgPolicyCheck() orgpolicy_constraint_check.project = op.get(flags.CROSS_PROJECT_ID) orgpolicy_constraint_check.constraint = ( 'constraints/iam.disableCrossProjectServiceAccountUsage') orgpolicy_constraint_check.is_enforced = False self.add_child(orgpolicy_constraint_check) # Check Service Account roles op.info('Checking roles in service account project.') sa_permission_check = iam_gs.IamPolicyCheck() sa_permission_check.project = op.get(flags.CROSS_PROJECT_ID) sa_permission_check.principal = ( f'serviceAccount:{op.get(flags.WORKER_SERVICE_ACCOUNT)}') sa_permission_check.template = 'gcpdiag.runbook.dataflow::permissions::dataflow_cross_project_worker_service_account' # pylint: disable=line-too-long sa_permission_check.require_all = True sa_permission_check.roles = [dataflow_constants.DATAFLOW_WORKER_ROLE] self.add_child(child=sa_permission_check) # Check Service Agent Service Account roles op.info('Checking service agent service account roles on service account ' 'project.') service_agent_sa = ( f'service-{project.number}@dataflow-service-producer-prod.iam.gserviceaccount.com' ) service_agent_permission_check = iam_gs.IamPolicyCheck() service_agent_permission_check.project = op.get(flags.CROSS_PROJECT_ID) service_agent_permission_check.principal = ( f'serviceAccount:{service_agent_sa}') service_agent_permission_check.template = 'gcpdiag.runbook.dataflow::permissions::dataflow_cross_project_worker_service_account' # pylint: disable=line-too-long service_agent_permission_check.require_all = True service_agent_permission_check.roles = [ dataflow_constants.DATAFLOW_IAM_SERVICE_ACCOUNT_USER, 'roles/iam.serviceAccountTokenCreator' ] self.add_child(child=service_agent_permission_check) # Check Compute Agent Service Account op.info('Checking compute agent service account roles on service account ' 'project.') compute_agent_sa = ( f'service-{project.number}@compute-system.iam.gserviceaccount.com') compute_agent_permission_check = iam_gs.IamPolicyCheck() compute_agent_permission_check.project = op.get(flags.CROSS_PROJECT_ID) compute_agent_permission_check.principal = ( f'serviceAccount:{compute_agent_sa}') compute_agent_permission_check.template = 'gcpdiag.runbook.dataflow::permissions::dataflow_cross_project_worker_service_account' # pylint: disable=line-too-long compute_agent_permission_check.require_all = True compute_agent_permission_check.roles = [ dataflow_constants.DATAFLOW_IAM_SERVICE_ACCOUNT_USER, 'roles/iam.serviceAccountTokenCreator' ] self.add_child(child=compute_agent_permission_check) else: op.add_failed(project, reason=op.prep_msg(op.FAILURE_REASON, service_account=op.get( flags.WORKER_SERVICE_ACCOUNT), project_id=op.get(flags.PROJECT_ID)), remediation=op.prep_msg(op.FAILURE_REMEDIATION)) class DataflowResourcePermissions(runbook.Step): """Check the Dataflow Resource permissions. Verify that Dataflow resources have the necessary permissions to be accessed and used by Dataflow jobs. Ensure that the your Dataflow project Worker Service Account have the required permissions to access and modify these resources. """ def execute(self): """Check the Dataflow Resource permissions.""" filter_str = [ 'log_id("dataflow.googleapis.com/job-message")', 'resource.type="dataflow_step"', ('textPayload=~("Failed to write a file to temp location" OR "Unable' ' to rename output files" OR "Unable to delete temp files")'), ] filter_str = '\n'.join(filter_str) log_entries = local_realtime_query(filter_str) if log_entries: op.info('Cloud Storage buckets related errors found in the logs..') op.info('Checking worker service account storage object admin role.') dataflow_storage_role_check = iam_gs.IamPolicyCheck() if op.get(flags.CROSS_PROJECT_ID): dataflow_storage_role_check.project = op.get(flags.CROSS_PROJECT_ID) dataflow_storage_role_check.roles = ['roles/storage.objectAdmin'] dataflow_storage_role_check.principal = ( f'serviceAccount:{op.get(flags.WORKER_SERVICE_ACCOUNT)}') dataflow_storage_role_check.require_all = True self.add_child(dataflow_storage_role_check) else: op.info('No Cloud Storage buckets related errors found in the logs') class DataflowPermissionsEnd(runbook.EndStep): """RUNBOOK COMPLETED.""" def execute(self): """Permissions checks completed.""" op.info('Dataflow Resources Permissions Checks Completed')