in lib/amazon-dynamodb-store-stream-sports-data-stack.ts [16:178]
constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// The code that defines your stack goes here
const region = cdk.Stack.of(this).region
const tableName = "sportsfeeds"
const kinesisStream = "feedstream"
const feedStream = new kinesis.Stream(this, kinesisStream, {
streamName: kinesisStream,
shardCount: 3,
})
const table = new Table(this, tableName, {
billingMode: BillingMode.PAY_PER_REQUEST,
partitionKey: { name: "PK", type: AttributeType.STRING },
sortKey: { name: "SK", type: AttributeType.STRING },
removalPolicy: RemovalPolicy.DESTROY,
tableName: tableName,
kinesisStream: feedStream,
})
table.addGlobalSecondaryIndex({
indexName: "GSI-1",
partitionKey: { name: "ISSCHEDULED", type: dynamodb.AttributeType.STRING },
sortKey: {name: 'GAMEDATE', type: dynamodb.AttributeType.NUMBER},
projectionType: dynamodb.ProjectionType.ALL,
})
table.addGlobalSecondaryIndex({
indexName: "GSI-2",
partitionKey: { name: "CONNECTION", type: dynamodb.AttributeType.STRING },
projectionType: dynamodb.ProjectionType.ALL,
})
const writeFeeds = new lambda.Function(this, "writeFeeds", {
code: lambda.Code.fromAsset("lambda/writeFeeds"),
handler: "writeFeeds.handler",
functionName: "writeFeeds",
runtime: lambda.Runtime.NODEJS_14_X,
memorySize: 256,
environment : { "TABLE_NAME" : tableName, "REGION" : region }
})
table.grant(writeFeeds, "dynamodb:BatchWriteItem")
const readFeeds = new lambda.Function(this, "readFeeds", {
code: lambda.Code.fromAsset("lambda/readFeeds"),
handler: "readFeeds.handler",
functionName: "readFeeds",
runtime: lambda.Runtime.NODEJS_14_X,
memorySize: 256,
environment : { "TABLE_NAME" : tableName, "REGION" : region }
})
table.grant(readFeeds, "dynamodb:Query")
const connectionManager = new lambda.Function(this, "connectionManager", {
code: lambda.Code.fromAsset("lambda/connectionManager"),
handler: "connectionManager.handler",
functionName: "connectionManager",
runtime: lambda.Runtime.NODEJS_14_X,
memorySize: 256,
environment : { "TABLE_NAME" : tableName, "REGION" : region }
})
table.grant(
connectionManager,
"dynamodb:putitem",
"dynamodb:DeleteItem"
)
const streamConsumer = new lambda.Function(this, "streamConsumer", {
code: lambda.Code.fromAsset("lambda/streamConsumer"),
handler: "streamConsumer.handler",
functionName: "streamConsumer",
runtime: lambda.Runtime.NODEJS_14_X,
memorySize: 256,
environment : { "TABLE_NAME" : tableName, "REGION" : region }
})
table.grant(streamConsumer, "dynamodb:scan")
const consumer = new CfnStreamConsumer(this, 'stream-consumer', {
consumerName: 'feed-stream-consumer',
streamArn: feedStream.streamArn,
});
const eventSourceMapping = new EventSourceMapping(this, 'event-source-mapping', {
batchSize: 10,
eventSourceArn: consumer.attrConsumerArn,
startingPosition: StartingPosition.TRIM_HORIZON,
target: streamConsumer,
});
const api = new RestApi(this, `feedsAPI`, {
defaultCorsPreflightOptions: {
allowOrigins: Cors.ALL_ORIGINS,
},
restApiName: `feedsAPI`,
})
const ticksResource = api.root.addResource("feeds")
ticksResource.addMethod("GET", new LambdaIntegration(readFeeds))
ticksResource.addMethod("PUT", new LambdaIntegration(writeFeeds))
const webSocketApi = new WebSocketApi(this, "bookmakerAPI", {
connectRouteOptions: {
integration: new LambdaWebSocketIntegration({
handler: connectionManager,
}),
},
disconnectRouteOptions: {
integration: new LambdaWebSocketIntegration({
handler: connectionManager,
}),
},
defaultRouteOptions: {
integration: new LambdaWebSocketIntegration({
handler: connectionManager,
}),
},
})
const apiStage = new WebSocketStage(this, "ProdStage", {
webSocketApi,
stageName: "prod",
autoDeploy: true,
})
const connectionsArns = this.formatArn({
service: "execute-api",
resourceName: `${apiStage.stageName}/POST/*`,
resource: webSocketApi.apiId,
})
const kinesisStreamReadPolicyStmt = new PolicyStatement({
resources: [feedStream.streamArn],
actions: [
'kinesis:DescribeStreamSummary',
'kinesis:GetRecords',
'kinesis:GetShardIterator',
'kinesis:ListShards',
],
});
const kinesisConsumerPolicyStmt = new PolicyStatement({
resources: [consumer.attrConsumerArn],
actions: ['kinesis:SubscribeToShard'],
});
streamConsumer.addToRolePolicy(kinesisStreamReadPolicyStmt);
streamConsumer.addToRolePolicy(kinesisConsumerPolicyStmt);
streamConsumer.addToRolePolicy(
new PolicyStatement({
actions: ['execute-api:ManageConnections'],
resources: [connectionsArns],
})
)
}