in source/log_parser/log-parser.py [0:0]
def execute_athena_query(log, log_type, event):
    """Build the Athena query for ``log_type`` and start its execution.

    Args:
        log: Logger-like object; its ``debug`` and ``info`` methods are used.
        log_type: ``'CLOUDFRONT'`` or ``'ALB'`` selects the application
            access-log query; any other value selects the WAF-log query.
        event: Mapping read for the keys ``accessLogBucket``,
            ``glueAccessLogsDatabase``, ``athenaWorkGroup`` and, depending
            on the branch, ``glueAppAccessLogsTable`` or
            ``glueWafAccessLogsTable``.

    Environment:
        ``WAF_BLOCK_PERIOD`` is always read; ``ERROR_THRESHOLD`` is read for
        application access logs and ``REQUEST_THRESHOLD`` for WAF logs.
        Each value is converted with ``int``.

    Returns:
        None. The response of ``start_query_execution`` is only logged.
    """
    log.debug('[execute_athena_query] Start')

    athena_client = create_client('athena')
    # Query results are written under a fixed prefix of the access-log bucket.
    output_location = "s3://%s/athena_results/" % event['accessLogBucket']
    database_name = event['glueAccessLogsDatabase']

    # The query string is built dynamically (using partitions); the builder
    # depends on which kind of log is being analysed.
    if log_type in ('CLOUDFRONT', 'ALB'):
        # CloudFront or ALB logs: application access-log table.
        table_name = event['glueAppAccessLogsTable']
        query_time = datetime.datetime.utcnow()
        block_period = int(environ['WAF_BLOCK_PERIOD'])
        threshold = int(environ['ERROR_THRESHOLD'])
        query_string = build_athena_query_for_app_access_logs(
            log,
            log_type,
            database_name,
            table_name,
            query_time,
            block_period,
            threshold,
        )
    else:
        # WAF logs: WAF access-log table.
        table_name = event['glueWafAccessLogsTable']
        query_time = datetime.datetime.utcnow()
        block_period = int(environ['WAF_BLOCK_PERIOD'])
        threshold = int(environ['REQUEST_THRESHOLD'])
        query_string = build_athena_query_for_waf_logs(
            log,
            database_name,
            table_name,
            query_time,
            block_period,
            threshold,
        )

    execution_context = {'Database': database_name}
    result_configuration = {
        'OutputLocation': output_location,
        'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'},
    }
    response = athena_client.start_query_execution(
        QueryString=query_string,
        QueryExecutionContext=execution_context,
        ResultConfiguration=result_configuration,
        WorkGroup=event['athenaWorkGroup'],
    )

    response_message = "[execute_athena_query] Query Execution Response: {}".format(response)
    log.info(response_message)
    log.info('[execute_athena_query] End')