in Solutions/VMware SD-WAN and SASE/Data Connectors/Function App Connector/vmw_sdwan_sase_funcapp/sdwan_efslogs/__init__.py [0:0]
def EventCompiler(j_rawevent, event_type, metadata=None):
    """Compile raw Search API events and forward the new ones for ingestion.

    Only ``event_type == "efs_fwlog"`` is handled. For that type the function:

    1. Skips the upload when the response reports ``count == 0`` and returns
       a placeholder result instead.
    2. Copies a fixed set of fields from every entry of ``j_rawevent["data"]``
       and drops events already recorded in
       ``g_state["services"]["efs"]["idps_events"]`` (an event is a duplicate
       when timestamp, sessionId and signatureId all match).
    3. Passes each page of new events to ``callLogAnalyticsAPI``.
    4. Follows pagination while ``j_rawevent["metaData"]["more"]`` is not
       ``False``, requesting the next page from the Search API.
    5. Uploads ``g_state`` to ``function_state/state.json`` on the file share
       and calls ``veco_fwlog_delayadjust()``.

    Args:
        j_rawevent: Parsed Search API response. The keys read here are
            ``count``, ``data`` (entries with a ``_source`` dict) and
            ``metaData.more``.
        event_type: Selector for the processing logic; keeps the changes
            needed for a new event type compact.
        metadata: Optional dict with extra data for the given event type.
            For ``efs_fwlog`` it is only read when another page has to be
            fetched, via ``metadata["event_compiler"]["query_start"]`` and
            ``["query_end"]``.

    Returns:
        For ``efs_fwlog``: the placeholder list when the response is empty;
        the value returned by ``callLogAnalyticsAPI`` for a single-page
        response (or for the last processed page when a follow-up request
        fails); a list with one ``callLogAnalyticsAPI`` result per page when
        several pages were processed successfully.
        NOTE(review): the original indentation was lost; this rewrite assumes
        other event types fall through without a return value (``None``).

    Raises:
        KeyError: If an expected key is missing from the response, the
            configuration, the state or ``metadata``.
    """
    # A fresh dict per call instead of a shared mutable default argument.
    if metadata is None:
        metadata = {}
    j_processed_events = []
    if event_type == "efs_fwlog":
        j_multipage_processed = []
        if j_rawevent["count"] == 0:
            logging.warning("FUNCTION-EVENTCOMPILER: Search API call resulted in empty response, nothing to report...")
            j_processed_events = [{
                "events": "None",
                "metadata": {
                    "loganalytics_status": "000",
                    "loganalytics_dce": "N/A",
                    "loganalytics_table": "N/A"
                }
            }]
            j_delay = veco_fwlog_delayadjust()
            if j_delay["metadata"]["veco_searchapi_result"] == "000":
                logging.warning("FUNCTION-EVENTCOMPILER: Delay measurement failed.")
            return j_processed_events
        else:
            # Offset and size of the next page to request.
            # presumably the caller fetched the first page with from=0, size=100 - verify against caller
            mp_from = 100
            mp_size = 100
            # Must be initialised once, outside the loop: resetting it per
            # iteration made the last page of a multi-page response look like
            # a single-page response and discarded the aggregated results.
            multi_page_result = False
            while True:
                # This is the array that will contain the final formatting
                j_array_processing = []
                # Event counter for statistics
                events = 0
                # All reads/appends below go to the same list object held in g_state.
                known_events = g_state["services"]["efs"]["idps_events"]
                # Extract events from the response
                for log_item in j_rawevent["data"]:
                    j_log_event = _extract_efs_fwlog_event(log_item)
                    # Deal with duplicate events.
                    # An event is a duplicate of an event in the state if the
                    # session ID, the signature ID and the timestamp all match.
                    state_event = {
                        "timestamp": j_log_event["timestamp"],
                        "sessionId": j_log_event["sessionId"],
                        "signatureId": j_log_event["signatureId"]
                    }
                    if known_events == []:
                        # State is missing the last event upload
                        logging.warning("No IDPS Event found in state config, assuming this is the first event...")
                        known_events.append(state_event)
                        logging.warning("State file contents: " + json.dumps(g_state["services"]["efs"]))
                        j_array_processing.append(j_log_event)
                    else:
                        # Check if the current event is a duplicate of an already processed event
                        new_event = True
                        for j_eventsummary in known_events:
                            logging.warning("=== Iteration ===")
                            logging.warning("State file data: " + j_eventsummary["timestamp"] + " " + str(j_eventsummary["sessionId"]) + " " + str(j_eventsummary["signatureId"]))
                            logging.warning("API data: " + j_log_event["timestamp"] + " " + str(j_log_event["sessionId"]) + " " + str(j_log_event["signatureId"]))
                            logging.warning("=== Iteration ===")
                            if ((j_eventsummary["timestamp"] == j_log_event["timestamp"]) and (j_eventsummary["sessionId"] == j_log_event["sessionId"]) and (j_eventsummary["signatureId"] == j_log_event["signatureId"])):
                                logging.warning("Duplicate Event Detected.")
                                new_event = False
                                # One match is enough; no need to scan the rest of the state.
                                break
                        if new_event is True:
                            logging.warning("New event found, adding to the state file.")
                            # j_eventsummary is the last state entry that was compared.
                            logging.warning("State file data: " + j_eventsummary["timestamp"] + " " + str(j_eventsummary["sessionId"]) + " " + str(j_eventsummary["signatureId"]))
                            logging.warning("API data: " + j_log_event["timestamp"] + " " + str(j_log_event["sessionId"]) + " " + str(j_log_event["signatureId"]))
                            known_events.append(state_event)
                            j_array_processing.append(j_log_event)
                    # NOTE(review): counts every raw event on the page, duplicates included
                    # (placement inferred, the original indentation was lost).
                    events += 1
                logging.info("FUNCTION-EVENTCOMPILER: Extracted " + str(events) + " events, sending it over for processing")
                j_processed_events = callLogAnalyticsAPI(j_array_processing, j_config_list["logingestion_api"]["dce"], j_config_list["logingestion_api"]["sdwan"]["efs"]["imi"], j_config_list["logingestion_api"]["sdwan"]["efs"]["stream"])
                logging.info("FUNCTION-EVENTCOMPILER: Searching for multi-page response...")
                if j_rawevent["metaData"]["more"] != False:
                    # Multi-page response found: request the next page and process it
                    # in the next loop iteration.
                    logging.info("FUNCTION-EVENTCOMPILER: API Metadata indicates multipage answer, iterating through additional entries")
                    query_start = str(metadata["event_compiler"]["query_start"])
                    query_end = str(metadata["event_compiler"]["query_end"])
                    # mp_size is an int and has to be converted before concatenation.
                    nextpage_params = "/edgeFirewall?from=" + str(mp_from) + "&size=" + str(mp_size) + "&startTime=" + str(query_start) + "&endTime=" + str(query_end) + "Z&IDS_ALERT={\"IS\":1}"
                    header = {
                        "Authorization": j_config_list["token"]
                    }
                    query = craftAPIurl(j_config_list["host"], "/api/cws/v1/enterprises/", j_config_list["token"], True, nextpage_params)
                    logging.info("FUNCTION-EVENTCOMPILER: API call to: " + query)
                    nextpage_response = requests.get(url=query, headers=header)
                    if nextpage_response.status_code != 200:
                        # If the API call fails, skip next steps. The result of the
                        # page processed last is what gets returned.
                        logging.error("FUNCTION-EVENTCOMPILER: Unexpected error when sending API call")
                        break
                    else:
                        # If the API call succeeds, do two things:
                        # 1. Add events we have sent to the event processing from this page to a larger reporting array
                        j_multipage_processed.append(j_processed_events)
                        # 2. Reset input event list to new page, start processing again
                        logging.info("FUNCTION-EVENTCOMPILER: Next page of the results loaded, starting processing...")
                        multi_page_result = True
                        j_rawevent = nextpage_response.json()
                        mp_from = mp_from + mp_size
                else:
                    # No further pages: either the response was single paged or
                    # this was the last page of a multi-page response.
                    if multi_page_result == False:
                        logging.info("FUNCTION-EVENTCOMPILER: Single-page response, processing complete.")
                        break
                    else:
                        logging.info("FUNCTION-EVENTCOMPILER: Last page reached, stopping the recursive processing.")
                        j_multipage_processed.append(j_processed_events)
                        j_processed_events = j_multipage_processed
                        break
            # Persist the updated state so later runs can detect duplicates.
            statefile = ShareFileClient.from_connection_string(conn_str=os.environ["azsa_share_connectionstring"], share_name=os.environ["azsa_share_name"], file_path="function_state/state.json")
            try:
                statefile.upload_file(data=json.dumps(g_state))
            finally:
                # Release the client even when the upload raises.
                statefile.close()
            j_delay = veco_fwlog_delayadjust()
            if j_delay["metadata"]["veco_searchapi_result"] == "000":
                logging.warning("FUNCTION-EVENTCOMPILER: Delay measurement failed.")
            return j_processed_events


# Names of the "_source" fields copied verbatim into every uploaded record.
# The order is the key order of the resulting dict.
_EFS_FWLOG_FIELDS = (
    "sessionId",
    "firewallPolicyName",
    "segmentLogicalId",
    "domainName",
    "category",
    "attackTarget",
    "severity",
    "destination",
    "signature",
    "edgeLogicalId",
    "closeReason",
    "ipsAlert",
    "@timestamp",
    "bytesReceived",
    "signatureId",
    "@version",
    "destinationIp",
    "sourcePort",
    "application",
    "bytesSent",
    "destinationPort",
    "idsAlert",
    "enterpriseLogicalId",
    "inputInterface",
    "segmentName",
    "protocol",
    "actionTaken",
    "verdict",
    "edgeName",
    "ruleId",
    "extensionHeader",
    "logType",
    "ruleVersion",
    "attackSource",
    "timestamp",
    "sessionDurationSecs",
    "sourceIp",
)


def _extract_efs_fwlog_event(log_item):
    """Return the record built from ``log_item["_source"]``.

    Every name in ``_EFS_FWLOG_FIELDS`` is required; a missing field raises
    ``KeyError``.
    """
    source = log_item["_source"]
    return {field: source[field] for field in _EFS_FWLOG_FIELDS}