# Automation Gallery - Credential Scan on Azure Blob Storage

__Notebook Version:__ 1.0<br>
__Python Version:__ Python 3.8<br>
__Apache Spark Version:__ 3.1<br>
__Required Packages:__ No<br>
__Platforms Supported:__  Azure Synapse Analytics
     
__Data Source Required:__ No 
    
### Description
This notebook provides step-by-step instructions and sample code to detect credentials leaked into Azure Blob Storage, using the Azure SDK for Python.<br>
*** No need to download and install any other Python modules. ***<br>
*** Please run the cells sequentially to avoid errors.  Please do not use "run all cells". *** <br>

## Table of Contents
1. Warm-up
2. Authentication to Azure Storage
3. Scan Azure Blob for Leaking Credentials
4. Save result to Microsoft Sentinel Dynamic Summaries

## 1. Warm-up

In [None]:
# Load Python libraries that will be used in this notebook
from azure.mgmt.storage import StorageManagementClient
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__
from azure.identity import ClientSecretCredential
from datetime import datetime, timedelta, timezone

import json
from json import JSONEncoder
from IPython.display import display, HTML, Markdown
import re
import pandas as pd

In [None]:
# Helper functions and classes that will be used in this notebook
def get_file_content(blob):
    """Return the text of a downloaded blob, trying UTF-8 first and then UTF-16.

    :param blob: object exposing ``content_as_text(max_concurrency=..., encoding=...)``
        (the caller passes the result of ``container_client.download_blob``).
    :return: the decoded text, or an empty string when the content cannot be
        read or decoded with either encoding. The error is printed, never raised.
    """
    last_decode_error = None
    for encoding in ('UTF-8', 'UTF-16'):
        try:
            return blob.content_as_text(max_concurrency=1, encoding=encoding)
        except UnicodeDecodeError as ex:
            # Wrong encoding guess: remember the error and try the next one.
            last_decode_error = ex
        except Exception as ex:
            # Any other failure is not an encoding problem, so retrying is pointless.
            print(ex)
            return ""

    # Both encodings failed. Previously the UTF-16 failure escaped this function
    # because it was raised inside the UnicodeDecodeError handler.
    print(last_decode_error)
    return ""

