in datajets/forward-datajet.ts [56:131]
transmitBatch: async (batch: Array<ILogData>) => {
if (!loggerInit) {
loggerInit = true;
logger.configure(config.tagPrefix, {
host: config.host,
port: config.port,
timeout: config.timeout,
reconnectInterval: config.reconnectInterval, // 10 minutes
requireAckResponse: config.requireAckResponse,
});
}
try {
let processedBatch: Array<ILogData>;
// Optimization for batch sends
if (config.inputStructure === "object" && Object.entries(config.addKeys).length === 0) {
processedBatch = batch;
}
else {
// Process logs
processedBatch = batch.map(log => {
let pl: object;
if (config.inputStructure === "object") {
pl = log;
}
else if (config.inputStructure === "log-key-json") {
pl = JSON.parse(log[config.logKey]);
}
else { /* default value, "log-key-string" */
pl = {
"log": log[config.logKey]
}
}
if (config.addKeys) {
pl = {
...config.addKeys,
...pl,
}
}
return pl;
});
}
const t = new Date();
t.setSeconds(t.getSeconds() + config.timeOffset);
// Send entire batch
if (config.batchSend) {
if (config.timeOffset !== 0) {
logger.emit(processedBatch, t);
return true;
}
logger.emit(processedBatch);
return true;
}
// Emit log with time offset
processedBatch.forEach((b) => {
if (config.timeOffset !== 0) {
logger.emit(b, t);
return;
}
// Emit log
logger.emit(b);
})
return true;
}
catch (e) {
console.log("Firelens datajet execution failure: ", e.message)
return false;
}
}