elkserver/docker/redelk-base/redelkinstalldata/scripts/modules/alarm_filehash/ioc_hybridanalysis.py (97 lines of code) (raw):

#!/usr/bin/python3 # -*- coding: utf-8 -*- """ Part of RedELK This check queries Hybrid Analysis API given a list of md5 hashes. Authors: - Outflank B.V. / Mark Bergman (@xychix) - Lorenzo Bernardi (@fastlorenzo) """ import logging from datetime import datetime import json from dateutil import parser import requests from modules.helpers import get_value from modules.helpers import is_json # The Public API is limited to 2000 requests per hour and a rate of 200 requests per minute. class HA: """This check queries Hybrid Analysis API given a list of md5 hashes.""" def __init__(self, api_key): self.report = {} self.report["source"] = "Hybrid Analysis" self.logger = logging.getLogger("ioc_hybridanalysis") self.api_key = api_key def get_remaining_quota(self): """Returns the number of hashes that could be queried within this run""" url = "https://www.hybrid-analysis.com/api/v2/key/current" headers = { "Accept": "application/json", "User-Agent": "RedELK", "api-key": self.api_key, } # Get the quotas, if response code != 200, return 0 so we don't query further response = requests.get(url, headers=headers) if response.status_code != 200: self.logger.warning( "Error retrieving Hybrid Analysis Quota (HTTP Status code: %d)", response.status_code, ) return 0 api_limits_json = response.headers.get("api-limits") api_limits = json.loads(api_limits_json) # First check if the limit has been reached limit_reached = get_value("limit_reached", api_limits, False) if limit_reached: return 0 # Extract the limits and usage limits_minute = get_value("limits.minute", api_limits, 0) limits_hour = get_value("limits.hour", api_limits, 0) used_minute = get_value("used.minute", api_limits, 0) used_hour = get_value("used.hour", api_limits, 0) remaining_minute = limits_minute - used_minute remaining_hour = limits_hour - used_hour self.logger.debug( "Remaining quotas: hour(%d) / minute(%d)", remaining_hour, remaining_minute ) # Return the remaining quota per minute return remaining_minute def get_ha_file_results(self, filehash): """Queries Hybrid Analysis API with file hash and returns the results or None if error / nothing found""" url = "https://www.hybrid-analysis.com/api/v2/search/hash" headers = { "Accept": "application/json", "api-key": self.api_key, "User-Agent": "RedELK", "Content-Type": "application/x-www-form-urlencoded", } payload = f"hash={filehash}" # Search for the file hash response = requests.post(url, headers=headers, data=payload) if response.status_code == 200: # Hash found json_response = response.json() else: # Unexpected result self.logger.warning( "Error retrieving VT File hash results (HTTP Status code: %d): %s", response.status_code, response.text, ) # json_response = response.text json_response = [] # see lione 106 checking for len 0. return json_response def test(self, hash_list): """run the query and build the report (results)""" # Get the remaining quota for this run remaining_quota = self.get_remaining_quota() ha_results = {} # Query HA API for file hashes count = 0 for md5 in hash_list: if count < remaining_quota: # Within quota, let's check the file hash with HA ha_result = self.get_ha_file_results(md5) # No results, let's return it clean if len(ha_result) == 0: ha_results[md5] = {"result": "clean"} elif is_json(ha_result): # Loop through the results to get the first analysis (submission) date first_analysis_time = datetime.utcnow() for result in ha_result: analysis_start_time = get_value( "analysis_start_time", result, None ) if analysis_start_time is not None: analysis_start_time_date = parser.isoparse( analysis_start_time ).replace(tzinfo=None) first_analysis_time = ( first_analysis_time if first_analysis_time < analysis_start_time_date else analysis_start_time_date ) # Found ha_results[md5] = { "record": ha_result, "result": "newAlarm", "first_submitted": first_analysis_time.isoformat(), # TO-DO: loop through the submissions to get the time 'last_seen' } else: # some horrible error # implement logging here continue else: # Quota reached, skip the check ha_results[md5] = {"result": "skipped, quota reached"} count += 1 return ha_results