def get_regex_list():
    """Return the credential-scan patterns as a list of regex strings.

    Every entry is a pattern string meant to be passed to the ``re`` module.
    The order is significant for callers that label matches by list index.

    :return: list of regular-expression strings.
    """
    regex_list = [
        "(?i)(ida:password|IssuerSecret|(api|client|app(lication)?)[_\\- ]?(key|secret)[^,a-z]|\\.azuredatabricks\\.net).{0,10}(dapi)?[a-z0-9/+]{22}",
        "(?i)(x-api-(key|token).{0,10}[a-z0-9/+]{40}|v1\\.[a-z0-9/+]{40}[^a-z0-9/+])",
        # The global (?i) flag used to sit in the middle of this pattern. Python 3.11+
        # rejects that with re.error, while older versions applied it to the whole
        # pattern with a DeprecationWarning. Moving it to the front keeps the old
        # whole-pattern case-insensitive matching and compiles on every version.
        "(?i)\\WAIza[a-z0-9_\\\\\\-]{35}\\W",
        "(?i)(\\Wsig\\W|Secret(Value)?|IssuerSecret|(\\Wsas|primary|secondary|management|Shared(Access(Policy)?)?).?Key|\\.azure\\-devices\\.net|\\.(core|servicebus|redis\\.cache|accesscontrol|mediaservices)\\.(windows\\.net|chinacloudapi\\.cn|cloudapi\\.de|usgovcloudapi\\.net)|New\\-AzureRedisCache).{0,100}([a-z0-9/+]{43}=)",
        "(?i)visualstudio\\.com.{1,100}\\W(?-i:)[a-z2-7]{52}\\W",
        # NOTE(review): pinned to the literal "se=2021" - presumably an expiry year;
        # confirm whether other years should match as well.
        "(?i)se=2021.+sig=[a-z0-9%]{43,63}%3d",
        "(?i)(x-functions-key|ApiKey|Code=|\\.azurewebsites\\.net/api/).{0,100}[a-z0-9/\\+]{54}={2}",
        "(?i)code=[a-z0-9%]{54,74}(%3d){2}",
        "(?i)(userpwd|publishingpassword).{0,100}[a-z0-9/\\+]{60}\\W",
        "(?i)[^a-z0-9/\\+][a-z0-9/\\+]{86}==",
        "(?-i:)\\-{5}BEGIN( ([DR]SA|EC|OPENSSH|PGP))? PRIVATE KEY( BLOCK)?\\-{5}",
        "(?i)(app(lication)?|client)[_\\- ]?(key(url)?|secret)([\\s=:>]{1,10}|[\\s\"':=|>\\]]{3,15}|[\"'=:\\(]{2})[^\\-]",
        "(?i)refresh[_\\-]?token([\\s=:>]{1,10}|[\\s\"':=|>\\]]{3,15}|[\"'=:\\(]{2})(\"data:text/plain,.+\"|[a-z0-9/+=_.-]{20,200})",
        "(?i)AccessToken(Secret)?([\\s\"':=|>\\]]{3,15}|[\"'=:\\(]{2}|[\\s=:>]{1,10})[a-z0-9/+=_.-]{20,200}",
        "(?i)[a-z0-9]{3,5}://[^%:\\s\"'/][^:\\s\"'/\\$]+[^:\\s\"'/\\$%]:([^%\\s\"'/][^@\\s\"'/]{0,100}[^%\\s\"'/])@[\\$a-z0-9:\\.\\-_%\\?=/]+",
        "(?i)snmp(\\-server)?\\.exe.{0,100}(priv|community)",
        "(?i)(ConvertTo\\-?SecureString\\s*((\\(|\\Wstring)\\s*)?['\"]+)",
        "(?i)(Consumer|api)[_\\- ]?(Secret|Key)([\\s=:>]{1,10}|[\\s\"':=|>,\\]]{3,15}|[\"'=:\\(]{2})[^\\s]{5,}",
        "(?i)authorization[,\\[:= \"']+([dbaohmnsv])",
        "(?i)-u\\s+.{2,100}-p\\s+[^\\-/]",
        "(?i)(amqp|ssh|(ht|f)tps?)://[^%:\\s\"'/][^:\\s\"'/\\$]+[^:\\s\"'/\\$%]:([^%\\s\"'/][^@\\s\"'/]{0,100}[^%\\s\"'/])@[\\$a-z0-9:\\.\\-_%\\?=/]+",
        "(?i)(\\Waws|amazon)?.{0,5}(secret|access.?key).{0,10}\\W[a-z0-9/\\+]{40}",
        "(?-i:)(eyJ0eXAiOiJKV1Qi|eyJhbGci)",
        "(?i)@(\\.(on)?)?microsoft\\.com[ -~\\s]{1,100}?(\\w?pass\\w?)",
        "(?i)net(\\.exe)?.{1,5}(user\\s+|share\\s+/user:|user-?secrets? set)\\s+[a-z0-9]",
        "(?i)xox[pbar]\\-[a-z0-9]",
        "(?i)[\":\\s=]((x?corp|extranet(test)?|ntdev)(\\.microsoft\\.com)?|corp|redmond|europe|middleeast|northamerica|southpacific|southamerica|fareast|africa|exchange|extranet(test)?|partners|parttest|ntdev|ntwksta)\\W.{0,100}(password|\\Wpwd|\\Wpass|\\Wpw\\W|userpass)",
        "(?i)(sign_in|SharePointOnlineAuthenticatedContext|(User|Exchange)Credentials?|password)[ -~\\s]{0,100}?@([a-z0-9.]+\\.(on)?)?microsoft\\.com['\"]?",
        "(?i)(\\.database\\.azure\\.com|\\.database(\\.secure)?\\.windows\\.net|\\.cloudapp\\.net|\\.database\\.usgovcloudapi\\.net|\\.database\\.chinacloudapi\\.cn|\\.database.cloudapi.de).{0,100}(DB_PASS|(sql|service)?password|\\Wpwd\\W)",
        "(?i)(secret(.?key)?|password)[\"']?\\s*[:=]\\s*[\"'][^\\s]+?[\"']",
        "(?i)[^a-z\\$](DB_USER|user id|uid|(sql)?user(name)?|service\\s?account)\\s*[^\\w\\s,]([ -~\\s]{2,120}?|[ -~]{2,30}?)([^a-z\\s\\$]|\\s)\\s*(DB_PASS|(sql|service)?password|pwd)",
        "(?i)(password|secret(key)?)[ \\t]*[=:]+[ \\t]*([^:\\s\"';,<]{2,200})",
    ]

    return regex_list

