datajets/tcp-datajet.ts (68 lines of code) (raw):
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
import { IDatajet, ILogData } from "../core/ext-types.js"
import net from 'net'
/*
* Sends logs to Fluent Bit over TCP.
*/
/**
 * Settings accepted by the tcp datajet.
 */
interface IDatajetConfig {
    /** Host handed to the socket's `connect` call. */
    host: string;
    /** Port handed to the socket's `connect` call. */
    port: number;
    /** Extra write attempts after the first one throws (total attempts = maxRetries + 1). */
    maxRetries: number;
    /**
     * Threshold compared against the socket's `writableLength`: sending pauses
     * once it is exceeded and resumes when the backlog drops to 3/4 of it.
     */
    tcpBufferLimit: number;
    /** When true, "\n" is appended to the JSON-serialized record (the branch used when `logKey` is empty). */
    addNewline: boolean;
    /** Property of the log record to send; when empty the whole record is JSON-serialized instead. */
    logKey: string;
}
/* Baseline settings, exposed below as the tcp datajet's `defaultConfig`. */
const defaultConfig: IDatajetConfig = {
    /* connection target */
    host: '127.0.0.1',
    port: 5170,

    /* write retry / back-pressure tuning */
    maxRetries: 2,
    tcpBufferLimit: 100 * 1_000_000, /* 100 Megabytes */

    /* payload shaping */
    addNewline: false,
    logKey: 'log',
};
/**
 * Datajet that writes each log record of a batch to a TCP socket.
 *
 * The socket is created lazily on the first record. For every record,
 * `transmitBatch`:
 *  - skips the record while the datajet is paused (the socket's pending write
 *    backlog exceeded `tcpBufferLimit` and has not yet fallen to 3/4 of it);
 *  - resolves to `false` if the current socket has closed;
 *  - otherwise writes the payload, re-creating the socket and retrying up to
 *    `maxRetries` times when the write throws, and resolving to `false` once
 *    the attempts are exhausted.
 * It resolves to `true` after the whole batch has been processed.
 */
const tcpDatajet: IDatajet = {
    name: "tcp",
    defaultConfig: defaultConfig,
    createConfiguredDatajet: function (config: IDatajetConfig, {
        logger
    }) {
        /* lazy client creation */
        let client: net.Socket | null = null;
        let isPaused = false;
        let isClosed = false;

        /* Replaces the current socket (if any) with a freshly connecting one. */
        const makeClient = (): net.Socket => {
            if (client) {
                client.destroy();
            }
            const socket = new net.Socket();
            client = socket;
            /* the closed flag describes the current socket only */
            isClosed = false;

            /*
             * Without an 'error' listener Node throws the error as an uncaught
             * exception (e.g. connection refused). Node emits 'close' after
             * 'error', so the handler below still records the closed state.
             */
            socket.on('error', (error: Error) => {
                logger.info(`TCP socket error ${config.host}:${config.port}: ${error.message}`);
            });
            socket.on('close', function() {
                logger.info(`Connection closed ${config.host}:${config.port}`);
                /*
                 * A socket destroyed by a later makeClient() call also emits
                 * 'close'; it must not mark its replacement as closed.
                 */
                if (client === socket) {
                    isClosed = true;
                }
            });
            socket.connect(config.port, config.host, function() {
                logger.info(`Connected tcp ${config.host}:${config.port}`);
            });
            return socket;
        };

        return {
            datajetTemplate: this,
            transmitBatch: async (batch: Array<ILogData>) => {
                /* backlog size at which a paused datajet starts sending again */
                const resumeThreshold = config.tcpBufferLimit * 3/4;

                for (const log of batch) {
                    let socket: net.Socket = client || makeClient();

                    /* client paused */
                    if (isPaused) {
                        /* still paused: this record is dropped, not queued */
                        if (socket.writableLength > resumeThreshold) {
                            continue;
                        }
                        /* sent enough data to resume */
                        logger.info(`Resuming tcp datajet ${config.host}:${config.port}`);
                        isPaused = false;
                    }

                    /* check if client is closed */
                    if (isClosed) {
                        return false;
                    }

                    for (let r = 0; r < config.maxRetries + 1; ++r) {
                        try {
                            /*
                             * NOTE(review): by operator precedence `addNewline` only
                             * affects the JSON branch, and '\n' is appended to every
                             * payload below regardless. Kept as-is so the bytes on the
                             * wire do not change; confirm whether this is intended.
                             */
                            const content = (config.logKey) ? log[config.logKey] : JSON.stringify(log) + ((config.addNewline) ? "\n" : "");
                            socket.write(content + '\n', (error) => {
                                if (error) {
                                    logger.info(`Failed to write to tcp connection. ${error.message}`);
                                }
                            });
                            /* check if client needs to be paused */
                            if (socket.writableLength > config.tcpBufferLimit) {
                                logger.info(`Pausing tcp datajet ${config.host}:${config.port}`);
                                isPaused = true;
                            }
                            break;
                        }
                        catch (e) {
                            if (r < config.maxRetries) {
                                logger.info(`Failed to write to tcp connection. Re-establishing connection. Attempt ${r+1}.`);
                                socket = makeClient();
                            }
                            else {
                                logger.info(`TCP connection failure to ${config.host}:${config.port}.`);
                                return false;
                            }
                        }
                    }
                }
                return true;
            }
        }
    }
}
export default tcpDatajet;