def create_events()

in multipagepdfa2i/multipagepdfa2i_stack.py [0:0]


    def create_events(self, services):
        """Wire the event sources that drive the document-processing pipeline.

        Three kinds of wiring are created:

        * S3 ``OBJECT_CREATED`` notifications for objects under ``uploads/``
          whose key ends in ``pdf``, ``png`` or ``jpg`` (in any letter casing)
          are delivered to the ``sf_sqs`` queue.
        * The ``kickoff`` Lambda consumes ``sf_sqs`` and the ``analyzepdf``
          Lambda consumes ``textract_sqs``, both one message at a time.
        * An EventBridge rule forwards SageMaker A2I human-loop status
          changes to the ``humancomplete`` Lambda.

        Args:
            services: Mapping of already-created constructs. This method
                reads ``services["main_s3_bucket"]``, ``services["sf_sqs"]``,
                ``services["textract_sqs"]`` and the ``"kickoff"``,
                ``"analyzepdf"`` and ``"humancomplete"`` entries of
                ``services["lambda"]``.
        """
        # Local import keeps this method self-contained.
        from itertools import product

        # Each casing is registered as its own suffix filter (S3 key filters
        # are matched case-sensitively). Generate every upper/lower
        # combination instead of hand-listing them, so that none is missed
        # (the hand-written list lacked e.g. "PDf" and "PdF").
        suffixes = [
            "".join(letters)
            for base in ("pdf", "png", "jpg")
            for letters in product(*((ch.lower(), ch.upper()) for ch in base))
        ]

        bucket = services["main_s3_bucket"]
        upload_queue = services["sf_sqs"]
        for suffix in suffixes:
            bucket.add_event_notification(
                aws_s3.EventType.OBJECT_CREATED,
                aws_s3_notifications.SqsDestination(upload_queue),
                aws_s3.NotificationKeyFilter(prefix="uploads/", suffix=suffix),
            )

        lambdas = services["lambda"]

        # batch_size=1: each Lambda invocation receives a single queue message.
        lambdas["kickoff"].add_event_source(
            aws_lambda_event_sources.SqsEventSource(upload_queue, batch_size=1)
        )
        lambdas["analyzepdf"].add_event_source(
            aws_lambda_event_sources.SqsEventSource(
                services["textract_sqs"],
                batch_size=1,
            )
        )

        human_complete_target = aws_events_targets.LambdaFunction(
            lambdas["humancomplete"]
        )
        human_review_event_pattern = aws_events.EventPattern(
            source=["aws.sagemaker"],
            detail_type=["SageMaker A2I HumanLoop Status Change"],
        )

        # NOTE(review): the construct ID below contains a typo ("multipade").
        # It is intentionally left unchanged: the ID feeds the CloudFormation
        # logical ID, so renaming it would presumably replace the deployed
        # rule. Confirm before correcting.
        aws_events.Rule(
            self,
            "multipadepdfa2i_HumanReviewComplete",
            event_pattern=human_review_event_pattern,
            targets=[human_complete_target],
        )