def convert_result_to_string(result_row):
    """Flatten one regex match into a single string.

    A plain string is returned unchanged. A tuple (one entry per capture group)
    is joined with commas after dropping its empty entries. Any other type
    yields None.
    """
    row_type = type(result_row)
    if row_type == tuple:
        return ','.join(group for group in result_row if len(group) > 0)
    if row_type == str:
        return result_row
    return None

def file_modified_date_check(days_back, modified_date):
    """Tell whether a file was modified fewer than ``days_back`` whole days ago.

    :param days_back: exclusive upper bound on the age, in whole days.
    :param modified_date: timezone-aware datetime of the last modification
        (it is subtracted from an aware "now", so a naive value raises TypeError).
    :return: True when the whole-day age is strictly below ``days_back``.
    """
    now_local = datetime.now(timezone.utc).astimezone()
    age_in_whole_days = (now_local - modified_date).days
    return age_in_whole_days < days_back

class file_scan_result:
    """
    Holds the scan findings collected for a single file.
    """
    def __init__(self, file_name, file_last_modified):
        # file_last_modified is part of the signature but is not stored.
        self.file_name = file_name
        self.results = dict()

    def add_result(self, key, value):
        """ Record the matches (value, a list) found under the given key, replacing any earlier entry """
        self.results.update({key: value})

class result_encoder(JSONEncoder):
    """JSON encoder that serializes otherwise unsupported objects through their attribute dictionary."""

    def default(self, o):
        """Return the instance attributes of ``o`` so json can encode them as an object."""
        attribute_map = o.__dict__
        return attribute_map

from msrest.authentication import BasicTokenAuthentication
from azure.core.pipeline.policies import BearerTokenCredentialPolicy
from azure.core.pipeline import PipelineRequest, PipelineContext
from azure.core.pipeline.transport import HttpRequest
from azure.identity import DefaultAzureCredential

class AzureIdentityCredentialAdapter(BasicTokenAuthentication):
    """Expose an azure-identity credential through the BasicTokenAuthentication interface."""

    def __init__(self, credential=None, resource_id="https://management.azure.com/.default", **kwargs):
        """Adapt any azure-identity credential to work with SDK that needs azure.common.credentials or msrestazure.

        :param credential: Any azure-identity credential; a DefaultAzureCredential is created when omitted
        :param str resource_id: The scope used to request the token (defaults to the ARM scope)
        :param kwargs: forwarded unchanged to BearerTokenCredentialPolicy
        """
        super().__init__(None)
        if credential is None:
            credential = DefaultAzureCredential()
        # The policy does the actual token acquisition; set_token() drives it.
        self._policy = BearerTokenCredentialPolicy(credential, resource_id, **kwargs)

    def _make_request(self):
        """Build a throw-away pipeline request for the policy to stamp its Authorization header on."""
        placeholder_http_request = HttpRequest(
            "AzureIdentityCredentialAdapter",
            "https://fakeurl"
        )
        pipeline_context = PipelineContext(None)
        return PipelineRequest(placeholder_http_request, pipeline_context)

    def set_token(self):
        """Obtain a token by running the bearer-token policy against a placeholder request.

        The policy writes an ``Authorization`` header on the request; the part
        after the first space (the value following "Bearer") is stored in
        ``self.token`` as ``{"access_token": ...}``. Only the policy's public
        ``on_request`` hook is used.
        """
        pipeline_request = self._make_request()
        self._policy.on_request(pipeline_request)
        authorization_header = pipeline_request.http_request.headers["Authorization"]
        bearer_value = authorization_header.split(" ", 1)[1]
        self.token = {"access_token": bearer_value}

    def get_token(self):
        """Return the token dictionary most recently stored by set_token()."""
        return self.token

    def signed_session(self, session=None):
        """Refresh the stored token, then let the base class build the signed session."""
        self.set_token()
        return super().signed_session(session)

