integ/validate_cloudwatch/validator.py (115 lines of code) (raw):
import boto3
import json
import sys
import os
import time
from datetime import datetime, timedelta
# CloudWatch Logs client used by validate_test_case to fetch log events.
# NOTE(review): the region is read with .get() here (None when AWS_REGION is
# unset) but with [] for metrics_client below (KeyError when unset) -- confirm
# whether the difference is intentional.
client = boto3.client('logs', region_name=os.environ.get('AWS_REGION'))
# CloudWatch metrics client used by metric_exists to query metric statistics.
metrics_client = boto3.client("cloudwatch", region_name=os.environ["AWS_REGION"])
# Time range for the EMF metric query, fixed once when this module is loaded:
# from 1200 seconds before load time to 30 seconds after it.
start_time = datetime.utcnow() - timedelta(seconds=1200)
end_time = datetime.utcnow() + timedelta(seconds=30)
# Log group passed to validate_test_case by the test cases at the bottom of
# the file; None when the LOG_GROUP_NAME environment variable is unset.
LOG_GROUP_NAME = os.environ.get('LOG_GROUP_NAME')
def execute_with_retry(max_retry_attempts, retriable_function, *argv, retry_time_secs=10):
    """Call ``retriable_function(*argv)`` until it succeeds or attempts run out.

    Args:
        max_retry_attempts: Maximum number of calls to make.
        retriable_function: Callable returning a ``(success, message)`` pair.
        *argv: Positional arguments forwarded unchanged on every call.
        retry_time_secs: Seconds to wait between two attempts. Keyword-only;
            defaults to 10.

    Returns:
        True as soon as one call reports success.

    Raises:
        SystemExit: If no call succeeds within ``max_retry_attempts``
            attempts. The exit message names the failing function.
    """
    attempt = 0

    while attempt < max_retry_attempts:
        success, ret_message = retriable_function(*argv)

        # If we succeed, then return the success response.
        if success:
            return True

        # If we fail, report the attempt that just failed (0-based).
        print(ret_message +
              '. Current retry attempt: ' + str(attempt) +
              '. Max retry attempt: ' + str(max_retry_attempts))
        attempt += 1

        # Only wait when another attempt will follow; sleeping after the
        # final failure would just delay the exit below.
        if attempt < max_retry_attempts:
            time.sleep(retry_time_secs)

    sys.exit(retriable_function.__name__ + ' failed after exhaustion of retry limit.')
def validate_test_case(test_name, log_group, log_stream, validator_func):
    """Fetch a log stream and check every event with ``validator_func``.

    Args:
        test_name: Label printed in the RUNNING / SUCCESS lines.
        log_group: Log group name passed to ``get_log_events``.
        log_stream: Log stream name passed to ``get_log_events``.
        validator_func: Callable ``(index, event) -> (success, message)``
            invoked for each event with its 0-based position.

    Returns:
        ``(True, 'Success')`` when exactly 1000 events are returned and all
        of them validate; otherwise ``(False, <failure message>)``.
    """
    print('RUNNING: ' + test_name)

    try:
        response = client.get_log_events(logGroupName=log_group, logStreamName=log_stream)
    except Exception as err:
        # Any API error is reported as a test failure rather than propagated.
        print(err)
        return False, 'TEST_FAILURE: API call failed'

    events = response['events']
    found = len(events)

    # The test expects exactly 1000 events in the stream.
    if found != 1000:
        print(str(found) + ' events found in CloudWatch')
        return False, 'TEST_FAILURE: incorrect number of log events found'

    # Stop at the first event the validator rejects.
    for index, event in enumerate(events):
        ok, message = validator_func(index, event)
        if not ok:
            return False, message

    print('SUCCESS: ' + test_name)
    return True, 'Success'
def vanilla_validator(counter, log):
    """Validate an event whose message is JSON with a numeric ``log`` field.

    Args:
        counter: Expected integer value for this event.
        log: Event dict; its ``'message'`` entry is parsed as JSON.

    Returns:
        ``(True, 'Success')`` when ``int(message['log'])`` equals ``counter``;
        otherwise ``(False, <failure message>)`` after printing both values.
    """
    payload = json.loads(log['message'])
    found = int(payload['log'])

    if found == counter:
        return True, 'Success'

    print('Expected: ' + str(counter) + '; Found: ' + str(found))
    return False, 'TEST_FAILURE: found out of order log message'
