def lambda_handler()

in handlers/aws/handler.py [0:0]


def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Context) -> Any:
    """
    AWS Lambda handler in handler.aws package
    Parses the config and acts as front controller for inputs

    The trigger type and the config source are derived from `lambda_event`. The config
    yaml is read either from the event payload or from S3, parsed, and the event is then
    dispatched to the branch matching the trigger type:

    - "replay-sqs": every record is sent again through the shipper resolved for its
      input id / output destination pair; shippers and the replay handler are flushed
      before returning.
    - "cloudwatch-logs" and "kinesis-data-stream": the input is resolved once for the
      whole event and every yielded event is sent through that input's shipper.
    - "s3-sqs" and "sqs": the input is resolved per record and shippers are cached per
      input id.

    In the non-replay branches, records whose input is not found in the config are handed
    to the matching `*_move` helper with the queue from the SQS_REPLAY_URL environment
    variable as destination. When the remaining execution time reported by
    `lambda_context` drops below `_completion_grace_period`, the shippers are flushed and
    the not yet completed records are handed to the `*_move` helpers with the queue from
    SQS_CONTINUE_URL as destination.

    :param lambda_event: the event payload the lambda was invoked with
    :param lambda_context: the lambda runtime context; used for the invoked function arn
        and for the remaining execution time
    :return: "replayed" after handling a "replay-sqs" trigger, "continuing" when
        processing was interrupted because of the grace period, "completed" otherwise
        (including when no branch matches the trigger type)
    :raises TriggerTypeException: if the trigger type cannot be determined
    :raises ConfigFileException: if the config cannot be loaded, is empty or cannot be
        parsed
    :raises KeyError: if SQS_REPLAY_URL or SQS_CONTINUE_URL is missing from the
        environment for a non-replay trigger
    """
    shared_logger.debug("lambda triggered", extra={"invoked_function_arn": lambda_context.invoked_function_arn})

    try:
        trigger_type, config_source = get_trigger_type_and_config_source(lambda_event)
        shared_logger.info("trigger", extra={"type": trigger_type})
    except Exception as e:
        # Chain explicitly so the original failure is reported as the direct cause.
        raise TriggerTypeException(e) from e

    try:
        if config_source == CONFIG_FROM_PAYLOAD:
            config_yaml = config_yaml_from_payload(lambda_event)
        else:
            config_yaml = config_yaml_from_s3()
    except Exception as e:
        raise ConfigFileException(e) from e

    if config_yaml == "":
        raise ConfigFileException("Empty config")

    try:
        config = parse_config(config_yaml, _expanders)
    except Exception as e:
        raise ConfigFileException(e) from e

    assert config is not None

    sqs_client = get_sqs_client()

    # ------------------------------------------------------------------ replay-sqs
    if trigger_type == "replay-sqs":
        shared_logger.info("trigger", extra={"size": len(lambda_event["Records"])})

        # The replay queue arn is taken from the first record only.
        replay_queue_arn = lambda_event["Records"][0]["eventSourceARN"]
        replay_handler = ReplayedEventReplayHandler(replay_queue_arn=replay_queue_arn)
        # One shipper per (input id, output destination) pair, reused across records.
        shipper_cache: dict[str, CompositeShipper] = {}
        for replay_record in lambda_event["Records"]:
            event = json_parser(replay_record["body"])
            input_id = event["event_input_id"]
            output_destination = event["output_destination"]
            shipper_id = input_id + output_destination

            if shipper_id not in shipper_cache:
                shipper = get_shipper_for_replay_event(
                    config=config,
                    output_destination=output_destination,
                    output_args=event["output_args"],
                    event_input_id=input_id,
                    replay_handler=replay_handler,
                )

                if shipper is None:
                    # Nothing can be replayed for this record: log it and move on.
                    shared_logger.warning(
                        "no shipper for output in replay queue",
                        extra={
                            "output_destination": event["output_destination"],
                            "event_input_id": event["event_input_id"],
                        },
                    )
                    continue

                shipper_cache[shipper_id] = shipper
            else:
                shipper = shipper_cache[shipper_id]

            assert isinstance(shipper, CompositeShipper)

            shipper.send(event["event_payload"])
            # Register the receipt handle of the record under the event's unique id.
            event_uniq_id: str = event["event_payload"]["_id"] + output_destination
            replay_handler.add_event_with_receipt_handle(
                event_uniq_id=event_uniq_id, receipt_handle=replay_record["receiptHandle"]
            )

            if lambda_context is not None and lambda_context.get_remaining_time_in_millis() < _completion_grace_period:
                # Stop consuming records; the flushes below still run.
                shared_logger.info("lambda is going to shutdown")
                break

        for shipper in shipper_cache.values():
            shipper.flush()

        replay_handler.flush()
        shared_logger.info("lambda replayed all the events")
        return "replayed"

    # Counters reported in the log lines of every non-replay branch.
    sent_events: int = 0
    empty_events: int = 0
    skipped_events: int = 0
    error_events: int = 0

    sqs_replaying_queue = os.environ["SQS_REPLAY_URL"]
    sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"]

    # ------------------------------------------------------------- cloudwatch-logs
    if trigger_type == "cloudwatch-logs":
        cloudwatch_logs_event = _from_awslogs_data_to_event(lambda_event["awslogs"]["data"])

        shared_logger.info("trigger", extra={"size": len(cloudwatch_logs_event["logEvents"])})

        lambda_region = get_lambda_region()

        input_id, event_input = get_input_from_log_group_subscription_data(
            config,
            cloudwatch_logs_event["owner"],
            cloudwatch_logs_event["logGroup"],
            cloudwatch_logs_event["logStream"],
            # As of today, the cloudwatch trigger is always in
            # the same region as the lambda function.
            lambda_region,
        )

        if event_input is None:
            # No matching input in the config: hand the whole event over with the
            # replay queue as destination and stop.
            shared_logger.error("no input defined", extra={"input_id": input_id})
            error_events += 1
            _handle_cloudwatch_logs_move(
                sqs_client=sqs_client,
                sqs_destination_queue=sqs_replaying_queue,
                cloudwatch_logs_event=cloudwatch_logs_event,
                input_id=input_id,
                config_yaml=config_yaml,
                continuing_queue=False,
            )
            shared_logger.info(
                "lambda is going to shutdown",
                extra={
                    "error_events": error_events,
                    "sent_events": sent_events,
                    "empty_events": empty_events,
                    "skipped_events": skipped_events,
                },
            )
            return "completed"

        # Fourth colon-separated field of the input id.
        # NOTE(review): presumably the region segment of an ARN - confirm.
        aws_region = input_id.split(":")[3]
        composite_shipper = get_shipper_from_input(event_input=event_input)

        event_list_from_field_expander = ExpandEventListFromField(
            event_input.expand_event_list_from_field,
            INTEGRATION_SCOPE_GENERIC,
            expand_event_list_from_field_resolver,
            event_input.root_fields_to_add_to_expanded_event,
        )

        for (
            es_event,
            last_ending_offset,
            last_event_expanded_offset,
            current_log_event_n,
        ) in _handle_cloudwatch_logs_event(
            cloudwatch_logs_event,
            aws_region,
            event_input.id,
            event_list_from_field_expander,
            event_input.json_content_type,
            event_input.get_multiline_processor(),
        ):
            sent_outcome = composite_shipper.send(es_event)
            if sent_outcome == EVENT_IS_SENT:
                sent_events += 1
            elif sent_outcome == EVENT_IS_FILTERED:
                skipped_events += 1
            else:
                empty_events += 1

            if lambda_context is not None and lambda_context.get_remaining_time_in_millis() < _completion_grace_period:
                shared_logger.info(
                    "lambda is going to shutdown, continuing on dedicated sqs queue",
                    extra={
                        "sqs_queue": sqs_continuing_queue,
                        "sent_events": sent_events,
                        "empty_events": empty_events,
                        "skipped_events": skipped_events,
                    },
                )

                # Flush what was sent so far before handing over the rest.
                composite_shipper.flush()

                # The offsets and the current log event number are passed along
                # with the continuing queue as destination.
                _handle_cloudwatch_logs_move(
                    sqs_client=sqs_client,
                    sqs_destination_queue=sqs_continuing_queue,
                    last_ending_offset=last_ending_offset,
                    last_event_expanded_offset=last_event_expanded_offset,
                    cloudwatch_logs_event=cloudwatch_logs_event,
                    current_log_event=current_log_event_n,
                    input_id=input_id,
                    config_yaml=config_yaml,
                )

                return "continuing"

        composite_shipper.flush()
        shared_logger.info(
            "lambda processed all the events",
            extra={
                "sent_events": sent_events,
                "empty_events": empty_events,
                "skipped_events": skipped_events,
                "error_events": error_events,
            },
        )

    # --------------------------------------------------------- kinesis-data-stream
    if trigger_type == "kinesis-data-stream":
        shared_logger.info("trigger", extra={"size": len(lambda_event["Records"])})

        # The input id is taken from the first record only.
        input_id = lambda_event["Records"][0]["eventSourceARN"]
        event_input = config.get_input_by_id(input_id)

        if event_input is None:
            # No matching input in the config: every record is counted as an error
            # and handed over with the replay queue as destination.
            shared_logger.error("no input defined", extra={"input_id": input_id})
            error_events += len(lambda_event["Records"])

            for kinesis_record in lambda_event["Records"]:
                _handle_kinesis_move(
                    sqs_client=sqs_client,
                    sqs_destination_queue=sqs_replaying_queue,
                    kinesis_record=kinesis_record,
                    event_input_id=input_id,
                    config_yaml=config_yaml,
                    continuing_queue=False,
                )

            shared_logger.info(
                "lambda is going to shutdown",
                extra={
                    "sent_events": sent_events,
                    "empty_events": empty_events,
                    "skipped_events": skipped_events,
                    "error_events": error_events,
                },
            )
            return "completed"

        composite_shipper = get_shipper_from_input(event_input=event_input)

        event_list_from_field_expander = ExpandEventListFromField(
            event_input.expand_event_list_from_field,
            INTEGRATION_SCOPE_GENERIC,
            expand_event_list_from_field_resolver,
            event_input.root_fields_to_add_to_expanded_event,
        )

        for (
            es_event,
            last_ending_offset,
            last_event_expanded_offset,
            current_kinesis_record_n,
        ) in _handle_kinesis_record(
            lambda_event,
            event_input.id,
            event_list_from_field_expander,
            event_input.json_content_type,
            event_input.get_multiline_processor(),
        ):
            sent_outcome = composite_shipper.send(es_event)
            if sent_outcome == EVENT_IS_SENT:
                sent_events += 1
            elif sent_outcome == EVENT_IS_FILTERED:
                skipped_events += 1
            else:
                empty_events += 1

            if lambda_context is not None and lambda_context.get_remaining_time_in_millis() < _completion_grace_period:
                shared_logger.info(
                    "lambda is going to shutdown, continuing on dedicated sqs queue",
                    extra={
                        "sqs_queue": sqs_continuing_queue,
                        "sent_events": sent_events,
                        "empty_events": empty_events,
                        "skipped_events": skipped_events,
                        "error_events": error_events,
                    },
                )

                composite_shipper.flush()

                # Hand over the current record and all the following ones. Only the
                # first of them (the one being processed) carries the offsets; the
                # others are passed with no offsets.
                remaining_kinesis_records = lambda_event["Records"][current_kinesis_record_n:]
                for current_kinesis_record, kinesis_record in enumerate(remaining_kinesis_records):
                    continuing_last_ending_offset: Optional[int] = last_ending_offset
                    continuing_last_event_expanded_offset: Optional[int] = last_event_expanded_offset
                    if current_kinesis_record > 0:
                        continuing_last_ending_offset = None
                        continuing_last_event_expanded_offset = None

                    _handle_kinesis_move(
                        sqs_client=sqs_client,
                        sqs_destination_queue=sqs_continuing_queue,
                        last_ending_offset=continuing_last_ending_offset,
                        last_event_expanded_offset=continuing_last_event_expanded_offset,
                        kinesis_record=kinesis_record,
                        event_input_id=input_id,
                        config_yaml=config_yaml,
                    )

                return "continuing"

        composite_shipper.flush()
        shared_logger.info(
            "lambda processed all the events",
            extra={
                "sent_events": sent_events,
                "empty_events": empty_events,
                "skipped_events": skipped_events,
                "error_events": error_events,
            },
        )

    # ---------------------------------------------------------------- s3-sqs / sqs
    if trigger_type == "s3-sqs" or trigger_type == "sqs":
        shared_logger.info("trigger", extra={"size": len(lambda_event["Records"])})

        # One shipper per input id, reused across the records of this invocation.
        composite_shipper_cache: dict[str, CompositeShipper] = {}

        def event_processing(
            processing_composing_shipper: CompositeShipper, processing_es_event: dict[str, Any]
        ) -> tuple[bool, str]:
            """
            Sends a single event through the given shipper.
            Returns a tuple (timeout, outcome): timeout is True when the remaining
            execution time is below the grace period, outcome is what `send` returned.
            """
            processing_sent_outcome = processing_composing_shipper.send(processing_es_event)

            if lambda_context is not None and lambda_context.get_remaining_time_in_millis() < _completion_grace_period:
                return True, processing_sent_outcome

            return False, processing_sent_outcome

        def handle_timeout(
            remaining_sqs_records: list[dict[str, Any]],
            timeout_last_ending_offset: Optional[int],
            timeout_last_event_expanded_offset: Optional[int],
            timeout_sent_events: int,
            timeout_empty_events: int,
            timeout_skipped_events: int,
            timeout_config_yaml: str,
            timeout_current_s3_record: int = 0,
        ) -> None:
            """
            Logs the shutdown and hands every remaining record over to the matching
            move helper with the continuing queue as destination.
            Only the first remaining record carries the given offsets and s3 record
            number: for the following ones they are reset to None / 0.
            """
            shared_logger.info(
                "lambda is going to shutdown, continuing on dedicated sqs queue",
                extra={
                    "sqs_queue": sqs_continuing_queue,
                    "sent_events": timeout_sent_events,
                    "empty_events": timeout_empty_events,
                    "skipped_events": timeout_skipped_events,
                },
            )

            for timeout_current_sqs_record, timeout_sqs_record in enumerate(remaining_sqs_records):
                if timeout_current_sqs_record > 0:
                    timeout_last_ending_offset = None
                    timeout_last_event_expanded_offset = None
                    timeout_current_s3_record = 0

                # The original event source arn, when present in the message
                # attributes, takes precedence over the record's own one.
                timeout_input_id = timeout_sqs_record["eventSourceARN"]
                if (
                    "messageAttributes" in timeout_sqs_record
                    and "originalEventSourceARN" in timeout_sqs_record["messageAttributes"]
                ):
                    timeout_input_id = timeout_sqs_record["messageAttributes"]["originalEventSourceARN"]["stringValue"]

                timeout_input = config.get_input_by_id(input_id=timeout_input_id)

                # NOTE(review): records with no matching input are skipped here and
                # are not handed to any queue by this helper - confirm it is intended.
                if timeout_input is None:
                    continue

                if timeout_input.type == "s3-sqs":
                    _handle_s3_sqs_move(
                        sqs_client=sqs_client,
                        sqs_destination_queue=sqs_continuing_queue,
                        last_ending_offset=timeout_last_ending_offset,
                        last_event_expanded_offset=timeout_last_event_expanded_offset,
                        sqs_record=timeout_sqs_record,
                        current_s3_record=timeout_current_s3_record,
                        input_id=timeout_input_id,
                        config_yaml=timeout_config_yaml,
                    )
                else:
                    handle_sqs_move(
                        sqs_client=sqs_client,
                        sqs_destination_queue=sqs_continuing_queue,
                        last_ending_offset=timeout_last_ending_offset,
                        last_event_expanded_offset=timeout_last_event_expanded_offset,
                        sqs_record=timeout_sqs_record,
                        input_id=timeout_input_id,
                        config_yaml=timeout_config_yaml,
                    )

        for current_sqs_record, sqs_record in enumerate(lambda_event["Records"]):
            continuing_original_input_type = get_continuing_original_input_type(sqs_record)

            # The original event source arn, when present in the message attributes,
            # takes precedence over the record's own one.
            input_id = sqs_record["eventSourceARN"]
            if "messageAttributes" in sqs_record and "originalEventSourceARN" in sqs_record["messageAttributes"]:
                input_id = sqs_record["messageAttributes"]["originalEventSourceARN"]["stringValue"]

            event_input = config.get_input_by_id(input_id)

            if event_input is None:
                # This could happen if aws_lambda_event_source_mapping is set correctly, but
                # the id on the config.yaml was written incorrectly.
                shared_logger.error("no input defined", extra={"input_id": input_id})
                if trigger_type == "s3-sqs":
                    _handle_s3_sqs_move(
                        sqs_client=sqs_client,
                        sqs_destination_queue=sqs_replaying_queue,
                        sqs_record=sqs_record,
                        input_id=input_id,
                        config_yaml=config_yaml,
                        continuing_queue=False,
                    )
                elif trigger_type == "sqs":
                    handle_sqs_move(
                        sqs_client=sqs_client,
                        sqs_destination_queue=sqs_replaying_queue,
                        sqs_record=sqs_record,
                        input_id=input_id,
                        config_yaml=config_yaml,
                        continuing_queue=False,
                    )
                error_events += 1
                continue

            if input_id in composite_shipper_cache:
                composite_shipper = composite_shipper_cache[input_id]
            else:
                composite_shipper = get_shipper_from_input(event_input=event_input)
                # Store under the same key used for the lookup above, so that a
                # cached shipper is always found again.
                composite_shipper_cache[input_id] = composite_shipper

            # Expanded offset carried over by a record that continues a previous
            # invocation, if any.
            continuing_event_expanded_offset: Optional[int] = None
            if (
                "messageAttributes" in sqs_record
                and "originalLastEventExpandedOffset" in sqs_record["messageAttributes"]
            ):
                continuing_event_expanded_offset = int(
                    sqs_record["messageAttributes"]["originalLastEventExpandedOffset"]["stringValue"]
                )

            if (
                event_input.type == "kinesis-data-stream"
                or event_input.type == "sqs"
                or event_input.type == "cloudwatch-logs"
            ):
                event_list_from_field_expander = ExpandEventListFromField(
                    event_input.expand_event_list_from_field,
                    INTEGRATION_SCOPE_GENERIC,
                    expand_event_list_from_field_resolver,
                    event_input.root_fields_to_add_to_expanded_event,
                    continuing_event_expanded_offset,
                )

                for es_event, last_ending_offset, last_event_expanded_offset in _handle_sqs_event(
                    sqs_record,
                    input_id,
                    event_list_from_field_expander,
                    continuing_original_input_type,
                    event_input.json_content_type,
                    event_input.get_multiline_processor(),
                ):
                    timeout, sent_outcome = event_processing(
                        processing_composing_shipper=composite_shipper, processing_es_event=es_event
                    )

                    if sent_outcome == EVENT_IS_SENT:
                        sent_events += 1
                    elif sent_outcome == EVENT_IS_FILTERED:
                        skipped_events += 1
                    else:
                        empty_events += 1

                    if timeout:
                        # Flush every shipper used so far, then hand over the current
                        # record and all the following ones.
                        for cached_shipper in composite_shipper_cache.values():
                            cached_shipper.flush()

                        handle_timeout(
                            remaining_sqs_records=lambda_event["Records"][current_sqs_record:],
                            timeout_last_ending_offset=last_ending_offset,
                            timeout_last_event_expanded_offset=last_event_expanded_offset,
                            timeout_sent_events=sent_events,
                            timeout_empty_events=empty_events,
                            timeout_skipped_events=skipped_events,
                            timeout_config_yaml=config_yaml,
                        )

                        return "continuing"

            elif event_input.type == "s3-sqs":
                sqs_record_body: dict[str, Any] = json_parser(sqs_record["body"])
                for es_event, last_ending_offset, last_event_expanded_offset, current_s3_record in _handle_s3_sqs_event(
                    sqs_record_body,
                    event_input.id,
                    event_input.expand_event_list_from_field,
                    event_input.root_fields_to_add_to_expanded_event,
                    event_input.json_content_type,
                    event_input.get_multiline_processor(),
                ):
                    timeout, sent_outcome = event_processing(
                        processing_composing_shipper=composite_shipper, processing_es_event=es_event
                    )

                    if sent_outcome == EVENT_IS_SENT:
                        sent_events += 1
                    elif sent_outcome == EVENT_IS_FILTERED:
                        skipped_events += 1
                    else:
                        empty_events += 1

                    if timeout:
                        # Flush every shipper used so far, then hand over the current
                        # record (with the s3 record reached) and all the following ones.
                        for cached_shipper in composite_shipper_cache.values():
                            cached_shipper.flush()

                        handle_timeout(
                            remaining_sqs_records=lambda_event["Records"][current_sqs_record:],
                            timeout_last_ending_offset=last_ending_offset,
                            timeout_last_event_expanded_offset=last_event_expanded_offset,
                            timeout_sent_events=sent_events,
                            timeout_empty_events=empty_events,
                            timeout_skipped_events=skipped_events,
                            timeout_config_yaml=config_yaml,
                            timeout_current_s3_record=current_s3_record,
                        )

                        return "continuing"

        for cached_shipper in composite_shipper_cache.values():
            cached_shipper.flush()

        shared_logger.info(
            "lambda processed all the events",
            extra={
                "sent_events": sent_events,
                "empty_events": empty_events,
                "skipped_events": skipped_events,
                "error_events": error_events,
            },
        )

    return "completed"