In [None]:
import uuid
import requests

class DynamicSummary():
    """Object model for a Microsoft Sentinel Dynamic Summary and its items.

    An instance starts with only ``summary_id``. construct_summary() attaches
    the summary-level attributes and construct_summary_item() builds item-level
    objects; in both cases an optional attribute is only set when a value is
    supplied. The serialize* helpers turn those attributes into JSON text.
    """

    # (attribute name, JSON property name) of the optional summary-level fields,
    # listed in the order they are emitted by serialize().
    _SUMMARY_OPTIONAL_FIELDS = (
        ('relation_name', 'relationName'),
        ('relation_id', 'relationId'),
        ('search_key', 'searchKey'),
        ('tactics', 'tactics'),
        ('techniques', 'techniques'),
        ('source_info', 'sourceInfo'),
    )

    # Optional item-level fields emitted before and after "eventTimeUTC".
    _ITEM_FIELDS_BEFORE_EVENT_TIME = (
        ('relation_name', 'relationName'),
        ('relation_id', 'relationId'),
        ('search_key', 'searchKey'),
        ('tactics', 'tactics'),
        ('techniques', 'techniques'),
    )
    _ITEM_FIELDS_AFTER_EVENT_TIME = (
        ('observable_type', 'observableType'),
        ('observable_value', 'observableValue'),
    )

    @staticmethod
    def get_new_guid():
        """ Generate a new random GUID (a uuid.UUID object) """
        return uuid.uuid4()

    def __init__(self, summary_id):
        self.summary_id = summary_id

    @staticmethod
    def _optional_field(owner, attr_name, json_name):
        """Return ', "<json_name>": <value>' when the attribute is set and not None, else ''."""
        value = getattr(owner, attr_name, None)
        if value is None:
            return ''
        # json.dumps quotes and escapes the value, so embedded quotes or
        # backslashes can no longer break the generated JSON.
        return ', "' + json_name + '": ' + json.dumps(value, ensure_ascii=False)

    def serialize(self):
        """Return the summary as a comma-separated list of JSON members.

        The result has no enclosing braces; the caller wraps it. Requires
        construct_summary() to have been called first, because the name, tenant
        id and description are read unconditionally. When ``summary_items`` is
        set, the items are emitted under "rawContent" as a JSON *string* that
        itself contains a JSON array.
        """
        serialized_str = ', '.join([
            '"summaryId": ' + json.dumps(self.summary_id, ensure_ascii=False),
            '"summaryName": ' + json.dumps(self.summary_name, ensure_ascii=False),
            '"azureTenantId": ' + json.dumps(self.azure_tenant_id, ensure_ascii=False),
            '"summaryDescription": ' + json.dumps(self.summary_description, ensure_ascii=False),
        ])
        for attr_name, json_name in DynamicSummary._SUMMARY_OPTIONAL_FIELDS:
            serialized_str += DynamicSummary._optional_field(self, attr_name, json_name)

        summary_items = getattr(self, 'summary_items', None)
        if summary_items is not None:
            serialized_str += ', "rawContent": "[' + DynamicSummary.serializeItems(summary_items) + ']"'

        return serialized_str

    @staticmethod
    def serializeItems(items):
        """Serialize the items, comma separated, escaped for embedding inside a JSON string.

        json.dumps() on each item's JSON text produces a quoted, escaped string;
        the surrounding quotes are removed because serialize() supplies them.
        """
        return ','.join(
            json.dumps(DynamicSummary.serializeItem(item))[1:-1] for item in items
        )

    @staticmethod
    def serializeItem(item):
        """Return one summary item as the text of a JSON object.

        ``item.summary_item_id`` must be a uuid.UUID (its ``urn`` is read);
        ``item.packed_content``, when set, must already be JSON text because it
        is inserted verbatim.
        """
        # UUID.urn is "urn:uuid:<id>"; dropping the 9-character prefix leaves the id.
        serialized_item_tsr = '{"summaryItemId": ' + json.dumps(item.summary_item_id.urn[9:])

        for attr_name, json_name in DynamicSummary._ITEM_FIELDS_BEFORE_EVENT_TIME:
            serialized_item_tsr += DynamicSummary._optional_field(item, attr_name, json_name)

        event_time_utc = getattr(item, 'event_time_utc', None)
        if event_time_utc is not None:
            # 'Z' is appended unconditionally, so a naive UTC datetime is expected here.
            serialized_item_tsr += ', "eventTimeUTC" :' + json.dumps(event_time_utc.isoformat() + 'Z')

        for attr_name, json_name in DynamicSummary._ITEM_FIELDS_AFTER_EVENT_TIME:
            serialized_item_tsr += DynamicSummary._optional_field(item, attr_name, json_name)

        packed_content = getattr(item, 'packed_content', None)
        if packed_content is not None:
            serialized_item_tsr += ', "packedContent": ' + packed_content
        serialized_item_tsr += '}'

        return serialized_item_tsr

    def construct_summary(self, tenant_id, summary_name, summary_description, items, \
        relation_name=None, relation_id=None, search_key=None, tactics=None, techniques=None, source_info=None, **kwargs):
        """ Building summary level data object

        Stores the required values on this instance and each optional value that
        is not None. ``items`` becomes ``summary_items``. Extra keyword
        arguments are accepted and ignored.
        """
        self.summary_name = summary_name
        self.azure_tenant_id = tenant_id
        self.summary_description = summary_description
        if relation_name is not None:
            self.relation_name = relation_name
        if relation_id is not None:
            self.relation_id = relation_id
        if search_key is not None:
            self.search_key = search_key
        if tactics is not None:
            self.tactics = tactics
        if techniques is not None:
            self.techniques = techniques
        if source_info is not None:
            self.source_info = source_info
        # Test the parameter itself, not a same-named global from the notebook.
        if items is not None:
            self.summary_items = items

    def construct_summary_item(self, summary_item_id, \
        relation_name=None, relation_id=None, search_key=None, tactics=None, techniques=None, event_time_utc=None, observable_type=None, observable_value=None, packed_content=None, **kwargs):
        """ Building summary item level data object

        Returns a new DynamicSummary instance (sharing this summary's id) that
        carries ``summary_item_id`` plus every optional value that is not None.
        Extra keyword arguments are accepted and ignored.
        """
        item = DynamicSummary(self.summary_id)
        item.summary_item_id = summary_item_id

        optional_values = (
            ('relation_name', relation_name),
            ('relation_id', relation_id),
            ('search_key', search_key),
            ('tactics', tactics),
            ('techniques', techniques),
            ('event_time_utc', event_time_utc),
            ('observable_type', observable_type),
            ('observable_value', observable_value),
            ('packed_content', packed_content),
        )
        for attr_name, value in optional_values:
            if value is not None:
                setattr(item, attr_name, value)

        return item

    @staticmethod
    def construct_arm_rest_url(subscription_id, resource_group, workspace_name, summary_guid):
        "Build URL for Sentinel Dynamic Summaries REST API"
        api_version = "2023-03-01-preview"
        provider_name = "Microsoft.OperationalInsights"
        workspace_provider_name = "Microsoft.SecurityInsights/dynamicSummaries"
        root_url = "https://management.azure.com"
        arm_rest_url_template = "{0}/subscriptions/{1}/resourceGroups/{2}/providers/{3}/workspaces/{4}/providers/{5}/{6}?api-version={7}"
        return arm_rest_url_template.format(root_url, subscription_id, resource_group, provider_name, workspace_name, workspace_provider_name, summary_guid, api_version)

    @staticmethod
    def call_azure_rest_api_for_creating_dynamic_summary(token, arm_rest_url, summary):
        """Calling Microsoft Sentinel REST API

        Issues an HTTP PUT of ``summary`` (the request body text) to
        ``arm_rest_url`` with a bearer ``token`` and returns the response.
        """
        bearer_token = "Bearer " + token
        headers = {"Authorization": bearer_token, "content-type":"application/json" }
        response = requests.put(arm_rest_url, headers=headers, data=summary, verify=True)
        return response

    @staticmethod
    def display_result(response):
        "Display the result set as pandas.DataFrame"
        if response is not None:
            df = pd.DataFrame(response.json()["value"])
            display(df)
        

