in next_steps/kinesis_stream_connector/l4m_connector/stack/l4m_connector_stack.ts [26:216]
constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    /*
    Source data schema: the default schema below matches the schema produced by the
    sample data generator provided with this repo. Update "fieldSchema" to match the
    records in your own source Kinesis data stream.
    */
    const fieldSchema: CfnTable.ColumnProperty[] = [
        {
            name: 'platform',
            type: 'string',
            comment: 'platform source',
        },
        {
            name: 'marketplace',
            type: 'string',
            comment: 'country code',
        },
        {
            name: 'event_time',
            type: 'timestamp',
            comment: 'event timestamp',
        },
        {
            name: 'views',
            type: 'bigint',
            comment: 'total views for the time interval',
        },
        {
            name: 'revenue',
            type: 'decimal',
            comment: 'total revenue for the time interval',
        },
    ];
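    /*
    For reference, a stream record matching the default schema above might look like
    this (hypothetical sample values, not taken from the data generator):
    { "platform": "pc_web", "marketplace": "us", "event_time": "2021-01-01 00:00:00",
      "views": 1000, "revenue": 279.90 }
    */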
    // Get the stream name from cdk.json. Ensure the stream exists: either the one
    // you deployed with the data generator CDK app in this repo, or your own
    // Kinesis data stream source.
    const streamName = this.node.tryGetContext('streamName');
    const streamArn = this.formatArn({
        resourceName: streamName,
        service: 'kinesis',
        resource: 'stream'
    });
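    // The resulting ARN has the standard Kinesis shape (region and account
    // resolved at deploy time):
    //   arn:aws:kinesis:<region>:<account-id>:stream/<streamName>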
    // Glue and S3 parameters passed from cdk.json
    const glueDBName = this.node.tryGetContext('glueDBName');
    const glueTableName = this.node.tryGetContext('glueTableName');
    const sourceFormat = this.node.tryGetContext('sourceFormat');
    const bucketName = this.node.tryGetContext('bucketName');
    const bucketPrefix = this.node.tryGetContext('bucketPrefix');
    const l4mInterval = this.node.tryGetContext('l4mInterval');
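    /*
    A hypothetical "context" block in cdk.json supplying these values might look like
    (all values are illustrative placeholders):
      "context": {
        "streamName": "l4m-stream",
        "glueDBName": "l4m-db",
        "glueTableName": "l4m-table",
        "sourceFormat": "json",
        "bucketName": "l4m-live-data",
        "bucketPrefix": "live/",
        "l4mInterval": "5"
      }
    */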
    // Obtain a Stream object from the stream ARN
    const sourceStream = Stream.fromStreamArn(this, "sourceStream", streamArn);

    // Create the S3 bucket used as the destination (sink) for the AWS Glue Spark streaming ETL job
    const l4mBucket = new Bucket(this, bucketName, {
        bucketName: bucketName,
        publicReadAccess: false,
        blockPublicAccess: BlockPublicAccess.BLOCK_ALL,
        encryption: BucketEncryption.KMS_MANAGED,
        enforceSSL: true
    });
    /*
    Location of the AWS Glue Spark streaming ETL script: the main script that ingests
    data from the Kinesis data stream and writes it to S3, organized by time interval,
    ready for use by the Amazon Lookout for Metrics service.
    */
    const glueSparkScript = new Asset(this, 'gluesparkscript', {
        path: "./src/glue_spark_etl_to_s3.py"
    });
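    // Note: the Asset construct stages the script in the CDK asset bucket during
    // deployment; s3ObjectUrl resolves to its s3://<bucket>/<key> location, which is
    // what the Glue job below receives as its script location.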
    // Create the Glue database
    const glueDB = new CfnDatabase(this, glueDBName, {
        catalogId: cdk.Aws.ACCOUNT_ID,
        databaseInput: {
            name: glueDBName,
            description: 'Glue DB'
        }
    });
    // Build the comma-separated column list from the schema (used by the Glue table's JSON SerDe)
    const paths: string = fieldSchema.map((item) => item.name).join(',');
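    // With the default schema this yields: "platform,marketplace,event_time,views,revenue"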
    /*
    Set up the Glue table properties.
    The Glue table is connected to the source Kinesis data stream.
    */
    const tableProps = {
        catalogId: glueDB.catalogId,
        databaseName: glueDBName,
        tableInput: {
            name: glueTableName,
            storageDescriptor: {
                columns: fieldSchema,
                location: streamName,
                inputFormat: "org.apache.hadoop.mapred.TextInputFormat",
                outputFormat: "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
                compressed: false,
                numberOfBuckets: -1,
                serdeInfo: {
                    serializationLibrary: "org.openx.data.jsonserde.JsonSerDe",
                    parameters: {
                        paths: paths
                    }
                },
                parameters: {
                    endpointUrl: `https://kinesis.${cdk.Aws.REGION}.amazonaws.com`,
                    streamName: streamName,
                    typeOfData: 'kinesis'
                }
            },
            tableType: 'EXTERNAL_TABLE',
            parameters: {
                classification: sourceFormat
            }
        }
    };
    const glueTable = new CfnTable(this, glueTableName, tableProps);
    glueTable.addDependsOn(glueDB);
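    // For a Kinesis-backed Glue table, storageDescriptor.location carries the stream
    // name rather than an S3 path, and the serdeInfo "paths" parameter tells the JSON
    // SerDe which top-level fields to extract from each record.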
    // Set up IAM policies for the AWS Glue job. S3 and Kinesis permissions are kept
    // in separate statements so each action list applies only to its own resources.
    const glueS3PolicyStmt = new PolicyStatement();
    glueS3PolicyStmt.addActions("s3:*");
    glueS3PolicyStmt.addResources(l4mBucket.bucketArn, `${l4mBucket.bucketArn}/*`);
    const glueKinesisPolicyStmt = new PolicyStatement();
    glueKinesisPolicyStmt.addActions("kinesis:DescribeStream");
    glueKinesisPolicyStmt.addResources(sourceStream.streamArn);
    const gluePolicy = new Policy(this, "gluePolicy");
    gluePolicy.addStatements(glueS3PolicyStmt, glueKinesisPolicyStmt);
    const glueRole = new Role(this, "glueRole", {
        assumedBy: new ServicePrincipal("glue.amazonaws.com"),
        managedPolicies: [
            ManagedPolicy.fromAwsManagedPolicyName("AmazonS3ReadOnlyAccess"),
            ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSGlueServiceRole"),
            ManagedPolicy.fromAwsManagedPolicyName("AWSGlueSchemaRegistryFullAccess")
        ]
    });
    gluePolicy.attachToRole(glueRole);
    sourceStream.grantRead(glueRole);
    // Set up the AWS Glue job that runs the Spark streaming ETL script
    const glueSparkJob = new CfnJob(this, 'glueSparkETLtoS3', {
        name: "glueSparkETLtoS3",
        description: "Glue Spark streaming ETL job to ingest the Kinesis data stream into S3",
        role: glueRole.roleArn,
        glueVersion: "2.0",
        command: {
            name: "gluestreaming",
            scriptLocation: glueSparkScript.s3ObjectUrl,
            pythonVersion: "3"
        },
        defaultArguments: {
            "--srcDBName": glueDBName,
            "--srcTableName": glueTableName,
            "--srcFormat": sourceFormat,
            "--l4mBucket": bucketName,
            "--l4mBucketPrefix": bucketPrefix,
            "--l4mInterval": l4mInterval,
            "--job-bookmark-option": "job-bookmark-disable"
        },
        allocatedCapacity: 1,
        maxRetries: 1,
        executionProperty: { maxConcurrentRuns: 1 }
    });
    glueSparkJob.addDependsOn(glueTable);
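    // The "--srcDBName"-style defaultArguments above are handed to the job at run
    // time; the ETL script is expected to read them on the Python side (e.g. via
    // awsglue.utils.getResolvedOptions).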
    new CfnOutput(this, "Glue ETL Job", {
        description: "Glue ETL Job",
        exportName: "glue-etl-job",
        value: String(glueSparkJob.name)
    });
    new CfnOutput(this, "Output S3 Bucket", {
        description: "Output S3 bucket, i.e. the input to Amazon Lookout for Metrics",
        exportName: "s3bucket-live-data",
        value: l4mBucket.bucketArn
    });
}