constructor()

in infrastructure/lib/textract-pipeline-stack.ts [25:318]


  constructor(scope: cdk.Construct, id: string, props: MultistackProps) {
    super(scope, id, props);

    //**********SNS Topics******************************
    const textractJobCompletionTopic = new sns.Topic(this, 'JobCompletion');

    //**********IAM Roles******************************
    const textractServiceRole = new iam.Role(this, 'TextractServiceRole', {
      assumedBy: new iam.ServicePrincipal('textract.amazonaws.com')
    });
    textractServiceRole.addToPolicy(
      new iam.PolicyStatement({
        effect: iam.Effect.ALLOW,
        resources: [textractJobCompletionTopic.topicArn],
        actions: ["sns:Publish"]
      })
    );


    //**********S3 Bucket******************************
    //S3 bucket for input documents and output
    const rawContentsBucket = new s3.Bucket(this, 'RawDocumentsBucket', { versioned: false, removalPolicy: cdk.RemovalPolicy.DESTROY});

    const asyncdocBucket = new s3.Bucket(this, 'LargeDocumentsBucket', { versioned: false, removalPolicy: cdk.RemovalPolicy.DESTROY});

    const syncdocBucket = new s3.Bucket(this, 'ImageDocumentsBucket', { versioned: false, removalPolicy: cdk.RemovalPolicy.DESTROY});

    const textractResultsBucket = new s3.Bucket(this, 'TextractResultsBucket', { versioned: false, removalPolicy: cdk.RemovalPolicy.DESTROY});

    //Comprehend Output Bucket
    const comprehendResultsBucket = new s3.Bucket(this, 'ComprehendResultsBucket', { versioned: false, removalPolicy: cdk.RemovalPolicy.DESTROY});

    //Queue
    const jobResultsQueue = new sqs.Queue(this, 'JobResults', {
      visibilityTimeout: cdk.Duration.seconds(900), retentionPeriod: cdk.Duration.seconds(1209600)
    });
    //Trigger

    textractJobCompletionTopic.addSubscription(
      new snsSubscriptions.SqsSubscription(jobResultsQueue)
    );
  
    const pipelineLayer = new lambda.LayerVersion(this, 'PipelineFunctionsLayer', {
      code: lambda.Code.fromAsset('code/lambda_layer/pipeline'),
      compatibleRuntimes: [lambda.Runtime.PYTHON_3_7],
      license: 'Apache-2.0',
      description: 'NLP Pipeline and Document Registration Layer',
    });

    // S3 Event processor
    const documentRegistrar = new lambda.Function(this, 'DocumentRegistrar', {
      runtime: lambda.Runtime.PYTHON_3_7,
      code: lambda.Code.asset('code/document_registrar'),
      handler: 'document_registrar.lambda_handler',
      timeout: cdk.Duration.seconds(30),
      environment: {
        METADATA_SNS_TOPIC_ARN : props.metadataTopic.topicArn
      }
    });
    //Layer
    documentRegistrar.addLayers(pipelineLayer)
    //Trigger
    documentRegistrar.addEventSource(new S3EventSource(rawContentsBucket, {
      events: [
        s3.EventType.OBJECT_CREATED,
        s3.EventType.OBJECT_REMOVED_DELETE
      ]
    }));

    //Permissions
    rawContentsBucket.grantReadWrite(documentRegistrar)
    
    documentRegistrar.addToRolePolicy(
      new iam.PolicyStatement({
        actions: ["sns:publish"],
        resources: ["*"]
      })
    );

    const documentClassifier = new lambda.Function(this, 'DocumentClassifier', {
      runtime: lambda.Runtime.PYTHON_3_7,
      code: lambda.Code.asset('code/document_classifier'),
      handler: 'document_classifier.lambda_handler',
      timeout: cdk.Duration.seconds(30),
      environment: {
        METADATA_SNS_TOPIC_ARN : props.metadataTopic.topicArn
      }
    });
    
    documentClassifier.addLayers(pipelineLayer)
    //Trigger
    documentClassifier.addEventSource(new DynamoEventSource(props.documentRegistryTable, {
      startingPosition: lambda.StartingPosition.TRIM_HORIZON
    }));
    
    documentClassifier.addToRolePolicy(
      new iam.PolicyStatement({
        actions: ["sns:publish"],
        resources: ["*"]
      })
    );

    //------------------------------------------------------------

    // Document processor (Router to Sync/Async Pipeline)
    const extensionDetector = new lambda.Function(this, 'ExtensionDetector', {
      runtime: lambda.Runtime.PYTHON_3_7,
      code: lambda.Code.asset('code/extension_detector'),
      handler: 'extension_detector.lambda_handler',
      timeout: cdk.Duration.seconds(900),
      environment: {
        TARGET_SYNC_BUCKET :  syncdocBucket.bucketName,
        TARGET_ASYNC_BUCKET : asyncdocBucket.bucketName,
        METADATA_SNS_TOPIC_ARN : props.metadataTopic.topicArn
      }
    });
    //Layer
    extensionDetector.addLayers(pipelineLayer)
    //Trigger
    extensionDetector.addEventSource(new DynamoEventSource(props.pipelineOpsTable, {
      startingPosition: lambda.StartingPosition.TRIM_HORIZON
    }));

    //Permissions`
    rawContentsBucket.grantReadWrite(extensionDetector)
    asyncdocBucket.grantReadWrite(extensionDetector)
    syncdocBucket.grantReadWrite(extensionDetector)
    
    extensionDetector.addToRolePolicy(
      new iam.PolicyStatement({
        actions: ["sns:publish"],
        resources: ["*"]
      })
    );

    //------------------------------------------------------------

    // Sync Jobs Processor (Process jobs using sync APIs)
    const textractSyncProcessor = new lambda.Function(this, 'TextractSyncProcessor', {
      runtime: lambda.Runtime.PYTHON_3_7,
      code: lambda.Code.asset('code/textract_sync'),
      handler: 'textract_processor.lambda_handler',
      timeout: cdk.Duration.seconds(30),
      environment: {
        PIPELINE_OPS_TABLE: props.pipelineOpsTable.tableName,
        TARGET_TEXTRACT_BUCKET_NAME: textractResultsBucket.bucketName,
        METADATA_SNS_TOPIC_ARN : props.metadataTopic.topicArn
      }
    });
    //Layer
    textractSyncProcessor.addLayers(pipelineLayer)
    //Trigger
    textractSyncProcessor.addEventSource(new S3EventSource(syncdocBucket, {
      events: [ s3.EventType.OBJECT_CREATED ]
    }));
    //Permissions
    syncdocBucket.grantReadWrite(textractSyncProcessor)
    textractResultsBucket.grantReadWrite(textractSyncProcessor)
    textractSyncProcessor.addToRolePolicy(
      new iam.PolicyStatement({
        actions: ["textract:*"],
        resources: ["*"]
      })
    );
    textractSyncProcessor.addToRolePolicy(
      new iam.PolicyStatement({
        actions: ["sns:publish"],
        resources: ["*"]
      })
    );
    //------------------------------------------------------------

    // Async Job Processor (Start jobs using Async APIs)
    const textractAsyncStarter = new lambda.Function(this, 'TextractAsyncStarter', {
      runtime: lambda.Runtime.PYTHON_3_7,
      code: lambda.Code.asset('code/textract_async'),
      handler: 'textract_starter.lambda_handler',
      timeout: cdk.Duration.seconds(60),
      environment: {
        TEXTRACT_SNS_TOPIC_ARN : textractJobCompletionTopic.topicArn,
        TEXTRACT_SNS_ROLE_ARN : textractServiceRole.roleArn,
        METADATA_SNS_TOPIC_ARN : props.metadataTopic.topicArn,
        TEXTRACT_RESULTS_BUCKET: textractResultsBucket.bucketName
      }
    });

    //Layer
    textractAsyncStarter.addLayers(pipelineLayer)
    //Triggers
    textractAsyncStarter.addEventSource(new S3EventSource(asyncdocBucket, {
      events: [ s3.EventType.OBJECT_CREATED ]
    }));
    //Permissions
    asyncdocBucket.grantRead(textractAsyncStarter)
    textractResultsBucket.grantReadWrite(textractAsyncStarter)
    textractAsyncStarter.addToRolePolicy(
      new iam.PolicyStatement({
        actions: ["iam:PassRole"],
        resources: [textractServiceRole.roleArn]
      })
    );
    textractAsyncStarter.addToRolePolicy(
      new iam.PolicyStatement({
        actions: ["textract:*"],
        resources: ["*"]
      })
    );
    textractAsyncStarter.addToRolePolicy(
      new iam.PolicyStatement({
        actions: ["sns:publish"],
        resources: ["*"]
      })
    );
    //------------------------------------------------------------

    // Async Jobs Results Processor
    const textractAsyncProcessor = new lambda.Function(this, 'TextractAsyncProcessor', {
      runtime: lambda.Runtime.PYTHON_3_7,
      code: lambda.Code.asset('code/textract_async'),
      handler: 'textract_processor.lambda_handler',
      memorySize: 10000,
      timeout: cdk.Duration.seconds(900),
      environment: {
        TARGET_TEXTRACT_BUCKET_NAME: textractResultsBucket.bucketName,
        METADATA_SNS_TOPIC_ARN : props.metadataTopic.topicArn
      }
    });
    //Layer
    textractAsyncProcessor.addLayers(pipelineLayer)
    //Triggers
    textractAsyncProcessor.addEventSource(new SqsEventSource(jobResultsQueue, {
      batchSize: 1
    }));
    //Permissions
    asyncdocBucket.grantReadWrite(textractAsyncProcessor)
    textractResultsBucket.grantReadWrite(textractAsyncProcessor)
    jobResultsQueue.grantConsumeMessages(textractAsyncProcessor)
    textractAsyncProcessor.addToRolePolicy(
      new iam.PolicyStatement({
        actions: ["textract:*"],
        resources: ["*"]
      })
    );
    textractAsyncProcessor.addToRolePolicy(
      new iam.PolicyStatement({
        actions: ["sns:publish"],
        resources: ["*"]
      })
    );

    //--------------

    // Comprehend Lambda
    const comprehendSyncProcessor = new lambda.Function(this, 'ComprehendSyncProcessor', {
      runtime: lambda.Runtime.PYTHON_3_7,
      code: lambda.Code.asset('code/comprehend_sync'),
      handler: 'comprehend_processor.lambda_handler',
      memorySize: 10000,
      timeout: cdk.Duration.seconds(900),
      environment: {
        TARGET_ES_CLUSTER: props.esDomain.domainEndpoint,
        TARGET_COMPREHEND_BUCKET: comprehendResultsBucket.bucketName,
        METADATA_SNS_TOPIC_ARN : props.metadataTopic.topicArn
      }
    });
    //Layer
    comprehendSyncProcessor.addLayers(pipelineLayer)
    //Trigger
    comprehendSyncProcessor.addEventSource(new S3EventSource(textractResultsBucket, {
      events: [ s3.EventType.OBJECT_CREATED ],
      filters: [ { suffix: 'fullresponse.json' }]
    }));
    //Permissions
    textractResultsBucket.grantReadWrite(comprehendSyncProcessor)
    comprehendResultsBucket.grantReadWrite(comprehendSyncProcessor)
    comprehendSyncProcessor.addToRolePolicy(
      new iam.PolicyStatement({
        actions: ["es:*"],
        resources: ["*"]
      })
    );
    comprehendSyncProcessor.addToRolePolicy(
      new iam.PolicyStatement({
        actions: ["comprehend:*"],
        resources: ["*"]
      })
    );
    comprehendSyncProcessor.addToRolePolicy(
      new iam.PolicyStatement({
        actions: ["sns:publish"],
        resources: ["*"]
      })
    );
  }