datajets/forward-datajet.ts (110 lines of code) (raw):
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
// import logger from "fluent-logger";
import { IDatajet, ILogData } from "../core/ext-types.js";
import logger from 'fluent-logger';
interface IDatajetConfig {
tagPrefix: string,
host: string,
port: number,
timeout: 3.0,
reconnectInterval: number,
requireAckResponse: boolean, // Set to true to wait response from Fluentd certainly
inputStructure: "object" | "log-key-json" | "log-key-string",
timeOffset: number,
batchSend: boolean,
logKey: string,
addKeys: object,
}
const defaultConfig: IDatajetConfig = {
tagPrefix: 'tag_prefix',
host: '0.0.0.0',
port: 24224,
timeout: 3.0,
reconnectInterval: 600000,
requireAckResponse: false,
inputStructure: "log-key-string",
timeOffset: 0,
batchSend: false,
logKey: "log",
addKeys: {},
}
/*
Here are some keys we might want to add with addKeys
{
source:"stdout",
container_id:"c61d13c68659b622a01d8c3825b0bc1186391119d47dbf864d9c3a65c3f2aa79",
container_name:"/distracted_bell"
}
*/
const forwardDatajet: IDatajet = {
name: "forward",
defaultConfig: defaultConfig,
createConfiguredDatajet: function (config: IDatajetConfig) {
let loggerInit = false;
return {
datajetTemplate: this,
transmitBatch: async (batch: Array<ILogData>) => {
if (!loggerInit) {
loggerInit = true;
logger.configure(config.tagPrefix, {
host: config.host,
port: config.port,
timeout: config.timeout,
reconnectInterval: config.reconnectInterval, // 10 minutes
requireAckResponse: config.requireAckResponse,
});
}
try {
let processedBatch: Array<ILogData>;
// Optimization for batch sends
if (config.inputStructure === "object" && Object.entries(config.addKeys).length === 0) {
processedBatch = batch;
}
else {
// Process logs
processedBatch = batch.map(log => {
let pl: object;
if (config.inputStructure === "object") {
pl = log;
}
else if (config.inputStructure === "log-key-json") {
pl = JSON.parse(log[config.logKey]);
}
else { /* default value, "log-key-string" */
pl = {
"log": log[config.logKey]
}
}
if (config.addKeys) {
pl = {
...config.addKeys,
...pl,
}
}
return pl;
});
}
const t = new Date();
t.setSeconds(t.getSeconds() + config.timeOffset);
// Send entire batch
if (config.batchSend) {
if (config.timeOffset !== 0) {
logger.emit(processedBatch, t);
return true;
}
logger.emit(processedBatch);
return true;
}
// Emit log with time offset
processedBatch.forEach((b) => {
if (config.timeOffset !== 0) {
logger.emit(b, t);
return;
}
// Emit log
logger.emit(b);
})
return true;
}
catch (e) {
console.log("Firelens datajet execution failure: ", e.message)
return false;
}
}
}
}
}
export default forwardDatajet;
function ecsObjectify(text: string) {
return {
source:"stdout",
log: text,
container_id:"c61d13c68659b622a01d8c3825b0bc1186391119d47dbf864d9c3a65c3f2aa79",
container_name:"/distracted_bell"
}
}