## 2. Authentication to Azure Storage

In [None]:
# Notebook parameters - fill these in before running the cells below.

# Tenant used for the ClientSecretCredential and recorded on the dynamic summary.
tenant_id = ''
# Subscription used for the StorageManagementClient and in the Dynamic Summaries REST URL.
subscription_id = ''
# First argument of mssparkutils.credentials.getSecret: the Key Vault that holds the secrets.
akv_name = ''
# Third argument of mssparkutils.credentials.getSecret.
akv_link_name = ''
# Resource group passed to storage_accounts.list_keys.
resource_group_name = ''
# Storage account whose keys are listed and whose blob endpoint is scanned.
storage_account_name = ''
# Container whose blobs are listed and scanned.
container_name = 'azureml'
# Names of the Key Vault secrets that hold the client id and client secret.
client_id_name = ''
client_secret_name = ''
# Resource group and workspace used to build the Dynamic Summaries REST URL.
resource_group_name_for_dynamic_summaries = ''
sentinel_workspace_name_for_dynamic_summaries = ''
# Name of the summary; the upload cell is skipped while this is empty.
dynamic_summary_name = ''
# Id given to the DynamicSummary object and used as the last path segment of the REST URL.
dynamic_summary_guid = ''

In [None]:
# Look up the client id and secret by their secret names in the configured Key Vault.
client_id = mssparkutils.credentials.getSecret(akv_name, client_id_name, akv_link_name)
client_secret = mssparkutils.credentials.getSecret(akv_name, client_secret_name, akv_link_name)

