in source/lib/ingestion/youtube-comments-stacks.ts [29:181]
constructor(scope: cdk.Construct, id: string, props?: cdk.NestedStackProps) {
super(scope, id, props);
const _youtubeAPIKey = new cdk.CfnParameter(this, 'YoutubeAPIKey', {
type: 'String',
description: 'Required: The SSM parameter key name where Youtube API credentails are stored',
allowedPattern: '^(?!\\s*$).+',
default: '/discovering-hot-topics-using-machine-learning/youtube/comments',
constraintDescription: 'Please provide the SSM Key for Youtube API'
});
const _eventBusArn = new cdk.CfnParameter(this, 'EventBus', {
type: 'String',
description: 'Required: The ARN of the Event Bus messaging backbone',
allowedPattern: '^arn:\\S+:events:\\S+:\\d{12}:event-bus/\\S+$',
constraintDescription: 'Please provide the ARN of the Event Bus messaging backbone'
});
const _streamArn = new cdk.CfnParameter(this, 'StreamARN', {
type: 'String',
description: 'Required: The name of the stream where search comments should be published for analysis',
allowedPattern: '^arn:\\S+:kinesis:\\S+:\\d{12}:stream/\\S+$',
constraintDescription: 'Please provide the Kinesis Stream name'
});
const _youtubeSearchQuery = new cdk.CfnParameter(this, 'YoutubeSearchQuery', {
type: 'String',
description: 'Optional search parameter to specify keywords to search for on Youtube. You can use NOT (-) and OR (|) operators to find videos. '+
'Example \'boating|sailing -fishing\'. For details refer API documentation on this link https://developers.google.com/youtube/v3/docs/search/list. At least one parameter from "YouTubeChannel" and "YoutubeSearchQuery" has to be provided.',
constraintDescription: 'Please provide key words for Youtube search query',
minLength: 0,
maxLength: 500
});
const _youtubeChannel = new cdk.CfnParameter(this, 'YouTubeChannel', {
type: 'String',
description: 'Optional parameter to retrieve comments data from videos from a specific channel. At least one parameter from "YouTubeChannel" and "YoutubeSearchQuery" has to be provided.',
allowedPattern: '^$|^(?!\\s*$).+',
constraintDescription: 'Please provide a valid YouTube Channel ID'
});
const _youtubeVideoSearchFreq = new cdk.CfnParameter(this, 'YouTubeSearchIngestionFreq', {
type: 'String',
default: 'cron(0 12 * * ? *)',
allowedPattern: `^$|${DiscoveringHotTopicsStack.cronRegex}`,
description: 'Required: The frequency at which at which YouTube comments should be retrieved',
constraintDescription: 'Please provide a valid cron expression of the formation \'cron(0 12 * * ? *)\'. For details on CloudWatch cron expressions, please refer the following link https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/ScheduledEvents.html'
});
const _videoSearchIngestionWindow = new cdk.CfnParameter(this, 'YouTubeSearchWindow', {
type: 'Number',
default: 7,
minValue: 1,
maxValue: 30,
description: 'Required: Please provide the window in "Days" (between 1 to 30) for videos to look for. This is used to filter out videos older than the specified window',
constraintDescription: 'The search window range can only be from 1 to 30 days (both inclusive). Please provide a valid range.'
});
new cdk.CfnRule(this, '"YouTubeSearchQueryIsNull"', {
ruleCondition: cdk.Fn.conditionEquals(cdk.Fn.ref(_youtubeSearchQuery.logicalId), ""),
assertions: [{
assert: cdk.Fn.conditionNot(cdk.Fn.conditionEquals(cdk.Fn.ref(_youtubeChannel.logicalId), "")),
assertDescription: 'If "DeployYouTubeCommentsIngestion" is set to "Yes" then atleast one parameter from "YouTubeVideoSearchQuery" and "YouTubeChannel" should be provided. Both cannot be blank'
}]
});
new cdk.CfnRule(this, '"YouTubeChannelIsNull"', {
ruleCondition: cdk.Fn.conditionEquals(cdk.Fn.ref(_youtubeChannel.logicalId), ""),
assertions: [{
assert: cdk.Fn.conditionNot(cdk.Fn.conditionEquals(cdk.Fn.ref(_youtubeSearchQuery.logicalId), "")),
assertDescription: 'If "DeployYouTubeCommentsIngestion" is set to "Yes" then atleast one parameter from "YouTubeVideoSearchQuery" and "YouTubeChannel" should be provided. Both cannot be blank'
}]
});
const _stream = kinesis.Stream.fromStreamArn(this, 'PublishCommentsStream', _streamArn.valueAsString);
const _eventBus = events.EventBus.fromEventBusArn(this, 'Bus', _eventBusArn.valueAsString) as events.EventBus;
const _youTubeDataIngestion = new DataIngestionTemplate(this, 'SearchVideo', {
source: {
lambdaFunctionProps: {
description: 'This lambda function searches for the videos',
runtime: lambda.Runtime.PYTHON_3_8,
handler: 'lambda_function.search_videos',
code: lambda.Code.fromAsset('lambda/ingestion-youtube'),
environment: {
VIDEO_NAMESPACE: this.video_namespace,
SSM_API_KEY: _youtubeAPIKey.valueAsString,
QUERY: _youtubeSearchQuery.valueAsString,
CHANNEL_ID: _youtubeChannel.valueAsString,
VIDEO_SEARCH_INGESTION_WINDOW: cdk.Token.asString(_videoSearchIngestionWindow.valueAsNumber),
},
timeout: cdk.Duration.minutes(15),
memorySize: 256
}
},
target: {
lambdaFunctionProps: {
description: 'This lambda function searches for the comments associates with the videos',
runtime: lambda.Runtime.PYTHON_3_8,
handler: 'lambda_function.search_comments',
code: lambda.Code.fromAsset('lambda/ingestion-youtube'),
environment: {
STREAM_NAME: _stream.streamName,
SSM_API_KEY: _youtubeAPIKey.valueAsString,
VIDEO_NAMESPACE: this.video_namespace,
CHANNEL_ID: _youtubeChannel.valueAsString,
VIDEO_SEARCH_INGESTION_WINDOW: cdk.Token.asString(_videoSearchIngestionWindow.valueAsNumber)
},
timeout: cdk.Duration.minutes(10),
memorySize: 256
},
tableProps: {
partitionKey: {
name: 'VIDEO_ID',
type: ddb.AttributeType.STRING
},
timeToLiveAttribute: "EXP_DATE"
}
},
ingestionEventRuleProps: {
eventPattern: {
account: [ cdk.Aws.ACCOUNT_ID ],
region: [ cdk.Aws.REGION ],
source: [ this.video_namespace ]
}
},
existingIngestionEventBus: _eventBus
});
_youTubeDataIngestion.sourceLambda.role?.attachInlinePolicy(new iam.Policy(this, 'SourceFnSSMPolicy', { // NOSONAR - typescript:S905. Sonarqube misinterprets the "?" operator
statements: [ new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [ 'ssm:GetParameter' ],
resources: [ `arn:${cdk.Aws.PARTITION}:ssm:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:parameter${_youtubeAPIKey.valueAsString}` ]
})]
}));
_youTubeDataIngestion.targetLambda.role?.attachInlinePolicy(new iam.Policy(this, 'TargetFnSSMPolicy', { // NOSONAR - typescript:S905. Sonarqube misinterprets the "?" operator
statements: [ new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [ 'ssm:GetParameter' ],
resources: [ `arn:${cdk.Aws.PARTITION}:ssm:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:parameter${_youtubeAPIKey.valueAsString}` ]
})]
}));
_stream.grantWrite(_youTubeDataIngestion.targetLambda);
const rule = new events.Rule(this, 'PollFrequency', {
schedule: events.Schedule.expression(_youtubeVideoSearchFreq.valueAsString)
});
rule.addTarget(new targets.LambdaFunction(_youTubeDataIngestion.sourceLambda));
}