in core/src/data-generator/batch-replayer.ts [62:182]
constructor(scope: Construct, id: string, props: BatchReplayerProps) {
  super(scope, id);

  this.dataset = props.dataset;
  // Replay frequency in seconds; used below to derive the cron schedule
  this.frequency = props.frequency || 60;
  this.sinkBucket = props.sinkBucket;
  this.outputFileMaxSizeInBytes = props.outputFileMaxSizeInBytes || 100 * 1024 * 1024; // Default to 100 MB

  /**
   * Find all file paths within the time range from the manifest file
   */
  const findFilePathsFn = new lambda.DockerImageFunction(this, "findFilePathFn", {
    memorySize: 1024,
    code: lambda.DockerImageCode.fromImageAsset(path.join(__dirname, '../../lib/data-generator/resources/lambdas/find-file-paths')),
    logRetention: RetentionDays.ONE_DAY,
    timeout: Duration.minutes(15),
  });

  // Grant read access to the manifest file in S3
  const { bucketName, objectKey } = this.dataset.manifestLocation;
  findFilePathsFn.role?.attachInlinePolicy(
    new Policy(this, 'read-manifest-file-policy', {
      statements: [
        new PolicyStatement({
          actions: ['s3:GetObject'],
          resources: [
            `arn:aws:s3:::${bucketName}/${objectKey}`,
          ],
        }),
      ],
    }),
  );

  const findFilePathsFnTask = new LambdaInvoke(this, "findFilePathFnTask", {
    lambdaFunction: findFilePathsFn,
    payload: TaskInput.fromObject({
      // Use this.frequency so the default applies when props.frequency is omitted
      frequency: this.frequency,
      manifestFileBucket: bucketName,
      manifestFileKey: objectKey,
      // The EventBridge rule below passes the scheduled event through,
      // so its 'time' field is available on the execution input
      triggerTime: JsonPath.stringAt('$$.Execution.Input.time'),
      offset: '' + this.dataset.offset,
    }),
    // Retry 500 errors on invocation with a 2 sec interval, back-off rate 2, up to 6 attempts
    retryOnServiceExceptions: true,
    // Keep only the Lambda result, dropping the invocation metadata
    outputPath: "$.Payload",
  });
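
  // Based on the itemsPath '$.filePaths' used by the Map state below, this task's
  // result is assumed to look like { filePaths: ['s3://bucket/key1', ...] }; the
  // exact shape is defined by the find-file-paths Lambda, not by this construct.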

  /**
   * Write data in batch step
   */
  const writeInBatchFn = new lambda.DockerImageFunction(this, "writeInBatchFn", {
    memorySize: 1024 * 5,
    code: lambda.DockerImageCode.fromImageAsset(path.join(__dirname, '../../lib/data-generator/resources/lambdas/write-in-batch')),
    logRetention: RetentionDays.ONE_DAY,
    timeout: Duration.minutes(15),
  });

  // Grant read access to all S3 objects in the dataset bucket
  writeInBatchFn.role?.attachInlinePolicy(
    new Policy(this, 'read-dataset-buckets-policy', {
      statements: [
        new PolicyStatement({
          actions: ['s3:GetObject'],
          resources: [
            // Scope to objects in the dataset bucket; a bare `${bucketName}*` would
            // also match other buckets sharing the same name prefix
            `arn:aws:s3:::${this.dataset.location.bucketName}/*`,
          ],
        }),
      ],
    }),
  );
  this.sinkBucket.grantPut(writeInBatchFn);

  const writeInBatchFnTask = new LambdaInvoke(this, "writeInBatchFnTask", {
    lambdaFunction: writeInBatchFn,
    payload: TaskInput.fromObject({
      // Array from the last step to be mapped
      outputFileIndex: JsonPath.stringAt('$.index'),
      filePath: JsonPath.stringAt('$.filePath'),
      // For calculating the start/end time
      frequency: this.frequency,
      triggerTime: JsonPath.stringAt('$$.Execution.Input.time'),
      offset: '' + this.dataset.offset,
      // For file processing
      dateTimeColumnToFilter: this.dataset.dateTimeColumnToFilter,
      dateTimeColumnsToAdjust: this.dataset.dateTimeColumnsToAdjust,
      sinkPath: this.sinkBucket.s3UrlForObject(`${this.dataset.tableName}`),
      // Use the configured max size (default 100 MB) instead of a hard-coded 20 KB
      outputFileMaxSizeInBytes: this.outputFileMaxSizeInBytes,
    }),
    // Retry 500 errors on invocation with a 2 sec interval, back-off rate 2, up to 6 attempts
    retryOnServiceExceptions: true,
    // Keep only the Lambda result, dropping the invocation metadata
    outputPath: "$.Payload",
  });
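
  // Judging from this payload, the write-in-batch Lambda presumably reads each source
  // file, keeps rows whose dateTimeColumnToFilter falls in the current replay window,
  // shifts dateTimeColumnsToAdjust by the dataset offset, and writes the result to
  // sinkPath in chunks capped at outputFileMaxSizeInBytes; the exact behavior lives
  // in the Lambda code, not here.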

  /**
   * Use a "Map" step to process each filePath in parallel
   */
  const writeInBatchMapTask = new Map(this, "writeInBatchMapTask", {
    itemsPath: JsonPath.stringAt('$.filePaths'),
    parameters: {
      index: JsonPath.stringAt('$$.Map.Item.Index'),
      filePath: JsonPath.stringAt('$$.Map.Item.Value'),
    },
  });
  writeInBatchMapTask.iterator(writeInBatchFnTask);
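
  // Note: no maxConcurrency is set on the Map state, so Step Functions fans out
  // as many parallel invocations as the account's Lambda concurrency allows.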

  /**
   * Overarching Step Functions state machine
   */
  const batchReplayStepFn = new StateMachine(this, "batchReplayStepFn", {
    definition: findFilePathsFnTask.next(writeInBatchMapTask),
    timeout: Duration.minutes(20),
  });
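
  // The 20-minute timeout caps a whole replay run; each Lambda invocation is
  // separately capped at 15 minutes above.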

  // Trigger the state machine on the replay frequency, converted from seconds
  // to whole minutes (EventBridge cron granularity is one minute)
  new Rule(this, 'batchReplayStepFnTrigger', {
    schedule: Schedule.cron({ minute: `0/${Math.ceil(this.frequency / 60)}` }),
    targets: [new SfnStateMachine(batchReplayStepFn, {})],
  });
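}

// Usage sketch (illustrative only: `stack`, `myDataset`, and `sink` are hypothetical
// values, not defined in this file):
//
//   new BatchReplayer(stack, 'BatchReplayer', {
//     dataset: myDataset,   // a dataset exposing manifestLocation, location, offset, ...
//     frequency: 120,       // replay every 2 minutes
//     sinkBucket: sink,     // Bucket that receives the replayed files
//   });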