# azure-identity credential built from the tenant and the two secrets above.
credential = ClientSecretCredential(tenant_id=tenant_id, client_id=client_id, client_secret=client_secret)

# Adapter form of the same credential, used by the storage management client.
cred = AzureIdentityCredentialAdapter(credential)

# Raw bearer token for the management endpoint, used by the REST call at the end.
access_token = credential.get_token("https://management.azure.com/.default")
token = access_token.token

## 3. Scan Azure Blob for Leaking Credentials

In [None]:
import warnings

storage_client = StorageManagementClient(cred, subscription_id=subscription_id)

# Only blobs modified fewer than this many days ago are downloaded and scanned.
days_back = 500
# Created before any Azure call so the cells that follow can always test it,
# even when no storage keys, client or container were obtained.
result_objects = []
# The pattern list does not depend on the blob, so it is built once.
regex_list = get_regex_list()

try:
    storage_keys = storage_client.storage_accounts.list_keys(resource_group_name, storage_account_name)
    if storage_keys is not None:
        # Pick the account key named 'key1' from the returned key list.
        storage_key = {v.key_name: v.value for v in storage_keys.keys}['key1']

        blob_service_client = BlobServiceClient(
            account_url="https://{0}.blob.core.windows.net".format(storage_account_name),
            credential=storage_key
        )

        if blob_service_client is not None:
            container_client = blob_service_client.get_container_client(container_name)
            if container_client is not None:
                blob_list = container_client.list_blobs()
                # Python warnings are switched off from here on (the filter is
                # global and stays in effect after this cell).
                warnings.filterwarnings('ignore')

                for indexblob, b in enumerate(blob_list):
                    print("Blob name: " + b.name)

                    # A failure on one blob is printed and must not stop the scan of the others.
                    try:
                        if not file_modified_date_check(days_back, b.last_modified):
                            continue

                        blob = container_client.download_blob(b)
                        content = get_file_content(blob)
                        if content is None:
                            continue

                        # Run Regex strings on the file content
                        has_leaking = False
                        for indexregex, regex in enumerate(regex_list):
                            results = re.findall(regex, content)
                            if not results:
                                continue

                            print("================================================")
                            print("MATCHED REGEX:\n" + regex)
                            print("---------------MATCHED CONTENT -----------------")
                            matched_contents = []
                            for result in results:
                                print(str(result))
                                matched_contents.append(convert_result_to_string(result))
                            print("================================================")
                            has_leaking = True

                            # One result dictionary per (blob, regex) pair that matched.
                            result_object = file_scan_result(b.name, b.last_modified)
                            result_object.add_result("blob" + str(indexblob) + "-regex" + str(indexregex), matched_contents)
                            result_objects.append(result_object.results)

                        if not has_leaking:
                            print('No leaking data found')
                    except Exception as e:
                        print(e)

                print("Printing to check how it will look like")
                print(result_encoder().encode(result_objects))
                scan_data = json.dumps(result_objects, indent=4, cls=result_encoder)
                print(scan_data)
        else:
            print("failed on blob service client")
