generators/s3-generator.ts (74 lines of code) (raw):

/*
 *  Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *  SPDX-License-Identifier: Apache-2.0
 */

import AWS from 'aws-sdk';
import { IBatchGenerator, ILogData } from "../core/ext-types.js"
import fs from 'fs';
import readline from 'readline';

const DATA_PATH = ""

/*
 * S3 Line by Line Generator
 * This generator downloads an S3 object as input and
 * sequentially outputs its contents, line by line
 */

/** Options accepted by the S3 generator. */
interface IGeneratorConfig {
    /** Name of the S3 bucket that holds the input object. */
    bucket: string,
    /** Key of the S3 object to read. */
    file: string,
    /** Number of records collected into a batch before it is yielded. */
    batchSize: number,
    /** When true, the lines are replayed from the start after the last one. */
    loop: boolean,
    /** When true, each line is parsed with JSON.parse and emitted as-is. */
    isJSON: boolean,
    /** Property name that carries the raw line when isJSON is false. */
    logKey: string,
}

const defaultConfig: IGeneratorConfig = {
    bucket: "myS3Bucket",
    file: "/my/file.log",
    batchSize: 1,
    loop: true,
    isJSON: false,
    logKey: "log",
};

const s3Generator: IBatchGenerator = {
    name: "s3",
    defaultConfig: defaultConfig,

    /**
     * Builds a generator bound to one S3 object.
     *
     * The object is downloaded on the first run of an instance and the
     * resulting lines are cached in this closure, so later instances made from
     * the same configured generator reuse them.
     *
     * @param config bucket/key to read and batching options
     * @returns an object exposing `generatorTemplate` and `makeInstance`;
     *          `makeInstance` returns an async generator yielding batches
     * @throws rethrows any error raised by the S3 download, and any
     *         SyntaxError raised by JSON.parse when `isJSON` is set
     */
    createConfiguredGenerator: function (config: IGeneratorConfig) {
        const s3 = new AWS.S3();
        const bucket = config.bucket;
        const file = config.file;

        // Cache shared by every instance created from this configured generator.
        let s3Loaded = false;
        let s3Logs: Array<string> = [];

        return {
            generatorTemplate: this,
            makeInstance: (() => (async function*() {

                if (!s3Loaded) {
                    try {
                        const s3Response = await s3.getObject({
                            Bucket: bucket,
                            Key: file
                        }).promise();

                        // Accept both LF and CRLF line endings.
                        const lines = (s3Response?.Body?.toString() ?? "").split(/\r?\n/);

                        // A terminating newline (or an empty object) leaves one empty
                        // trailing element. It is not a log line, and JSON.parse("")
                        // would throw on it, so drop it.
                        if (lines.length > 0 && lines[lines.length - 1] === "") {
                            lines.pop();
                        }
                        s3Logs = lines;
                    } catch (e) {
                        console.log("S3 Generator failed to load configuration files.");
                        throw e;
                    }
                    s3Loaded = true;
                }

                // Nothing to emit. Without this guard `loop: true` would spin
                // forever in the loop below without ever yielding.
                if (s3Logs.length === 0) {
                    return;
                }

                let hasLooped = false;
                while (config.loop || !hasLooped) {
                    let batch: Array<ILogData> = [];

                    // s3Logs is a plain in-memory array, so a synchronous
                    // for...of is sufficient here.
                    for (const line of s3Logs) {
                        if (config.isJSON) {
                            batch.push(JSON.parse(line));
                        }
                        // we need to convert text to json
                        else {
                            batch.push({
                                [config.logKey]: line
                            } as ILogData);
                        }

                        // Flush once the batch reaches the configured size.
                        if (batch.length === config.batchSize) {
                            yield batch;
                            batch = [];
                        }
                    }

                    // Emit the final, partially filled batch of this pass.
                    if (batch.length !== 0) {
                        yield batch;
                    }
                    hasLooped = true;
                }
            })()),
        }
    }

};

export default s3Generator;