elkserver/docker/redelk-base/redelkinstalldata/scripts/modules/alarm_filehash/ioc_ibm.py (79 lines of code) (raw):

#!/usr/bin/python3 # -*- coding: utf-8 -*- """ Part of RedELK This check queries IBM X-Force API given a list of md5 hashes. Authors: - Outflank B.V. / Mark Bergman (@xychix) - Lorenzo Bernardi (@fastlorenzo) """ import logging from datetime import datetime import requests from modules.helpers import get_value # Rate limiting: # Free Tier (Non-Commercial Use Only): The free tier allows usage of up to 5,000 records per month # Commercial API - Paid Tier (Commercial Use): Usage is priced by the number of data records that you access, which are sold in packs of 10,000 records per month class IBM: """This check queries IBM X-Force API given a list of md5 hashes.""" def __init__(self, basic_auth): # self.report = {} # self.report['source'] = 'IBM X-Force' self.logger = logging.getLogger("alarm_filehash.ioc_ibm") self.basic_auth = basic_auth def get_remaining_quota(self): """Returns the number of hashes that could be queried within this run""" url = "https://api.xforce.ibmcloud.com/all-subscriptions/usage" headers = {"Accept": "application/json", "Authorization": self.basic_auth} # Get the quotas, if response code != 200, return 0 so we don't query further response = requests.get(url, headers=headers) if response.status_code == 200: json_response = response.json() else: self.logger.warning( "Error retrieving IBM X-Force Quota (HTTP Status code: %d)", response.status_code, ) return 0 remaining_quota = 0 # Extract the hourly, daily and monthly remaining quotas for result in json_response: # Only take the relevant results (usageData for 'api' type) if ( "subscriptionType" in result and result["subscriptionType"] == "api" and "usageData" in result ): # Get the monthly quota (limit) entitlement = get_value("usageData.entitlement", result, 0) remaining_quota += int(entitlement) # Get the usage array (per cycle) usage = get_value("usageData.usage", result, []) # Find the current cycle and remove the current usage from that cycle from the remaining quota for usage_cycle in usage: cycle = get_value("cycle", usage_cycle, 0) if cycle == datetime.now().strftime("%Y-%m"): current_usage = get_value("usage", usage_cycle, 0) remaining_quota -= int(current_usage) self.logger.debug("Remaining quota (monthly): %d", remaining_quota) return remaining_quota def get_ibm_xforce_file_results(self, file_hash): """Queries VT API with file hash and returns the results or None if error / nothing found""" url = f"https://api.xforce.ibmcloud.com/malware/{file_hash}" headers = {"Authorization": self.basic_auth} # Get the quotas, if response code != 200, return 0 so we don't query further response = requests.get(url, headers=headers) if response.status_code == 200: # Hash found json_response = response.json() elif response.status_code == 404: # Hash not found json_response = None else: # Unexpected result self.logger.warning( "Error retrieving IBM X-Force File hash results (HTTP Status code: %d): %s", response.status_code, response.text, ) # json_response = response.text json_response = None return json_response def test(self, hash_list): """run the query and build the report (results)""" self.logger.debug("Checking IOCs on IBM X-Force: %s", hash_list) # Get the remaining quota for this run remaining_quota = self.get_remaining_quota() ibm_results = {} # Query VT API for file hashes count = 0 for md5 in hash_list: if count < remaining_quota: # Within quota, let's check the file hash with VT ibm_result = self.get_ibm_xforce_file_results(md5) if ibm_result is not None: if isinstance(ibm_result, type({})) and "malware" in ibm_result: # Get first submission date first_submitted_date = get_value( "malware.created", ibm_results, None ) # Found and marked as malware ibm_results[md5] = { "record": ibm_result, "result": "newAlarm", "first_submitted": first_submitted_date, } else: ibm_results[md5] = {"result": "clean"} else: # 404 not found ibm_results[md5] = {"result": "clean"} else: # Quota reached, skip the check ibm_results[md5] = {"result": "skipped, quota reached"} count += 1 return ibm_results