except Exception as ex:
    # Give a readable hint for the most common failure, then let the error surface.
    if "AuthorizationFailed" in str(ex):
        print("========================================================================")
        print("Error: Service principal has no sufficient permission to perform tasks.")
        print("========================================================================")

    raise
    


## 4. Save result to Microsoft Sentinel Dynamic Summaries

In [None]:
# A summary is only built when a summary name has been configured.
if dynamic_summary_name is not None and dynamic_summary_name != '':
    summary = DynamicSummary(dynamic_summary_guid)
    summary_description = "This summary is generated from notebook - AutomationGallery-CredentialScanOnAzureBlobStorage."
    summary_items = []

    # Nested under the name check: `summary` and `summary_items` exist only in
    # this branch, so using them outside it raised NameError for an empty name.
    if result_objects:
        for res_obj in result_objects:
            # Each result dictionary maps one "blobN-regexM" key to its list of matches.
            res_df = pd.DataFrame.from_dict(res_obj)

            if not res_df.empty:
                # One summary item per matched string, carrying the row as JSON text.
                for index, row in res_df.iterrows():
                    packed_content = row.to_json()
                    summary_items.append(
                        summary.construct_summary_item(
                            DynamicSummary.get_new_guid(),
                            event_time_utc=datetime.utcnow(),
                            packed_content=packed_content,
                        )
                    )

        summary.construct_summary(tenant_id, dynamic_summary_name, summary_description, summary_items)
        summary_json = "{ \"properties\": {" +  summary.serialize() + "}}"

        #print(summary_json)

In [None]:
# Upload only when the scan produced results and a summary name is configured.
if result_objects and dynamic_summary_name not in (None, ''):
    dyn_sum_api_url = DynamicSummary.construct_arm_rest_url(
        subscription_id,
        resource_group_name_for_dynamic_summaries,
        sentinel_workspace_name_for_dynamic_summaries,
        dynamic_summary_guid,
    )
    response = DynamicSummary.call_azure_rest_api_for_creating_dynamic_summary(
        token, dyn_sum_api_url, summary_json
    )

    # The HTTP status code is the only feedback shown for the upload.
    print(response.status_code)