util/awsservice/cloudwatchlogs.go (304 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package awsservice
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"strings"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
"github.com/qri-io/jsonschema"
)
const (
	// logStreamRetry is the maximum number of DescribeLogStreams attempts made by
	// GetLogStreams before it gives up and returns an empty result.
	logStreamRetry = 20
	// retryInterval is the pause between polling attempts, both while waiting for log
	// streams to appear and while waiting for a started query to finish.
	retryInterval = 10 * time.Second
	// NoLogTypeFound is the map key under which GetLogEventCountPerType counts log events
	// whose message could not be unmarshalled.
	NoLogTypeFound = "NoLogTypeFound"
)
// rnf is an errors.As target used to recognise ResourceNotFoundException responses from
// CloudWatch Logs. When deleting a log group or log stream these are not useful exceptions
// to log errors on during cleanup, so they are detected and skipped.
// NOTE(review): errors.As assigns to its target on a match, which makes this package-level
// variable shared mutable state — confirm it is never used from several goroutines at once.
var rnf *types.ResourceNotFoundException
// DeleteLogGroupAndStream cleans up a log stream and then its log group by name. Cleanup is
// best effort: nothing is returned, and both underlying helpers swallow
// ResourceNotFoundException errors from the APIs while logging any other failure.
func DeleteLogGroupAndStream(logGroupName, logStreamName string) {
	// The stream is removed first, then the group that contains it.
	DeleteLogStream(logGroupName, logStreamName)
	DeleteLogGroup(logGroupName)
}
// DeleteLogStream deletes the log stream logStreamName from the log group logGroupName.
//
// Cleanup is best effort, so nothing is returned to the caller: a
// ResourceNotFoundException is ignored silently and any other error is only logged.
func DeleteLogStream(logGroupName, logStreamName string) {
	_, err := CwlClient.DeleteLogStream(ctx, &cloudwatchlogs.DeleteLogStreamInput{
		LogGroupName:  aws.String(logGroupName),
		LogStreamName: aws.String(logStreamName),
	})
	if err == nil {
		return
	}
	// errors.As writes to its target on a match, so use a function-local target rather than
	// a shared package-level variable; concurrent cleanups then never touch the same memory.
	var notFound *types.ResourceNotFoundException
	if errors.As(err, &notFound) {
		return
	}
	log.Printf("Error occurred while deleting log stream %s: %v", logStreamName, err)
}
// DeleteLogGroup deletes the log group logGroupName.
//
// Cleanup is best effort, so nothing is returned to the caller: a
// ResourceNotFoundException is ignored silently and any other error is only logged.
func DeleteLogGroup(logGroupName string) {
	_, err := CwlClient.DeleteLogGroup(ctx, &cloudwatchlogs.DeleteLogGroupInput{
		LogGroupName: aws.String(logGroupName),
	})
	if err == nil {
		return
	}
	// errors.As writes to its target on a match, so use a function-local target rather than
	// a shared package-level variable; concurrent cleanups then never touch the same memory.
	var notFound *types.ResourceNotFoundException
	if errors.As(err, &notFound) {
		return
	}
	log.Printf("Error occurred while deleting log group %s: %v", logGroupName, err)
}
// ValidateLogs fetches the log events of logGroup/logStream for the since/until window via
// GetLogsSince and then applies each validator, in the order given, to the full set of events.
// It returns the fetch error if the events could not be retrieved, otherwise the first
// validator error encountered, or nil when every validator passes.
func ValidateLogs(logGroup, logStream string, since, until *time.Time, validators ...LogEventsValidator) error {
	log.Printf("Checking %s/%s", logGroup, logStream)

	logEvents, fetchErr := GetLogsSince(logGroup, logStream, since, until)
	if fetchErr != nil {
		return fetchErr
	}

	for i := range validators {
		if validationErr := validators[i](logEvents); validationErr != nil {
			return validationErr
		}
	}
	return nil
}
// GetLogsSince makes GetLogEvents API calls, paginates through the results for the given time
// frame, and returns the log events that were found.
//
// since and until are optional: a nil value leaves the corresponding StartTime/EndTime unset
// on the request. Events are read from the head of the stream.
//
// A ResourceNotFoundException is treated as "not created yet": the call is retried after a
// 30 second pause while the number of GetLogEvents calls made so far does not exceed
// StandardRetries. Any other error (or a not-found error past that budget) is returned
// together with the events collected up to that point.
func GetLogsSince(logGroup, logStream string, since, until *time.Time) ([]types.OutputLogEvent, error) {
	var events []types.OutputLogEvent

	// https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_GetLogEvents.html
	// GetLogEvents can return an empty result while still having more log events on a subsequent page,
	// so rather than expecting all the events to show up in one GetLogEvents API call, we need to paginate.
	params := &cloudwatchlogs.GetLogEventsInput{
		LogGroupName:  aws.String(logGroup),
		LogStreamName: aws.String(logStream),
		StartFromHead: aws.Bool(true), // read from the beginning
	}
	if since != nil {
		params.StartTime = aws.Int64(since.UnixMilli())
	}
	if until != nil {
		params.EndTime = aws.Int64(until.UnixMilli())
	}

	// Function-local errors.As target: errors.As writes to it on a match, so it must not be
	// shared between concurrent callers.
	var notFound *types.ResourceNotFoundException

	// attempts counts every GetLogEvents call, successful or not.
	attempts := 0
	for {
		output, err := CwlClient.GetLogEvents(ctx, params)
		attempts++
		if err != nil {
			if errors.As(err, &notFound) && attempts <= StandardRetries {
				// The log group/stream hasn't been created yet, so wait and retry
				time.Sleep(30 * time.Second)
				continue
			}
			// Either not a ResourceNotFoundException or the retry budget is spent: fail here.
			return events, err
		}

		events = append(events, output.Events...)

		next := output.NextForwardToken
		if next == nil {
			// Without a forward token there is no next page to ask for. Stop here instead of
			// re-sending the same request, which would loop forever and duplicate events.
			break
		}
		if params.NextToken != nil && *next == *params.NextToken {
			// From the docs: If you have reached the end of the stream, it returns the same token you passed in.
			break
		}
		params.NextToken = next
	}

	log.Printf("Done paginating log events for %s/%s and found %d logs", logGroup, logStream, len(events))
	return events, nil
}
// IsLogGroupExists reports whether DescribeLogGroups returns at least one log group for
// logGroupName. The first element of logGroupClassArg, when supplied, selects the log group
// class to search; otherwise the standard class is used. If the API call fails the error is
// logged and false is returned.
// NOTE(review): the lookup uses LogGroupNamePrefix, so a longer log group name that merely
// starts with logGroupName also yields true — confirm that prefix matching is intended.
func IsLogGroupExists(logGroupName string, logGroupClassArg ...types.LogGroupClass) bool {
	class := types.LogGroupClassStandard
	if len(logGroupClassArg) > 0 {
		class = logGroupClassArg[0]
	}

	out, err := CwlClient.DescribeLogGroups(ctx, &cloudwatchlogs.DescribeLogGroupsInput{
		LogGroupNamePrefix: aws.String(logGroupName),
		LogGroupClass:      class,
	})
	if err != nil {
		log.Println("error occurred while calling DescribeLogGroups", err)
		return false
	}
	return len(out.LogGroups) != 0
}
// GetLogQueryStats runs queryString against the log group between start/end (in epoch
// seconds), waits for the query to complete, and returns the statistics reported with the
// completed query. See runLogQuery for the polling and error behaviour.
func GetLogQueryStats(logGroupName string, startTime, endTime int64, queryString string) (*types.QueryStatistics, error) {
	results, err := runLogQuery(logGroupName, startTime, endTime, queryString)
	if err != nil {
		return nil, err
	}
	return results.Statistics, nil
}

