securitycenter/snippets_v2/snippets_findings_v2.py (222 lines of code) (raw):

#!/usr/bin/env python # # Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Examples of working with source and findings in Security Command Center.""" from typing import Dict # [START securitycenter_list_all_findings_v2] def list_all_findings(organization_id, source_name, location_id) -> int: """ lists all findings for a source Args: organization_id: organization_id is the numeric ID of the organization. e.g.:organization_id = "111122222444" source_name: is the resource path for a source that has been created location_id: GCP location id; example: 'global' Returns: int: returns the count of all findings for a source """ from google.cloud import securitycenter_v2 # Create a client. client = securitycenter_v2.SecurityCenterClient() parent = f"organizations/{organization_id}" all_sources = f"{parent}/sources/{source_name}/locations/{location_id}" # Create the request dictionary request = {"parent": all_sources} # Print the request for debugging print("Request: ", request) finding_result_iterator = client.list_findings(request={"parent": all_sources}) for count, finding_result in enumerate(finding_result_iterator): print( "{}: name: {} resource: {}".format( count, finding_result.finding.name, finding_result.finding.resource_name ) ) return finding_result_iterator # [END securitycenter_list_all_findings_v2] # [START securitycenter_list_filtered_findings_v2] def list_filtered_findings(organization_id, source_name, location_id) -> int: """ lists filtered findings for a source Args: organization_id: organization_id is the numeric ID of the organization. e.g.:organization_id = "111122222444" source_name: is the resource path for a source that has been created location_id: GCP location id; example: 'global' Returns: int: returns the filtered findings for a source """ count = 0 from google.cloud import securitycenter_v2 # Create a new client. client = securitycenter_v2.SecurityCenterClient() parent = f"organizations/{organization_id}" all_sources = f"{parent}/sources/{source_name}/locations/{location_id}" finding_result_iterator = client.list_findings( request={"parent": all_sources, "filter": 'severity="LOW"'} ) # Iterate an print all finding names and the resource they are # in reference to. for count, finding_result in enumerate(finding_result_iterator): print( "{}: name: {} resource: {}".format( count, finding_result.finding.name, finding_result.finding.resource_name ) ) return count # [END securitycenter_list_filtered_findings_v2] # [START securitycenter_group_all_findings_v2] def group_all_findings(organization_id, source_name, location_id) -> int: """ Demonstrates grouping all findings across an organization. Args: organization_id: organization_id is the numeric ID of the organization. e.g.:organization_id = "111122222444" source_name: is the resource path for a source that has been created location_id: GCP location id; example: 'global' Returns: int: returns the count of groups all findings for a source across the organization. """ count = 0 from google.cloud import securitycenter_v2 # Create a client. client = securitycenter_v2.SecurityCenterClient() parent = f"organizations/{organization_id}" all_sources = f"{parent}/sources/{source_name}/locations/{location_id}" group_result_iterator = client.group_findings( request={"parent": all_sources, "group_by": "category"} ) for count, group_result in enumerate(group_result_iterator): print((count + 1), group_result) return count # [END securitycenter_group_all_findings_v2] # [START securitycenter_group_filtered_findings_v2] def group_filtered_findings(organization_id, source_name, location_id) -> int: """ Demonstrates grouping all filtered findings across an organization. Args: organization_id: organization_id is the numeric ID of the organization. e.g.:organization_id = "111122222444" source_name: is the resource path for a source that has been created location_id: GCP location id; example: 'global' Returns: int: returns the count of groups all filtered findings for a source across the organization. """ count = 0 from google.cloud import securitycenter_v2 # Create a client. client = securitycenter_v2.SecurityCenterClient() parent = f"organizations/{organization_id}" all_sources = f"{parent}/sources/{source_name}/locations/{location_id}" group_result_iterator = client.group_findings( request={ "parent": all_sources, "group_by": "category", "filter": 'state="ACTIVE"', } ) for count, group_result in enumerate(group_result_iterator): print((count + 1), group_result) return count # [END securitycenter_group_filtered_findings_v2] # [START securitycenter_list_findings_with_security_marks_v2] def list_findings_with_security_marks(organization_id, source_name, location_id) -> int: """ lists all filtered findings with security marks across an organization. Args: organization_id: organization_id is the numeric ID of the organization. e.g.:organization_id = "111122222444" source_name: is the resource path for a source that has been created location_id: GCP location id; example: 'global' Returns: int: returns the count of filtered findings with security marks across the organization. """ count = 0 from google.cloud import securitycenter_v2 # Create a new client. client = securitycenter_v2.SecurityCenterClient() parent = f"organizations/{organization_id}" all_sources = f"{parent}/sources/{source_name}/locations/{location_id}" # below filter is used to list active and unmuted findings without security marks acknowledgement as true. finding_result_iterator = client.list_findings( request={ "parent": all_sources, "filter": 'NOT security_marks.marks.ACK="true" AND NOT mute="MUTED" AND state="ACTIVE"', } ) # Iterate an print all finding names and the resource they are # in reference to. for count, finding_result in enumerate(finding_result_iterator): print( "{}: name: {} resource: {}".format( count, finding_result.finding.name, finding_result.finding.resource_name ) ) return count # [END securitycenter_list_findings_with_security_marks_v2] # [START securitycenter_group_findings_by_state_v2] def group_findings_by_state(organization_id, source_name, location_id) -> int: """ groups the findings across an organization. Args: organization_id: organization_id is the numeric ID of the organization. e.g.:organization_id = "111122222444" source_name: is the resource path for a source that has been created location_id: GCP location id; example: 'global' Returns: int: returns the count of group findings for a source across the organization. """ count = 0 from google.cloud import securitycenter_v2 # Create a client. client = securitycenter_v2.SecurityCenterClient() parent = f"organizations/{organization_id}" all_sources = f"{parent}/sources/{source_name}/locations/{location_id}" group_result_iterator = client.group_findings( request={"parent": all_sources, "group_by": "state"} ) for count, group_result in enumerate(group_result_iterator): print((count + 1), group_result) return count # [END securitycenter_group_findings_by_state_v2] # [START securitycenter_create_finding_v2] def create_finding( organization_id, location_id, finding_id, source_name, category ) -> Dict: """ cretaes a new finding Args: organization_id: organization_id is the numeric ID of the organization. e.g.:organization_id = "111122222444" source_name: is the resource path for a source that has been created finding_id: unique identifier provided by the client. location_id: GCP location id; example: 'global' category: the additional category group with in findings. Returns: Dict: returns the created findings details. """ import datetime from google.cloud import securitycenter_v2 from google.cloud.securitycenter_v2 import Finding # Create a new client. client = securitycenter_v2.SecurityCenterClient() # Use the current time as the finding "event time". event_time = datetime.datetime.now(tz=datetime.timezone.utc) # 'source_name' is the resource path for a source that has been # created previously (you can use list_sources to find a specific one). # Its format is: # source_name = "organizations/{organization_id}/sources/{source_id}" # e.g.: # source_name = "organizations/111122222444/sources/1234" # source_name = f"organizations/{organization_id}/sources/{source_name}" # category= "MEDIUM_RISK_ONE" # The resource this finding applies to. The CSCC UI can link # the findings for a resource to the corresponding Asset of a resource # if there are matches. resource_name = ( f"//cloudresourcemanager.googleapis.com/organizations/{organization_id}" ) finding = Finding( state=Finding.State.ACTIVE, resource_name=resource_name, category=category, event_time=event_time, ) parent = source_name + "/locations/" + location_id # Call The API. created_finding = client.create_finding( request={"parent": parent, "finding_id": finding_id, "finding": finding} ) print(created_finding) return created_finding # [END securitycenter_create_finding_v2] # [START securitycenter_update_finding_source_properties_v2] def update_finding(source_name, location_id) -> Dict: """ updates a finding Args: source_name: is the resource path for a source that has been created location_id: GCP location id; example: 'global' Returns: Dict: returns the updated findings details. """ import datetime from google.cloud import securitycenter_v2 from google.cloud.securitycenter_v2 import Finding from google.protobuf import field_mask_pb2 client = securitycenter_v2.SecurityCenterClient() # Only update the specific source property and event_time. event_time # is required for updates. field_mask = field_mask_pb2.FieldMask( paths=["source_properties.s_value", "event_time"] ) # Set the update time to Now. This must be some time greater then the # event_time on the original finding. event_time = datetime.datetime.now(tz=datetime.timezone.utc) # 'source_name' is the resource path for a source that has been # created previously (you can use list_sources to find a specific one). # Its format is: # source_name = "organizations/{organization_id}/sources/{source_id}" # e.g.: # source_name = "organizations/111122222444/sources/1234" finding_name = f"{source_name}/locations/{location_id}/findings/samplefindingid" finding = Finding( name=finding_name, source_properties={"s_value": "new_string"}, event_time=event_time, ) updated_finding = client.update_finding( request={"finding": finding, "update_mask": field_mask} ) print( "New Source properties: {}, Event Time {}".format( updated_finding.source_properties, updated_finding.event_time ) ) return updated_finding # [END securitycenter_update_finding_source_properties_v2] # [START securitycenter_create_source_v2] def create_source(organization_id) -> Dict: """ Create a new findings source Args: organization_id: organization_id is the numeric ID of the organization. e.g.:organization_id = "111122222444" Returns: Dict: returns the created findings source details. """ from google.cloud import securitycenter_v2 client = securitycenter_v2.SecurityCenterClient() org_name = f"organizations/{organization_id}" response = client.create_source( request={ "parent": org_name, "source": { "display_name": "Customized Display Name", "description": "A new custom source that does X", }, } ) print(f"Created Source: {response.name}") return response # [END securitycenter_create_source_v2] # [START securitycenter_get_source_v2] def get_source(source_name) -> Dict: """ Gets the details of an existing source. Args: source_name: is the resource path for a source that has been created Returns: Dict: returns the details of existing source. """ from google.cloud import securitycenter_v2 client = securitycenter_v2.SecurityCenterClient() # 'source_name' is the resource path for a source that has been # created previously (you can use list_sources to find a specific one). # Its format is: # source_name = "organizations/{organization_id}/sources/{source_id}" # e.g.: # source_name = "organizations/111122222444/sources/1234" source = client.get_source(request={"name": source_name}) print(f"Source: {source}") return source # [END securitycenter_get_source_v2] # [START securitycenter_update_source_v2] def update_source(source_name) -> Dict: """ Updates a source's display name. Args: source_name: is the resource path for a source that has been created Returns: Dict: returns the details of updated source. """ from google.cloud import securitycenter_v2 from google.protobuf import field_mask_pb2 client = securitycenter_v2.SecurityCenterClient() # Field mask to only update the display name. field_mask = field_mask_pb2.FieldMask(paths=["display_name"]) # 'source_name' is the resource path for a source that has been # created previously (you can use list_sources to find a specific one). # Its format is: # source_name = "organizations/{organization_id}/sources/{source_id}" # e.g.: # source_name = "organizations/111122222444/sources/1234" updated = client.update_source( request={ "source": {"name": source_name, "display_name": "Updated Display Name"}, "update_mask": field_mask, } ) print(f"Updated Source: {updated}") return updated # [END securitycenter_update_source_v2] # [START securitycenter_list_sources_v2] def list_source(organization_id) -> int: """ lists the findings source Args: organization_id: organization_id is the numeric ID of the organization. e.g.:organization_id = "111122222444" Returns: Dict: returns the count of the findings source """ count = -1 from google.cloud import securitycenter_v2 # Create a new client. client = securitycenter_v2.SecurityCenterClient() # 'parent' must be in one of the following formats: # "organizations/{organization_id}" # "projects/{project_id}" # "folders/{folder_id}" parent = f"organizations/{organization_id}" # Call the API and print out each existing source. for count, source in enumerate(client.list_sources(request={"parent": parent})): print(count, source) return count # [END securitycenter_list_sources_v2] # [START securitycenter_get_source_iam_v2] def get_iam_policy(organization_id, source_name) -> Dict: """ Gets the iam policy of the source. Args: organization_id: organization_id is the numeric ID of the organization. e.g.:organization_id = "111122222444" source_name: is the resource path for a source that has been created Returns: Dict: returns the iam policy """ from google.cloud import securitycenter_v2 client = securitycenter_v2.SecurityCenterClient() source_name = f"organizations/{organization_id}/sources/{source_name}" policy = client.get_iam_policy(request={"resource": source_name}) print(f"Policy: {policy}") return policy # [END securitycenter_get_source_iam_v2] # [START securitycenter_set_source_iam_v2] def set_source_iam_policy(organization_id, source_name, user_email, role_id) -> Dict: """ Gives a user findingsEditor permission to the source. Args: organization_id: organization_id is the numeric ID of the organization. e.g.:organization_id = "111122222444" source_name: is the resource path for a source that has been created user_email: user_email is an e-mail address known to Cloud IAM (e.g. a gmail address).user_mail = user@somedomain.com role_id: role_id to assign Returns: Dict: returns the source iam policy """ from google.cloud import securitycenter_v2 from google.iam.v1 import policy_pb2 client = securitycenter_v2.SecurityCenterClient() source_name = f"organizations/{organization_id}/sources/{source_name}" # Get the old policy so we can do an incremental update. old_policy = client.get_iam_policy(request={"resource": source_name}) print(f"Old Policy: {old_policy}") # Setup a new IAM binding. binding = policy_pb2.Binding() binding.role = role_id binding.members.append(f"user:{user_email}") # Setting the e-tag avoids over-write existing policy updated = client.set_iam_policy( request={ "resource": source_name, "policy": {"etag": old_policy.etag, "bindings": [binding]}, } ) print(f"Updated Policy: {updated}") return updated # [END securitycenter_set_source_iam_v2] # [START securitycenter_test_iam_v2] def troubleshoot_iam_permissions(organization_id, source_name, permissions): """ Demonstrate calling test_iam_permissions to determine if the service account has the correct permisions. Args: organization_id: organization_id is the numeric ID of the organization. e.g.:organization_id = "111122222444" source_name: is the resource path for a source that has been created permissions: Returns: Dict: returns the source iam policy """ from google.cloud import securitycenter_v2 # Create a client. client = securitycenter_v2.SecurityCenterClient() source_name = f"organizations/{organization_id}/sources/{source_name}" # Check for permssions to call create_finding or update_finding. permission_response = client.test_iam_permissions( request={ "resource": source_name, "permissions": permissions, } ) print( "Permision to create or update findings? {}".format( len(permission_response.permissions) > 0 ) ) return permission_response # [END securitycenter_test_iam_v2]