constructor()

in src/lib/etl-glue.ts [32:262]


  /**
   * Provisions the Glue-based preprocessing pipeline.
   *
   * Creates:
   * - an S3 bucket holding the job script, the bundled Neptune Python library and Glue temp files;
   * - a Glue database with `transaction` and `identity` Parquet tables stored in `props.bucket`;
   * - a crawler (with its own role) that updates both catalog tables;
   * - a self-referencing security group plus one Glue `NETWORK` connection per private subnet of `props.vpc`;
   * - a Glue security configuration (S3-managed S3 encryption, KMS-encrypted CloudWatch logs and job bookmarks);
   * - the `glueetl` preprocessing job and its role, writing to `processed-data/` under `props.s3Prefix`.
   *
   * Sets `crawlerName`, `jobName` and `processedOutputPrefix` on this construct.
   *
   * @param scope - parent construct.
   * @param id - construct id.
   * @param props - data bucket and prefixes, VPC, KMS key, access-log bucket and column arguments for the job.
   */
  constructor(scope: Construct, id: string, props: ETLProps) {
    super(scope, id);

    const stack = Stack.of(this);
    // Key prefix of the pipeline's data inside props.bucket. An absent prefix means the bucket root,
    // so it defaults to '' (a '/' default would produce the key pattern '/*', i.e. 'bucket//*').
    const dataPrefix = props.s3Prefix ?? '';

    // Holds the job script, its extra Python library and Glue's temporary directory.
    const glueJobBucket = new Bucket(this, 'GlueJobBucket', {
      encryption: BucketEncryption.S3_MANAGED,
      autoDeleteObjects: true,
      removalPolicy: RemovalPolicy.DESTROY,
      serverAccessLogsBucket: props.accessLogBucket,
      serverAccessLogsPrefix: 'glueJobBucketAccessLog',
    });

    // NOTE(review): 'frand' looks like a typo of 'fraud'. The name is kept as-is because renaming a
    // physical database name replaces the Glue database; confirm with dependants before changing.
    const transactionDatabase = new Database(this, 'FraudDetectionDatabase', {
      databaseName: 'frand_detection_db',
    });

    // Both tables declare only the join key; the crawler below updates their schemas in the catalog.
    const transactionTable = new Table(this, 'TransactionTable', {
      database: transactionDatabase,
      tableName: 'transaction',
      description: 'Transaction Table',
      columns: [
        { name: 'transactionid', type: Schema.STRING },
      ],
      dataFormat: DataFormat.PARQUET,
      bucket: props.bucket,
      s3Prefix: props.transactionPrefix,
      storedAsSubDirectories: true,
    });

    const identityTable = new Table(this, 'IdentityTable', {
      database: transactionDatabase,
      tableName: 'identity',
      description: 'Identity Table',
      columns: [
        { name: 'transactionid', type: Schema.STRING },
      ],
      dataFormat: DataFormat.PARQUET,
      bucket: props.bucket,
      s3Prefix: props.identityPrefix,
      storedAsSubDirectories: true,
    });

    // create crawler to update tables
    const crawlerRole = new Role(this, 'DataCrawlerRole', {
      assumedBy: new CompositePrincipal(
        new ServicePrincipal('glue.amazonaws.com')),
      managedPolicies: [ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSGlueServiceRole')],
    });
    props.bucket.grantRead(crawlerRole, `${dataPrefix}*`);
    const crawler = new CfnCrawler(this, 'DataCrawler', {
      role: crawlerRole.roleArn,
      targets: {
        catalogTargets: [{
          databaseName: transactionDatabase.databaseName,
          tables: [
            transactionTable.tableName,
            identityTable.tableName,
          ],
        }],
      },
      databaseName: transactionDatabase.databaseName,
      description: 'The crawler updates tables in Data Catalog.',
      schemaChangePolicy: {
        updateBehavior: 'UPDATE_IN_DATABASE',
        deleteBehavior: 'LOG',
      },
    });
    this.crawlerName = crawler.ref;

    // All outbound traffic is allowed, plus all TCP between members of this group.
    const glueJobSG = new SecurityGroup(this, 'GlueJobSG', {
      vpc: props.vpc,
      allowAllOutbound: true,
    });
    glueJobSG.addIngressRule(glueJobSG, Port.allTcp(), 'allow all TCP from same SG');
    (glueJobSG.node.defaultChild as CfnResource).addMetadata('cfn_nag', {
      rules_to_suppress: [
        {
          id: 'W40',
          reason: 'etl job need internet access to install pip packages',
        },
        {
          id: 'W5',
          reason: 'etl job need internet access to install pip packages',
        },
        {
          id: 'W27',
          reason: 'SG of glue job need open ingress required by Glue',
        },
      ],
    });

    // One NETWORK connection per private subnet. Construct ids stay 1-based
    // (NetworkConnection-1, -2, ...) so logical IDs are stable.
    const networkConnections = props.vpc.privateSubnets.map((subnet, index) =>
      new CfnConnection(this, `NetworkConnection-${index + 1}`, {
        catalogId: transactionDatabase.catalogId,
        connectionInput: {
          connectionType: 'NETWORK',
          connectionProperties: {},
          physicalConnectionRequirements: {
            availabilityZone: subnet.availabilityZone,
            subnetId: subnet.subnetId,
            securityGroupIdList: [
              glueJobSG.securityGroupId,
            ],
          },
        },
      }));

    const securityConfName = `SecConf-${stack.stackName}`;
    // Log groups whose names start with the security configuration name, under /aws-glue/jobs/.
    // Used both by the KMS key policy condition and by the job role's AssociateKmsKey permission.
    const glueJobLogGroupArn = stack.formatArn({
      service: 'logs',
      resource: 'log-group',
      resourceName: `/aws-glue/jobs/${securityConfName}*`,
      arnFormat: ArnFormat.COLON_RESOURCE_NAME,
    });

    const securityConf = new SecurityConfiguration(this, 'FraudDetectionSecConf', {
      securityConfigurationName: securityConfName,
      s3Encryption: {
        mode: S3EncryptionMode.S3_MANAGED,
      },
      cloudWatchEncryption: {
        mode: CloudWatchEncryptionMode.KMS,
        kmsKey: props.key,
      },
      jobBookmarksEncryption: {
        mode: JobBookmarksEncryptionMode.CLIENT_SIDE_KMS,
        kmsKey: props.key,
      },
    });
    // Let CloudWatch Logs use the CloudWatch encryption key, but only for the Glue job log groups above.
    securityConf.cloudWatchEncryptionKey?.addToResourcePolicy(new PolicyStatement({
      principals: [new ServicePrincipal('logs.amazonaws.com')],
      actions: [
        'kms:Encrypt*',
        'kms:Decrypt*',
        'kms:ReEncrypt*',
        'kms:GenerateDataKey*',
        'kms:Describe*',
      ],
      resources: ['*'],
      conditions: {
        ArnLike: {
          'kms:EncryptionContext:aws:logs:arn': glueJobLogGroupArn,
        },
      },
    }));

    const glueJobRole = new Role(this, 'GlueJobRole', {
      assumedBy: new CompositePrincipal(
        new ServicePrincipal('glue.amazonaws.com')),
      managedPolicies: [ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSGlueServiceRole')],
      inlinePolicies: {
        // Read access to the network connections attached to the job.
        glue: new PolicyDocument({
          statements: [
            new PolicyStatement({
              actions: ['glue:GetConnection'],
              resources: [
                transactionDatabase.catalogArn,
                ...networkConnections.map(conn => stack.formatArn({
                  service: 'glue',
                  resource: 'connection',
                  resourceName: conn.ref,
                })),
              ],
            }),
          ],
        }),
        // Allows associating the KMS key with the job's log groups.
        logs: new PolicyDocument({
          statements: [
            new PolicyStatement({
              actions: ['logs:AssociateKmsKey'],
              resources: [
                glueJobLogGroupArn,
              ],
            }),
          ],
        }),

      },
    });
    identityTable.grantRead(glueJobRole);
    transactionTable.grantRead(glueJobRole);

    glueJobBucket.grantReadWrite(glueJobRole, 'tmp/*');
    const scriptPrefix = this._deployGlueArtifact(glueJobBucket,
      path.join(__dirname, '../scripts/glue-etl.py'), 'src/scripts/');
    glueJobBucket.grantRead(glueJobRole, `${scriptPrefix}/*`);

    const neptuneGlueConnectorLibName = 'neptune_python_utils.zip';
    const libPrefix = this._deployGlueArtifact(glueJobBucket,
      path.join(__dirname, `../script-libs/amazon-neptune-tools/neptune-python-utils/target/${neptuneGlueConnectorLibName}`),
      'src/script-libs/amazon-neptune-tools/neptune-python-utils/target/');
    glueJobBucket.grantRead(glueJobRole, `${libPrefix}/*`);

    // Processed output sits next to the raw data, under the same optional prefix.
    const outputPrefix = `${dataPrefix}processed-data/`;
    const etlJob = new CfnJob(this, 'PreprocessingJob', {
      command: {
        name: 'glueetl',
        pythonVersion: '3',
        scriptLocation: glueJobBucket.s3UrlForObject(`${scriptPrefix}/glue-etl.py`),
      },
      defaultArguments: {
        '--region': Aws.REGION,
        '--database': transactionDatabase.databaseName,
        '--transaction_table': transactionTable.tableName,
        '--identity_table': identityTable.tableName,
        '--id_cols': props.dataColumnsArg.id_cols,
        '--cat_cols': props.dataColumnsArg.cat_cols,
        '--output_prefix': props.bucket.s3UrlForObject(outputPrefix),
        '--job-language': 'python',
        '--job-bookmark-option': 'job-bookmark-disable',
        '--TempDir': glueJobBucket.s3UrlForObject('tmp/'),
        '--enable-continuous-cloudwatch-log': 'true',
        '--enable-continuous-log-filter': 'false',
        // Flag-style argument: passed with an empty value.
        '--enable-metrics': '',
        '--extra-py-files': [glueJobBucket.s3UrlForObject(`${libPrefix}/${neptuneGlueConnectorLibName}`)].join(','),
        '--additional-python-modules': 'koalas==1.8.1',
      },
      role: glueJobRole.roleArn,
      maxCapacity: 8,
      glueVersion: '2.0',
      connections: {
        connections: networkConnections.map(conn => conn.ref),
      },
      securityConfiguration: securityConf.securityConfigurationName,
    });
    props.bucket.grantWrite(glueJobRole, `${outputPrefix}*`);
    this.jobName = etlJob.ref;
    this.processedOutputPrefix = outputPrefix;
  }