def log_key_validator(counter, log):
    """Validate an event whose message is just a number, possibly quoted.

    Args:
        counter: Expected integer value for this event.
        log: Event dict; its ``'message'`` entry holds the number as text.

    Returns:
        ``(True, 'Success')`` when the parsed number equals ``counter``;
        otherwise ``(False, <failure message>)`` after printing both values.
    """
    # TODO: .strip could be unneeded in the future: https://github.com/aws/amazon-cloudwatch-logs-for-fluent-bit/issues/14
    unquoted = log['message'].strip('\"')
    found = int(unquoted)

    if found == counter:
        return True, 'Success'

    print('Expected: ' + str(counter) + '; Found: ' + str(found))
    return False, 'TEST_FAILURE: found out of order log message'
def validate_metric(test_name, metric_namespace, dim_key, dim_value, expected_samples=1):
    """Run one metric test case on top of ``metric_exists``.

    Args:
        test_name: Label printed in the RUNNING / SUCCESS lines.
        metric_namespace: Namespace forwarded to ``metric_exists``.
        dim_key: Dimension name forwarded to ``metric_exists``.
        dim_value: Dimension value forwarded to ``metric_exists``.
        expected_samples: Sample count forwarded to ``metric_exists``.

    Returns:
        ``(True, 'Success')`` when ``metric_exists`` returns a truthy value;
        otherwise ``(False, <failure message>)``.
    """
    print('RUNNING: ' + test_name)

    found = metric_exists(metric_namespace, dim_key, dim_value, expected_samples)
    if not found:
        return False, 'TEST_FAILURE: failed to validate metric existence in CloudWatch'

    print('SUCCESS: ' + test_name)
    return True, 'Success'
def metric_exists(metric_namespace, dim_key, dim_value, expected_samples):
    """Check that the expected metric has exactly ``expected_samples`` samples.

    The metric name comes from ``get_expected_metric_name()`` and the query
    window from the module-level ``start_time`` / ``end_time``.

    Args:
        metric_namespace: Namespace to query.
        dim_key: Name of the single dimension to filter on.
        dim_value: Value of that dimension.
        expected_samples: Required total of ``SampleCount`` over all
            returned datapoints.

    Returns:
        True only when the summed ``SampleCount`` equals
        ``expected_samples``. False when the API call raises, the response is
        None, or the total is higher or lower (a diagnostic is printed).
    """
    metric_name = get_expected_metric_name()

    try:
        query = {
            "Namespace": metric_namespace,
            "MetricName": metric_name,
            "Dimensions": [{"Name": dim_key, "Value": dim_value}],
            "StartTime": start_time,
            "EndTime": end_time,
            "Period": 60,
            "Statistics": ["SampleCount", "Average"],
            "Unit": "None",
        }
        response = metrics_client.get_metric_statistics(**query)
    except Exception as err:
        # API errors are reported as "metric not found" rather than raised.
        print(err)
        return False

    if response is None:
        return False

    datapoints = response["Datapoints"]

    # Add up the samples across every returned datapoint.
    sample_total = 0
    for point in datapoints:
        sample_total += point["SampleCount"]

    if sample_total == expected_samples:
        return True

    if sample_total > expected_samples:
        print(f"Too many datapoints returned. Expected {expected_samples}, received {sample_total}")
        return False

    # Fewer samples than expected: show what came back and what was queried.
    print(datapoints)
    print(f"Did not find {metric_namespace}/{metric_name}/{dim_key}:{dim_value}")
    return False
def get_expected_metric_name():
    """Return the metric name stored in the file at ``EMF_METRIC_NAME_PATH``.

    Returns:
        The file's full contents with every newline character removed.
    """
    path = os.environ.get('EMF_METRIC_NAME_PATH')
    with open(path, 'r') as handle:
        contents = handle.read()
    # Splitting on newlines and re-joining drops every '\n' in the text.
    return ''.join(contents.split('\n'))
# Suffix that identifies the log streams written by the current run.
tag = os.environ.get('TAG')
print('Tag for current run is: ' + tag)

# CW Test Case 1: simple/basic configuration; each log message is JSON.
success_case_1 = execute_with_retry(
    5,
    validate_test_case,
    'CW Test 1: Basic Config',
    LOG_GROUP_NAME,
    'from-fluent-bit-basic-test-' + tag,
    vanilla_validator,
)

# CW Test Case 2: 'log_key' option; each log message is just the stdout
# output (a number).
success_case_2 = execute_with_retry(
    5,
    validate_test_case,
    'CW Test 2: log_key option',
    LOG_GROUP_NAME,
    'from-fluent-bit-log-key-test-' + tag,
    log_key_validator,
)

# CW Test Case 3: EMF metric, with a larger retry budget than the log tests.
success_case_emf = execute_with_retry(
    25,
    validate_metric,
    'CW Test 3: EMF metrics',
    'fluent-metrics',
    'dimensionKey',
    'dimensionValue',
)

if all((success_case_1, success_case_2, success_case_emf)):
    # if this file is still present, integ script will mark the test as a failure
    os.remove("/out/cloudwatch-test")