in source/lib/ingestion/newscatcher-stack.ts [27:130]
constructor(scope: cdk.Construct, id: string, props?: cdk.NestedStackProps) {
super(scope, id, props);
const _eventBusArn = new cdk.CfnParameter(this, 'EventBus', {
type: 'String',
description: 'The ARN of the Event Bus messaging backbone',
allowedPattern: '^arn:\\S+:events:\\S+:\\d{12}:event-bus/\\S+$',
constraintDescription: 'Please provide the ARN of the Event Bus messaging backbone'
});
const _streamArn = new cdk.CfnParameter(this, 'StreamARN', {
type: 'String',
description: 'The name of the stream where the RSS feeds should be published for analysis',
allowedPattern: '^arn:\\S+:kinesis:\\S+:\\d{12}:stream/\\S+$',
constraintDescription: 'Please provide the Kinesis Stream name'
});
const _newsSearchQuery = new cdk.CfnParameter(this, 'NewsSearchQuery', {
type: 'String',
description: 'Provide any keyword (optional) to filter news feeds. Only feeds containing the keyword will be processed.'+
' If no keyword is provided, feeds will not be filtered',
constraintDescription: 'Please enter the keyword to use to filter RSS news feed'
});
const _newsFeedConfigParam = new cdk.CfnParameter(this, 'Config', {
type: 'String',
description: 'Provide configuration for RSS feeds. This parameter should be configured as a JSON string. Here is a sample configuration '+
'{"country":"ALL", "language":"ALL", "topic":"ALL"}. For Country and language use ISO code. The list of superset of all supported topics '+
'is: "tech", "news", "business", "science", "finance", "food", "politics", "economics", "travel", "entertainment", "music", "sport", "world".' +
'Note: not all topics are supported for each RSS provider. Setting the value as "ALL", is treated as a wild character search',
default: '{"country":"ALL", "language":"ALL", "topic":"ALL"}',
});
const ingestFrequency = new cdk.CfnParameter(this, 'IngestFrequency', {
type: 'String',
default: 'cron(23 0 * * ? *)', // default once a day at GMT 23:00 hours
description: 'The frequency at which RSS Feeds should be pulled. For detailed documentation on schedule expression rules, please refer https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/ScheduledEvents.html',
allowedPattern: DiscoveringHotTopicsStack.cronRegex,
constraintDescription: 'Please provide a valid cron expression of the format \'cron(0/5 * * * ? *)\'. For details on CloudWatch cron expressions, please refer the following link https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/ScheduledEvents.html'
});
const _stream = kinesis.Stream.fromStreamArn(this, 'PublishCommentsStream', _streamArn.valueAsString);
const _eventBus = events.EventBus.fromEventBusArn(this, 'Bus', _eventBusArn.valueAsString) as events.EventBus;
const _customDataIngestion = new DataIngestionTemplate(this, 'RSSNewsFeedIngestion', {
source: {
lambdaFunctionProps: {
description: 'This lambda function picks up the configuration for RSS news feeds',
runtime: lambda.Runtime.PYTHON_3_8,
handler: 'lambda_function.publish_config_handler',
code: lambda.Code.fromAsset('lambda/capture_news_feed'),
environment: {
INGESTION_NAMESPACE: this._newsConfigNamespace,
CONFIG_PARAM: _newsFeedConfigParam.valueAsString,
SEARCH_QUERY: _newsSearchQuery.valueAsString,
},
timeout: cdk.Duration.minutes(15),
memorySize: 256
}
},
target: {
lambdaFunctionProps: {
description: 'This lambda function pulls news feeds based on the above configuration received in the event from Amazon Event Bridge',
runtime: lambda.Runtime.PYTHON_3_8,
handler: 'lambda_function.process_config_handler',
code: lambda.Code.fromAsset('lambda/capture_news_feed'),
environment: {
INGESTION_NAMESPACE: this._newsConfigNamespace,
STREAM_NAME: _stream.streamName,
},
reservedConcurrentExecutions: 1, // this is to throttle consumption of config events and also putting onto the data stream
timeout: cdk.Duration.minutes(15),
memorySize: 256
},
tableProps: {
partitionKey: {
name: 'ID',
type: dynamodb.AttributeType.STRING
},
sortKey: {
name: 'LAST_PUBLISHED_TIMESTAMP',
type: dynamodb.AttributeType.STRING
},
timeToLiveAttribute: 'EXP_DATE'
}
},
ingestionEventRuleProps: {
eventPattern: {
account: [ cdk.Aws.ACCOUNT_ID ],
region: [ cdk.Aws.REGION ],
source: [ this._newsConfigNamespace ]
}
},
existingIngestionEventBus: _eventBus
});
_stream.grantWrite(_customDataIngestion.targetLambda);
const rule = new events.Rule(this, 'PollFrequency', {
schedule: events.Schedule.expression(ingestFrequency.valueAsString)
});
rule.addTarget(new LambdaFunction(_customDataIngestion.sourceLambda));
}