// GetLogQueryResults runs queryString against the log group between start/end (in epoch
// seconds), waits for the query to complete, and returns the result rows of the completed
// query. See runLogQuery for the polling and error behaviour.
func GetLogQueryResults(logGroupName string, startTime, endTime int64, queryString string) ([][]types.ResultField, error) {
	results, err := runLogQuery(logGroupName, startTime, endTime, queryString)
	if err != nil {
		return nil, err
	}
	return results.Results, nil
}

// runLogQuery starts a query and polls GetQueryResults until the query completes.
//
// It sleeps retryInterval once after starting the query and again after every poll that
// finds the query still scheduled, running, or in unknown state. An error is returned when
// StartQuery or GetQueryResults fails, when the query is still pending after StandardRetries
// additional polls, or when the query reaches any status other than complete.
func runLogQuery(logGroupName string, startTime, endTime int64, queryString string) (*cloudwatchlogs.GetQueryResultsOutput, error) {
	started, err := CwlClient.StartQuery(ctx, &cloudwatchlogs.StartQueryInput{
		LogGroupName: aws.String(logGroupName),
		StartTime:    aws.Int64(startTime),
		EndTime:      aws.Int64(endTime),
		QueryString:  aws.String(queryString),
	})
	if err != nil {
		return nil, fmt.Errorf("failed to start query for log group (%s): %w", logGroupName, err)
	}

	// Sleep a fixed amount of time after making the query to give it time to
	// process the request.
	time.Sleep(retryInterval)

	// attempts counts the polls that found the query still pending.
	for attempts := 0; ; attempts++ {
		results, err := CwlClient.GetQueryResults(ctx, &cloudwatchlogs.GetQueryResultsInput{
			QueryId: started.QueryId,
		})
		if err != nil {
			return nil, fmt.Errorf("failed to get query results for log group (%s): %w", logGroupName, err)
		}

		switch results.Status {
		case types.QueryStatusComplete:
			return results, nil
		case types.QueryStatusScheduled, types.QueryStatusRunning, types.QueryStatusUnknown:
			if attempts >= StandardRetries {
				return nil, fmt.Errorf("attempted get query results after %s without success. final status: %v", time.Duration(attempts)*retryInterval, results.Status)
			}
			time.Sleep(retryInterval)
		default:
			return nil, fmt.Errorf("unexpected query status: %v", results.Status)
		}
	}
}
// GetLogStreams returns up to 10 log streams of logGroupName, ordered by last event time,
// descending.
//
// DescribeLogStreams is attempted up to logStreamRetry times, pausing retryInterval between
// attempts, until at least one stream is returned. API errors are logged and count as an
// attempt. If no attempt yields any stream, an empty (non-nil) slice is returned.
func GetLogStreams(logGroupName string) []types.LogStream {
	for attempt := 1; attempt <= logStreamRetry; attempt++ {
		output, err := CwlClient.DescribeLogStreams(ctx, &cloudwatchlogs.DescribeLogStreamsInput{
			LogGroupName: aws.String(logGroupName),
			OrderBy:      types.OrderByLastEventTime,
			Descending:   aws.Bool(true),
			Limit:        aws.Int32(10),
		})
		switch {
		case err != nil:
			// Fall through to the pause below so that failures are retried with the same
			// spacing as empty results instead of hitting the API back-to-back.
			log.Printf("failed to get log streams for log group: %v - err: %v", logGroupName, err)
		case len(output.LogStreams) > 0:
			return output.LogStreams
		}

		// No point in waiting once the last attempt has been made.
		if attempt < logStreamRetry {
			time.Sleep(retryInterval)
		}
	}
	return []types.LogStream{}
}
// GetLogStreamNames returns the names of the log streams that GetLogStreams finds for
// logGroupName, in the same order. The result is nil when no streams are found.
func GetLogStreamNames(logGroupName string) []string {
	streams := GetLogStreams(logGroupName)

	var names []string
	for i := range streams {
		names = append(names, *streams[i].LogStreamName)
	}
	return names
}
// LogEventValidator checks a single log event and returns a non-nil error when the event
// fails the check.
type LogEventValidator func(event types.OutputLogEvent) error

