packages/blueprints/gen-ai-chatbot/static-assets/chatbot-genai-cdk/lib/constructs/usage-analysis.ts (254 lines of code) (raw):
import { Construct } from "constructs";
import * as s3 from "aws-cdk-lib/aws-s3";
import * as athena from "aws-cdk-lib/aws-athena";
import { CfnOutput, RemovalPolicy, Stack } from "aws-cdk-lib";
import * as glue from "@aws-cdk/aws-glue-alpha";
import * as events from "aws-cdk-lib/aws-events";
import * as targets from "aws-cdk-lib/aws-events-targets";
import * as python from "@aws-cdk/aws-lambda-python-alpha";
import * as path from "path";
import { Runtime } from "aws-cdk-lib/aws-lambda";
import { aws_glue } from "aws-cdk-lib";
import { Database } from "./database";
import * as iam from "aws-cdk-lib/aws-iam";
export interface UsageAnalysisProps {
sourceDatabase: Database;
accessLogBucket?: s3.Bucket;
}
export class UsageAnalysis extends Construct {
public readonly database: glue.IDatabase;
public readonly ddbExportTable: glue.ITable;
public readonly ddbBucket: s3.IBucket;
public readonly resultOutputBucket: s3.IBucket;
public readonly workgroupName: string;
public readonly workgroupArn: string;
constructor(scope: Construct, id: string, props: UsageAnalysisProps) {
super(scope, id);
const GLUE_DATABASE_NAME = `${Stack.of(
this
).stackName.toLowerCase()}_usage_analysis`;
const DDB_EXPORT_TABLE_NAME = "ddb_export";
// Bucket to export DynamoDB data
const ddbBucket = new s3.Bucket(this, "DdbBucket", {
encryption: s3.BucketEncryption.S3_MANAGED,
blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
enforceSSL: true,
removalPolicy: RemovalPolicy.DESTROY,
objectOwnership: s3.ObjectOwnership.OBJECT_WRITER,
autoDeleteObjects: true,
versioned: true,
serverAccessLogsBucket: props.accessLogBucket,
serverAccessLogsPrefix: "DdbBucket",
});
// Bucket for Athena query results
const queryResultBucket = new s3.Bucket(this, "QueryResultBucket", {
encryption: s3.BucketEncryption.S3_MANAGED,
blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
enforceSSL: true,
removalPolicy: RemovalPolicy.DESTROY,
objectOwnership: s3.ObjectOwnership.OBJECT_WRITER,
autoDeleteObjects: true,
serverAccessLogsBucket: props.accessLogBucket,
serverAccessLogsPrefix: "QueryResultBucket",
});
// Workgroup for Athena
const wg = new athena.CfnWorkGroup(this, "Wg", {
name: `${Stack.of(this).stackName.toLowerCase()}_wg`,
description: "Workgroup for Athena",
recursiveDeleteOption: true,
workGroupConfiguration: {
resultConfiguration: {
outputLocation: `s3://${queryResultBucket.bucketName}`,
},
},
});
const database = new glue.Database(this, "Database", {
databaseName: GLUE_DATABASE_NAME,
});
const imageSchemaType = glue.Schema.struct([
{
name: "CreateTime",
type: glue.Schema.struct([{ name: "N", type: glue.Schema.STRING }]),
},
{
name: "LastMessageId",
type: glue.Schema.struct([{ name: "S", type: glue.Schema.STRING }]),
},
{
name: "MessageMap",
type: glue.Schema.struct([{ name: "S", type: glue.Schema.STRING }]),
},
{
name: "IsLargeMessage",
type: glue.Schema.struct([{ name: "BOOL", type: glue.Schema.BOOLEAN }]),
},
{
name: "PK",
type: glue.Schema.struct([{ name: "S", type: glue.Schema.STRING }]),
},
{
name: "SK",
type: glue.Schema.struct([{ name: "S", type: glue.Schema.STRING }]),
},
{
name: "Title",
type: glue.Schema.struct([{ name: "S", type: glue.Schema.STRING }]),
},
{
name: "TotalPrice",
type: glue.Schema.struct([
{ name: "N", type: glue.Schema.decimal(20, 10) },
]),
},
{
name: "BotId",
type: glue.Schema.struct([{ name: "S", type: glue.Schema.STRING }]),
},
{
name: "Description",
type: glue.Schema.struct([{ name: "S", type: glue.Schema.STRING }]),
},
{
name: "Instruction",
type: glue.Schema.struct([{ name: "S", type: glue.Schema.STRING }]),
},
{
name: "PublicBotId",
type: glue.Schema.struct([{ name: "S", type: glue.Schema.STRING }]),
},
{
name: "IsPinned",
type: glue.Schema.struct([{ name: "BOOL", type: glue.Schema.BOOLEAN }]),
},
{
name: "Knowledge",
type: glue.Schema.struct([
{
name: "M",
type: glue.Schema.map(
glue.Schema.STRING,
glue.Schema.array(glue.Schema.STRING)
),
},
]),
},
{
name: "LastBotUsed",
type: glue.Schema.struct([{ name: "N", type: glue.Schema.STRING }]),
},
{
name: "LastExecId",
type: glue.Schema.struct([{ name: "S", type: glue.Schema.STRING }]),
},
{
name: "SyncStatus",
type: glue.Schema.struct([{ name: "S", type: glue.Schema.STRING }]),
},
{
name: "SyncStatusReason",
type: glue.Schema.struct([{ name: "S", type: glue.Schema.STRING }]),
},
{
name: "PublishedApiStackName",
type: glue.Schema.struct([{ name: "S", type: glue.Schema.STRING }]),
},
{
name: "PublishedApiDatetime",
type: glue.Schema.struct([{ name: "S", type: glue.Schema.STRING }]),
},
]);
const ddbExportTable = new glue.S3Table(this, "DdbExportTable", {
database,
bucket: ddbBucket,
tableName: DDB_EXPORT_TABLE_NAME,
partitionKeys: [
{
name: "datehour",
type: glue.Schema.STRING,
},
],
columns: [
{
name: "Metadata",
type: glue.Schema.struct([
{
name: "WriteTimestampMicros",
type: glue.Schema.struct([
{ name: "N", type: glue.Schema.STRING },
]),
},
]),
},
{
name: "Keys",
type: glue.Schema.struct([
{
name: "PK",
type: glue.Schema.struct([
{ name: "S", type: glue.Schema.STRING },
]),
},
{
name: "SK",
type: glue.Schema.struct([
{ name: "S", type: glue.Schema.STRING },
]),
},
]),
},
{
name: "OldImage",
type: imageSchemaType,
},
{
name: "NewImage",
type: imageSchemaType,
},
],
dataFormat: glue.DataFormat.JSON,
});
// Add partition projection using escape hatch
// Ref: https://docs.aws.amazon.com/cdk/v2/guide/cfn_layer.html
const cfnDdbExportTable = ddbExportTable.node
.defaultChild as aws_glue.CfnTable;
cfnDdbExportTable.addPropertyOverride("TableInput.Parameters", {
has_encrypted_data: false,
"projection.enabled": true,
"projection.datehour.type": "date",
// NOTE: To account for timezones that are ahead of UTC, specify a far future date instead of `NOW` for the end of the range.
"projection.datehour.range": "2023/01/01/00,2123/01/01/00",
"projection.datehour.format": "yyyy/MM/dd/HH",
"projection.datehour.interval": 1,
"projection.datehour.interval.unit": "HOURS",
"storage.location.template":
`s3://${ddbBucket.bucketName}/` + "${datehour}/AWSDynamoDB/data/",
});
const exportHandler = new python.PythonFunction(this, "ExportHandler", {
entry: path.join(__dirname, "../../../backend/s3_exporter/"),
runtime: Runtime.PYTHON_3_11,
environment: {
BUCKET_NAME: ddbBucket.bucketName,
TABLE_ARN: props.sourceDatabase.table.tableArn,
},
});
exportHandler.role?.addToPrincipalPolicy(
new iam.PolicyStatement({
actions: ["dynamodb:ExportTableToPointInTime"],
resources: [props.sourceDatabase.table.tableArn],
})
);
ddbBucket.grantReadWrite(exportHandler);
new events.Rule(this, "ScheduleRule", {
schedule: events.Schedule.cron({ minute: "5" }),
targets: [new targets.LambdaFunction(exportHandler)],
});
new CfnOutput(this, "UsageAnalysisWorkgroup", {
value: wg.name,
});
new CfnOutput(this, "UsageAnalysisOutputLocation", {
value: `s3://${queryResultBucket.bucketName}`,
});
this.database = database;
this.ddbBucket = ddbBucket;
this.ddbExportTable = ddbExportTable;
this.workgroupName = wg.name;
this.resultOutputBucket = queryResultBucket;
this.workgroupArn = `arn:aws:athena:*:${Stack.of(this).account}:workgroup/${
wg.name
}`;
}
}