generators/csv-generator.ts (106 lines of code) (raw):
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
import { IBatchGenerator, ILogData } from "../core/ext-types.js"
import fs from "fs"
import readline from 'readline';
/*
* Prefix prepended to the configured `data` path before the file is opened.
* It is empty, so the configured path is handed to fs.createReadStream as is.
*/
const DATA_PATH = ""
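/*
* CSV Line by Line Generator
* This generator reads a CSV file as input and
* sequentially outputs its contents, line by line.
*
* The file should be formatted as:
* @timestamp, @message
* anything , anything | object
*
* If the header line is exactly `message`, the file is treated as having
* a single message column with no timestamp column.
*
* When the isJson option is set, only messages that parse as JSON objects
* are output by this generator; other text is filtered out. Otherwise each
* message is output as a string stored under the configured logKey.
*/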
/**
 * Options understood by the csv generator.
 */
interface IGeneratorConfig {
    /** Path of the CSV file to read. */
    data: string;

    /** Number of records collected before a batch is emitted. */
    batchSize: number;

    /** Whether the first line of the file is a header row rather than data. */
    skipHeader: boolean;

    /** Whether each message cell is parsed as JSON instead of being emitted as text. */
    isJson: boolean;

    /** Whether to start again from the top of the file once its end is reached. */
    loop: boolean;

    /** Key under which the message text is stored. Ignored if the isJson option is set. */
    logKey: string;
}
/*
* Default options for the csv generator. The generator object exposes this
* value through its `defaultConfig` property.
*/
const defaultConfig: IGeneratorConfig = {
data: "examples/csv-logs-as-json.csv",
batchSize: 10, /* records per emitted batch */
skipHeader: true,
isJson: false, /* messages are emitted as text under logKey */
logKey: "log",
loop: false, /* read the file once, then stop */
};
/**
 * Extracts the message cell from one line of the CSV file.
 *
 * When the file has a timestamp column, the line is split on its first comma
 * only, so commas inside the message are preserved; a line without a usable
 * second cell is treated as being the message itself. A leading and a trailing
 * double quote are each removed if present, and doubled quotes (`""`) are
 * collapsed to a single quote.
 *
 * @param line - One raw line of the file, without its line terminator.
 * @param timestamped - Whether the line starts with a timestamp column.
 * @returns The unquoted, unescaped message text.
 */
function extractMessageCell(line: string, timestamped: boolean): string {
    let cell = line;
    if (timestamped) {
        // The capture group makes split() keep everything after the first comma
        // as the second element instead of splitting on every comma.
        const [first, rest] = line.split(/,(.+)/);
        cell = rest ?? first ?? line;
    }

    // Remove surrounding quotes if needed.
    if (cell.charAt(0) === "\"") {
        cell = cell.slice(1);
    }
    if (cell.charAt(cell.length - 1) === "\"") {
        cell = cell.slice(0, -1);
    }

    // Substitute "" with ".
    return cell.replace(/\"\"/g, "\"");
}

/**
 * Batch generator that replays a CSV file line by line.
 *
 * `createConfiguredGenerator(config)` returns an object whose `makeInstance`
 * produces an async generator yielding arrays of records:
 *
 * - With `config.isJson` set, each message is parsed with `JSON.parse` and
 *   emitted only if the result is a non-null object; anything else is skipped.
 * - Otherwise each message is emitted as `{ [config.logKey]: message }`.
 *
 * Unless `config.skipHeader` is explicitly `false`, the first line is treated
 * as a header and dropped; a header that is exactly `message` marks the file
 * as having no timestamp column. A batch is yielded as soon as it holds
 * `config.batchSize` records. With `config.loop` set the file is replayed
 * indefinitely and a partial batch carries over into the next pass; without
 * it, the remaining partial batch is yielded once the file has been read.
 */
const csvGenerator: IBatchGenerator = {
    name: "csv",
    defaultConfig: defaultConfig,
    createConfiguredGenerator: function (config: any) {
        return {
            generatorTemplate: this,
            makeInstance: (() => (async function*() {
                let ranOnce = false;
                let batch: Array<ILogData> = [];

                while (config.loop || !ranOnce) {
                    const fileStream = fs.createReadStream(DATA_PATH + config.data);
                    // crlfDelay: Infinity makes every CR LF ('\r\n') in the
                    // input count as a single line break.
                    const rl = readline.createInterface({
                        input: fileStream,
                        crlfDelay: Infinity
                    });

                    let headerHandled = false;
                    let timestamped = true;

                    try {
                        for await (const line of rl) {
                            if (!headerHandled) {
                                headerHandled = true;
                                // Only an explicit `false` disables header
                                // skipping, so configs without the option
                                // keep dropping the first line as before.
                                if (config.skipHeader !== false) {
                                    if (line === "message") {
                                        timestamped = false;
                                    }
                                    continue;
                                }
                            }

                            const cell = extractMessageCell(line, timestamped);

                            let record: ILogData;
                            if (config.isJson) {
                                let parsed: unknown;
                                try {
                                    parsed = JSON.parse(cell);
                                } catch {
                                    // Not valid JSON: filter the line out.
                                    continue;
                                }
                                // `typeof null` is "object", so null needs
                                // its own check to be filtered out as well.
                                if (typeof parsed !== "object" || parsed === null) {
                                    continue;
                                }
                                record = parsed as ILogData;
                            } else {
                                record = {
                                    [config.logKey]: cell,
                                };
                            }

                            batch.push(record);
                            // `>=` rather than `===` so that a batch size of 0
                            // or a non-integer still flushes the batch.
                            if (batch.length >= config.batchSize) {
                                yield batch;
                                batch = [];
                            }
                        }
                    } finally {
                        // Also reached when the consumer stops iterating early
                        // or an error is thrown, so the file handle is always
                        // released.
                        rl.close();
                        fileStream.close();
                    }

                    ranOnce = true;
                    if (!config.loop && batch.length !== 0) {
                        yield batch;
                    }
                }
            })()),
        }
    }
};
export default csvGenerator;