// LogEventsValidator checks a whole set of log events at once and returns a non-nil error
// when the set fails the check.
type LogEventsValidator func(events []types.OutputLogEvent) error

// SchemaRetriever returns the schema to validate a log message against, given that message.
// AssertLogSchema parses the returned string as a JSON schema.
type SchemaRetriever func(message string) (string, error)
// WithSchema builds a SchemaRetriever that ignores the log message it is given and always
// yields the fixed schema with a nil error.
func WithSchema(schema string) SchemaRetriever {
	var fixed SchemaRetriever = func(string) (string, error) {
		return schema, nil
	}
	return fixed
}
// AssertLogSchema returns a LogEventValidator that validates each event's message against the
// JSON schema supplied by schemaRetriever for that message.
//
// The validator returns an error when schemaRetriever is nil, when the schema cannot be
// retrieved or parsed, when the schema validator itself fails, or when the message violates
// the schema (the error then lists the key errors, the schema and the log message).
func AssertLogSchema(schemaRetriever SchemaRetriever) LogEventValidator {
	return func(event types.OutputLogEvent) error {
		if schemaRetriever == nil {
			return errors.New("nil schema retriever")
		}

		message := *event.Message
		schema, err := schemaRetriever(message)
		if err != nil {
			return fmt.Errorf("unable to retrieve schema: %w", err)
		}

		// Parse the schema explicitly instead of using jsonschema.Must, which panics on a
		// malformed schema; a bad schema should surface as a validation error like any other.
		parsed := &jsonschema.Schema{}
		if err := json.Unmarshal([]byte(schema), parsed); err != nil {
			return fmt.Errorf("unable to parse schema: %w", err)
		}

		keyErrors, err := parsed.ValidateBytes(context.Background(), []byte(message))
		if err != nil {
			return fmt.Errorf("failed to execute schema validator: %w", err)
		}
		if len(keyErrors) > 0 {
			return fmt.Errorf("failed schema validation: %v | schema: %s | log: %s", keyErrors, schema, message)
		}
		return nil
	}
}
// AssertLogContainsSubstring returns a LogEventValidator that passes when the event's message
// contains substr and otherwise returns an error quoting both the substring and the message.
func AssertLogContainsSubstring(substr string) LogEventValidator {
	return func(event types.OutputLogEvent) error {
		if found := strings.Contains(*event.Message, substr); found {
			return nil
		}
		return fmt.Errorf("log event message missing substring (%s): %s", substr, *event.Message)
	}
}
// AssertPerLog lifts per-event validators into a LogEventsValidator. Events are visited in
// order and, for each event, the validators are applied in the order given; the first error
// stops the run and is returned. It returns nil when every validator accepts every event.
func AssertPerLog(validators ...LogEventValidator) LogEventsValidator {
	return func(events []types.OutputLogEvent) error {
		for i := range events {
			for j := range validators {
				err := validators[j](events[i])
				if err != nil {
					return err
				}
			}
		}
		return nil
	}
}
// AssertLogsNotEmpty returns a LogEventsValidator that fails with "no log events" when it is
// given zero events and passes otherwise.
func AssertLogsNotEmpty() LogEventsValidator {
	return func(events []types.OutputLogEvent) error {
		if len(events) > 0 {
			return nil
		}
		return errors.New("no log events")
	}
}
// AssertLogsCount returns a LogEventsValidator that passes only when the number of events is
// exactly count; otherwise the error reports the actual and the expected counts.
func AssertLogsCount(count int) LogEventsValidator {
	return func(events []types.OutputLogEvent) error {
		actual := len(events)
		if actual == count {
			return nil
		}
		return fmt.Errorf("actual log events count (%v) does not match expected (%v)", actual, count)
	}
}
// AssertNoDuplicateLogs returns a LogEventsValidator that fails as soon as it meets an event
// whose timestamp and message both match those of an earlier event. Events sharing only a
// timestamp, or only a message, are not considered duplicates.
func AssertNoDuplicateLogs() LogEventsValidator {
	// occurrence identifies a log event by the pair that must be unique.
	type occurrence struct {
		timestamp int64
		message   string
	}

	return func(events []types.OutputLogEvent) error {
		seen := make(map[occurrence]struct{})
		for i := range events {
			msg := *events[i].Message
			ts := *events[i].Timestamp

			key := occurrence{timestamp: ts, message: msg}
			if _, dup := seen[key]; dup {
				return fmt.Errorf("duplicate message found at %v | message: %s", time.UnixMilli(ts), msg)
			}
			seen[key] = struct{}{}
		}
		return nil
	}
}
// GetLogEventCountPerType fetches the log events of logGroup/logStream for the since/until
// window via GetLogsSince and counts them by the Type field obtained from unmarshalling each
// message into an EKSClusterType.
//
// Events whose message cannot be unmarshalled are counted under NoLogTypeFound, a key that is
// always present (possibly with a zero count) on success. If the events cannot be fetched,
// the empty map is returned together with the error.
func GetLogEventCountPerType(logGroup, logStream string, since, until *time.Time) (map[string]int, error) {
	typeFrequency := make(map[string]int)

	events, err := GetLogsSince(logGroup, logStream, since, until)
	// if there is an error, return the empty map
	if err != nil {
		return typeFrequency, err
	}

	typeFrequency[NoLogTypeFound] = 0
	for _, event := range events {
		var eksClusterType EKSClusterType
		if err := json.Unmarshal([]byte(*event.Message), &eksClusterType); err != nil {
			// Count the event once, as untyped, and skip the per-type tally; otherwise the
			// same event would also be counted under whatever Type holds after the failure.
			typeFrequency[NoLogTypeFound]++
			continue
		}
		typeFrequency[eksClusterType.Type]++
	}
	return typeFrequency, nil
}