handlers/aws/cloudwatch_logs_trigger.py

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.

import datetime
from typing import Any, Iterator, Optional

from botocore.client import BaseClient as BotoBaseClient

from share import ExpandEventListFromField, ProtocolMultiline, json_parser, shared_logger
from storage import ProtocolStorage, StorageFactory

from .utils import get_account_id_from_arn


def _from_awslogs_data_to_event(awslogs_data: str) -> Any:
    """
    Returns the cloudwatch logs event from its base64 encoded and gzipped payload
    """
    storage: ProtocolStorage = StorageFactory.create(storage_type="payload", payload=awslogs_data)
    cloudwatch_logs_payload_plain = storage.get_as_string()
    return json_parser(cloudwatch_logs_payload_plain)


def _handle_cloudwatch_logs_move(
    sqs_client: BotoBaseClient,
    sqs_destination_queue: str,
    cloudwatch_logs_event: dict[str, Any],
    input_id: str,
    config_yaml: str,
    continuing_queue: bool = True,
    current_log_event: int = 0,
    last_ending_offset: Optional[int] = None,
    last_event_expanded_offset: Optional[int] = None,
) -> None:
    """
    Handler of the continuation queue for cloudwatch logs inputs.
    If a cloudwatch logs data payload cannot be fully processed before the timeout of the lambda,
    this handler is called: it sends new sqs messages for the unprocessed payload to the internal
    continuing sqs queue
    """
    log_group_name = cloudwatch_logs_event["logGroup"]
    log_stream_name = cloudwatch_logs_event["logStream"]

    # Resume from the first log event that was not fully processed
    logs_events = cloudwatch_logs_event["logEvents"][current_log_event:]
    for current_log_event, log_event in enumerate(logs_events):
        # The last ending offset only applies to the first, partially processed log event
        if current_log_event > 0:
            last_ending_offset = None

        message_attributes = {
            "config": {"StringValue": config_yaml, "DataType": "String"},
            "originalEventId": {"StringValue": log_event["id"], "DataType": "String"},
            "originalEventSourceARN": {"StringValue": input_id, "DataType": "String"},
            "originalLogGroup": {"StringValue": log_group_name, "DataType": "String"},
            "originalLogStream": {"StringValue": log_stream_name, "DataType": "String"},
            "originalEventTimestamp": {"StringValue": str(log_event["timestamp"]), "DataType": "Number"},
        }

        if last_ending_offset is not None:
            message_attributes["originalLastEndingOffset"] = {
                "StringValue": str(last_ending_offset),
                "DataType": "Number",
            }

        if last_event_expanded_offset is not None:
            message_attributes["originalLastEventExpandedOffset"] = {
                "StringValue": str(last_event_expanded_offset),
                "DataType": "Number",
            }

        sqs_client.send_message(
            QueueUrl=sqs_destination_queue,
            MessageBody=log_event["message"],
            MessageAttributes=message_attributes,
        )

        if continuing_queue:
            shared_logger.debug(
                "continuing",
                extra={
                    "sqs_continuing_queue": sqs_destination_queue,
                    "last_ending_offset": last_ending_offset,
                    "last_event_expanded_offset": last_event_expanded_offset,
                    "event_id": log_event["id"],
                    "event_timestamp": log_event["timestamp"],
                },
            )
        else:
            shared_logger.debug(
                "replaying",
                extra={
                    "sqs_replaying_queue": sqs_destination_queue,
                    "event_id": log_event["id"],
                    "event_timestamp": log_event["timestamp"],
                },
            )


def _handle_cloudwatch_logs_event(
    event: dict[str, Any],
    aws_region: str,
    input_id: str,
    event_list_from_field_expander: ExpandEventListFromField,
    json_content_type: Optional[str],
    multiline_processor: Optional[ProtocolMultiline],
) -> Iterator[tuple[dict[str, Any], int, Optional[int], int]]:
    """
    Handler for cloudwatch logs inputs.
    It iterates through the logEvents in the cloudwatch logs trigger payload and processes
    the content of the body payload in each log event.
    If a log event cannot be fully processed before the timeout of the lambda, the sqs
    continuing handler is called
    """
    account_id = get_account_id_from_arn(input_id)

    log_group_name = event["logGroup"]
    log_stream_name = event["logStream"]

    for cloudwatch_log_event_n, cloudwatch_log_event in enumerate(event["logEvents"]):
        event_id = cloudwatch_log_event["id"]
        event_timestamp = cloudwatch_log_event["timestamp"]

        storage_message: ProtocolStorage = StorageFactory.create(
            storage_type="payload",
            payload=cloudwatch_log_event["message"],
            json_content_type=json_content_type,
            event_list_from_field_expander=event_list_from_field_expander,
            multiline_processor=multiline_processor,
        )

        events = storage_message.get_by_lines(range_start=0)

        for log_event, starting_offset, ending_offset, event_expanded_offset in events:
            assert isinstance(log_event, bytes)

            es_event: dict[str, Any] = {
                "@timestamp": datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                "fields": {
                    "message": log_event.decode("utf-8"),
                    "log": {
                        "offset": starting_offset,
                        "file": {
                            "path": f"{log_group_name}/{log_stream_name}",
                        },
                    },
                    "aws": {
                        "cloudwatch": {
                            "log_group": log_group_name,
                            "log_stream": log_stream_name,
                            "event_id": event_id,
                        }
                    },
                    "cloud": {
                        "provider": "aws",
                        "region": aws_region,
                        "account": {"id": account_id},
                    },
                },
                "meta": {"event_timestamp": event_timestamp},
            }

            yield es_event, ending_offset, event_expanded_offset, cloudwatch_log_event_n
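

# Illustrative sketch (not part of the handler logic): the lambda entrypoint defined
# elsewhere is expected to receive a CloudWatch Logs subscription delivery of the form
#     {"awslogs": {"data": "<base64-encoded, gzipped JSON>"}}
# and `_from_awslogs_data_to_event(event["awslogs"]["data"])` returns the decoded JSON.
# Per the AWS subscription filter format, that decoded payload looks roughly like
# (example values only):
#     {
#         "messageType": "DATA_MESSAGE",
#         "owner": "123456789012",
#         "logGroup": "example-log-group",
#         "logStream": "example-log-stream",
#         "subscriptionFilters": ["example-filter"],
#         "logEvents": [
#             {"id": "36951199379...", "timestamp": 1510109208016, "message": "log line"},
#         ],
#     }
# `_handle_cloudwatch_logs_event` iterates over `logEvents` and yields one Elasticsearch
# event per extracted line, while `_handle_cloudwatch_logs_move` re-enqueues the
# unprocessed log events to the continuing (or replay) SQS queue when the lambda is
# about to time out.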