in multipagepdfa2i/multipagepdfa2i_stack.py [0:0]
def create_events(self, services):
# kickoff_notification = aws_s3_notifications.LambdaDestination(services["lambda"]["kickoff"])
extensions = [
"pdf", "pDf", "pDF", "pdF", "PDF", "Pdf",
"png", "pNg", "pNG", "pnG", "PNG", "Png",
"jpg", "jPg", "jPG", "jpG", "JPG", "Jpg"
]
for extension in extensions:
services["main_s3_bucket"].add_event_notification(
aws_s3.EventType.OBJECT_CREATED,
aws_s3_notifications.SqsDestination(services["sf_sqs"]),
aws_s3.NotificationKeyFilter(prefix="uploads/", suffix=extension)
)
services["lambda"]["kickoff"].add_event_source(
aws_lambda_event_sources.SqsEventSource(
services["sf_sqs"],
batch_size=1
)
)
services["lambda"]["analyzepdf"].add_event_source(
aws_lambda_event_sources.SqsEventSource(
services["textract_sqs"],
batch_size=1
)
)
human_complete_target = aws_events_targets.LambdaFunction(services["lambda"]["humancomplete"])
human_review_event_pattern = aws_events.EventPattern(
source=["aws.sagemaker"],
detail_type=["SageMaker A2I HumanLoop Status Change"]
)
aws_events.Rule(self,
"multipadepdfa2i_HumanReviewComplete",
event_pattern=human_review_event_pattern,
targets=[human_complete_target]
)