in Solutions/VMware SD-WAN and SASE/Data Connectors/Function App Connector/vmw_sdwan_sase_funcapp/sdwan_efslogs/__init__.py [0:0]
def initialize(event_type=""):
    """Collect the Function App settings and build the runtime configuration.

    Reads the VECO API settings, the Log Analytics ingestion settings (DCE,
    DCR immutable IDs, stream names) and the run frequency from environment
    variables, resets the module-level ``g_state`` dictionary and, for EFS
    runs, synchronises it with ``function_state/state.json`` in the Azure
    file share named by ``azsa_share_name``.

    Args:
        event_type: When equal to ``"efs"`` the persisted state file is
            created or read so that the previously observed firewall-log
            delay and the last logged IDPS events can be restored. Any other
            value skips the storage account entirely.

    Returns:
        dict: The configuration (also published as the module-level
        ``j_config_list``). On success it holds ``host``, ``token``,
        ``logingestion_api``, ``frequency_sec`` and ``frequency_msec``.
        If one of the validated settings is empty it is
        ``{"error": "missing app settings parameter"}`` instead.

    Raises:
        KeyError: If an expected environment variable is not defined at all.
        ValueError: If ``app_frequency_mins`` cannot be converted to ``int``.
    """
    global g_state, j_config_list

    # VECO settings
    host = os.environ["api_veco_fqdn"]
    token = os.environ["api_veco_authorization"]
    # Log Analytics settings for writing data
    dce = os.environ["dce_endpoint"]
    dcr_cwshealth_immutableid = os.environ["dcr_cwshealth_immutableid"]
    dcr_cwsweblog_immutableid = os.environ["dcr_cwsweblog_immutableid"]
    dcr_cwsdlplog_immutableid = os.environ["dcr_cwsdlplog_immutableid"]
    dcr_efsfwlog_immutableid = os.environ["dcr_efsfwlog_immutableid"]
    dcr_efshealth_immutableid = os.environ["dcr_efshealth_immutableid"]
    # Audit - future use
    dcr_auditlog_immutableid = os.environ["dcr_saseaudit_immutableid"]
    cwshealth_stream = os.environ["stream_cwshealth"]
    weblog_stream = os.environ["stream_cwsweblog"]
    dlplog_stream = os.environ["stream_cwsdlplog"]
    efsfwlog_stream = os.environ["stream_efsfwlog"]
    efshealth_stream = os.environ["stream_efshealth"]
    # Audit - future use
    auditlog_stream = os.environ["stream_saseaudit"]
    # Function App frequency in minutes, converted to seconds and milliseconds
    frequency = os.environ["app_frequency_mins"]
    frequency_sec = int(frequency) * 60
    frequency_msec = frequency_sec * 1000

    # Script state --> contains state information from previous script runs.
    # It always starts from these defaults and is filled in from the state
    # file further down when one exists.
    g_state = {
        "services": {
            "efs": {
                "delay_value": 0,
                "delay_unit": "msec",
                "update_timestamp": "",
                # Keeps track of the last IDPS events collected by the script
                "idps_events": [],
            }
        }
    }

    # Validate that none of the mandatory settings are empty. os.environ only
    # ever yields strings, so an "is None" test could never fail; check for
    # empty values instead, and do it before touching the storage account so
    # a broken configuration does not modify the persisted state.
    required_settings = (
        host,
        token,
        dce,
        dcr_cwshealth_immutableid,
        dcr_cwsweblog_immutableid,
        weblog_stream,
        cwshealth_stream,
        dcr_cwsdlplog_immutableid,
        dlplog_stream,
        efsfwlog_stream,
        dcr_efsfwlog_immutableid,
    )
    if not all(required_settings):
        logging.error("FUNCTION-INIT: Missing parameter, function stopped. Please check the Application settings tab in your Function App configuration")
        j_config_list = {"error": "missing app settings parameter"}
        return j_config_list

    # Storage Account connectivity
    # Extra steps for EFS:
    # FW logs have delays beyond 5-10 mins, and if this happens, we need to
    # deal with it to avoid event loss.
    # 1. Check if the state config exists in the storage account
    # 2a. If not, write a JSON entry that sets the delay to 0
    # 2b. If it exists, read the delay entry so that queries can be adjusted
    efs_delay = 0
    if event_type == "efs":
        conn_str = os.environ["azsa_share_connectionstring"]
        share_name = os.environ["azsa_share_name"]
        efs_state = g_state["services"]["efs"]

        logging.warning("FUNCTION-INIT: Verifying state library presence...")
        statedir = ShareDirectoryClient.from_connection_string(
            conn_str=conn_str,
            share_name=share_name,
            directory_path="function_state",
        )
        try:
            state_dir_found = statedir.exists()
            if state_dir_found:
                logging.info("FUNCTION-INIT: State Directory found, skip directory creation...")
                logging.warning("FUNCTION-INIT: Searching for existing state conditions...")
                filelist = list(statedir.list_directories_and_files())
            else:
                logging.info("FUNCTION-INIT: State Directory is not found, creating new one...")
                statedir.create_directory()
                filelist = []
        finally:
            # Release the directory client even if a storage call failed.
            statedir.close()

        # The state file lives in the same share as the directory above; the
        # share name must come from the app settings, not a literal string.
        statefile = ShareFileClient.from_connection_string(
            conn_str=conn_str,
            share_name=share_name,
            file_path="function_state/state.json",
        )
        try:
            if state_dir_found and filelist:
                raw_state = statefile.download_file()
                j_state = json.loads(raw_state.readall())
                saved_efs = j_state["services"]["efs"]
                # Now that the JSON is held in a variable, zeroize the event
                # list in the file again.
                # NOTE(review): g_state still holds its defaults here, so this
                # upload also stores delay_value 0 and an empty timestamp;
                # presumably a later step persists the real values - confirm.
                efs_state["idps_events"] = []
                statefile.upload_file(data=json.dumps(g_state))
                if saved_efs["delay_value"] is not None:
                    efs_delay = saved_efs["delay_value"]
                    # Carry the previously logged events over into g_state.
                    for j_state_event in saved_efs["idps_events"]:
                        logging.info("Found last logged event with the following details: %s", json.dumps(j_state_event))
                        efs_state["idps_events"].append(j_state_event)
                    logging.info("FUNCTION-INIT: EFS delay found in state file, current delay observed is %s%s", efs_delay, saved_efs["delay_unit"])
            else:
                if state_dir_found:
                    logging.info("FUNCTION-INIT: No state configuration found, the script will assume first run, assuming delay of zero ...")
                else:
                    logging.info("FUNCTION-INIT: Writing initial state configuration, assuming delay of zero ...")
                efs_state["delay_value"] = 0
                efs_state["update_timestamp"] = str(datetime.datetime.utcnow())
                statefile.upload_file(data=json.dumps(g_state))
        finally:
            # Close the file client on every path, including first run.
            statefile.close()

    j_config_list = {
        "host": host,
        "token": token,
        "logingestion_api": {
            "dce": dce,
            # Flat stream/immutable-id map alongside the per-service layout
            # below; both carry the same values.
            "streams": {
                "cws_health": cwshealth_stream,
                "cws_health_imi": dcr_cwshealth_immutableid,
                "cws_weblog": weblog_stream,
                "cws_weblog_imi": dcr_cwsweblog_immutableid,
                "cws_dlplog": dlplog_stream,
                "cws_dlplog_imi": dcr_cwsdlplog_immutableid,
                "efs_fwlog": efsfwlog_stream,
                "efs_fwlog_imi": dcr_efsfwlog_immutableid,
            },
            "cws": {
                "health": {
                    "stream": cwshealth_stream,
                    "imi": dcr_cwshealth_immutableid,
                },
                "web": {
                    "stream": weblog_stream,
                    "imi": dcr_cwsweblog_immutableid,
                },
                "dlp": {
                    "stream": dlplog_stream,
                    "imi": dcr_cwsdlplog_immutableid,
                },
            },
            "sdwan": {
                "efs": {
                    "stream": efsfwlog_stream,
                    "imi": dcr_efsfwlog_immutableid,
                    "delay": efs_delay,
                },
                "efs_health": {
                    "stream": efshealth_stream,
                    "imi": dcr_efshealth_immutableid,
                },
                "audit": {
                    "stream": auditlog_stream,
                    "imi": dcr_auditlog_immutableid,
                },
            },
        },
        "frequency_sec": frequency_sec,
        "frequency_msec": frequency_msec,
    }
    logging.info("FUNCTION-INIT: All variables set, initialization complete")
    return j_config_list