in DataConnectors/AWS-CloudTrail-AzureFunction/AzFunAWSCloudTrailLogsIngestion/__init__.py [0:0]
# Core CloudTrail fields copied into the "_ALL" table and into the ec2 "_Header"
# records. requestParameters/responseElements are deliberately excluded here:
# for ec2 they are shipped to dedicated "_Request"/"_Response" tables instead.
_CORE_EVENT_FIELDS = frozenset({
    'eventTime', 'eventVersion', 'userIdentity', 'eventSource', 'eventName',
    'awsRegion', 'sourceIPAddress', 'userAgent', 'errorCode', 'errorMessage',
    'requestID', 'eventID', 'eventType', 'apiVersion', 'managementEvent',
    'readOnly', 'resources', 'recipientAccountId', 'serviceEventDetails',
    'sharedEventID', 'vpcEndpointId', 'eventCategory', 'additionalEventData',
})


def _add_to_group(group_events, event_sources, key, item):
    """Append *item* to ``group_events[key]``, registering *key* in
    *event_sources* (in first-seen order) the first time it appears."""
    if key not in group_events:
        group_events[key] = []
        event_sources.append(key)
    group_events[key].append(item)


def _ec2_payload(log, payload_field):
    """Build the slim per-request/per-response record for an ec2 event:
    the correlation fields plus the single *payload_field* column.

    Uses ``dict.get`` so an event missing one of these keys yields ``None``
    instead of aborting the whole ingestion run with a KeyError.
    """
    return {
        'eventID': log.get('eventID'),
        'awsRegion': log.get('awsRegion'),
        'requestID': log.get('requestID'),
        'eventTime': log.get('eventTime'),
        payload_field: log.get(payload_field),
    }


def _send_events(events, log_type):
    """Send *events* to the Log Analytics table *log_type* through a single
    batching connector (one connector per table, not per event).

    Returns a ``(failed, succeeded)`` tuple of event counts reported by the
    connector after it has flushed on context exit.
    """
    sentinel = AzureSentinelConnector(
        logAnalyticsUri, sentinel_customer_id, sentinel_shared_key,
        log_type, queue_size=10000, bulks_number=10)
    with sentinel:
        for event in events:
            sentinel.send(event)
    return sentinel.failed_sent_events_number, sentinel.successfull_sent_events_number


def main(mytimer: func.TimerRequest) -> None:
    """Timer-triggered entry point.

    Pulls CloudTrail log files modified in the current time window from S3,
    groups events by AWS service (with ec2 further split into Header /
    Request / Response record sets), and ships them to Azure Sentinel
    according to the ``isCoreFieldsAllTable`` / ``isSplitAWSResourceTypes``
    configuration flags.
    """
    if mytimer.past_due:
        logging.info('The timer is past due!')
    logging.info('Starting program')

    cli = S3Client(aws_access_key_id, aws_secret_acces_key, aws_region_name, aws_s3_bucket)
    ts_from, ts_to = cli.get_time_interval()
    print("From:{0}".format(ts_from))
    print("To:{0}".format(ts_to))
    logging.info('Searching files last modified from {} to {}'.format(ts_from, ts_to))
    obj_list = cli.get_files_list(ts_from, ts_to)
    logging.info('Total number of files is {}. Total size is {} MB'.format(
        len(obj_list),
        round(sum(x['Size'] for x in obj_list) / 10**6, 2)
    ))

    failed_sent_events_number = 0
    successfull_sent_events_number = 0
    groupEvents = {}      # table-name suffix -> list of event records
    coreEvents = []       # core-field subset of every event, for the "_ALL" table
    eventSources = []     # table-name suffixes in first-seen order

    for obj in obj_list:
        for log in cli.process_obj(obj):
            # 'ec2.amazonaws.com' -> 'ec2'; dashes stripped so the suffix is a
            # valid Log Analytics table-name component.
            logEventSource = log['eventSource'].split('.')[0].replace('-', '')
            if logEventSource == 'ec2':
                # ec2 events fan out into three tables: a header with the core
                # fields, plus slim request-parameter and response-element rows.
                header = {col: log[col] for col in _CORE_EVENT_FIELDS if col in log}
                _add_to_group(groupEvents, eventSources, logEventSource + '_Header', header)
                _add_to_group(groupEvents, eventSources, logEventSource + '_Request',
                              _ec2_payload(log, 'requestParameters'))
                _add_to_group(groupEvents, eventSources, logEventSource + '_Response',
                              _ec2_payload(log, 'responseElements'))
            else:
                # Any other service keeps the full event in its own table.
                _add_to_group(groupEvents, eventSources, logEventSource, log)
            # Every event (ec2 included) also contributes its core-field
            # subset to the combined "_ALL" collection.
            coreEvents.append({col: log[col] for col in _CORE_EVENT_FIELDS if col in log})

    # The two destinations are independent toggles; both "false" means
    # nothing is sent (matching the historical behavior). Send counts are
    # accumulated for BOTH paths so the summary log lines below are accurate.
    if isCoreFieldsAllTable == "true":
        failed, sent = _send_events(coreEvents, sentinel_log_type + '_ALL')
        failed_sent_events_number += failed
        successfull_sent_events_number += sent
    if isSplitAWSResourceTypes == "true":
        for resource_type in eventSources:
            failed, sent = _send_events(groupEvents[resource_type],
                                        sentinel_log_type + '_' + resource_type)
            failed_sent_events_number += failed
            successfull_sent_events_number += sent

    if failed_sent_events_number:
        logging.info('{} events have not been sent'.format(failed_sent_events_number))
    if successfull_sent_events_number:
        logging.info('Program finished. {} events have been sent.'.format(successfull_sent_events_number))
    if successfull_sent_events_number == 0 and failed_sent_events_number == 0:
        logging.info('No Fresh CloudTrail Events')