wrappers/executors/fluent-bit-executor.ts (588 lines of code) (raw):
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
import { IWrapper } from "../../core/ext-types"
import { IBuiltStage, IBuiltStageWrapper } from "../../core/pipeline-types";
import winston from 'winston';
import { ChildProcess, exec, execSync, spawn } from "child_process";
import { resolve } from 'path';
import fs, { copyFile } from "fs";
import { readdir, stat } from 'fs/promises';
/* Workspace directory, relative to the process cwd; only used to derive fullWorkspacePath. */
const WORKSPACE_PATH = "workspace/";
/* Name of the Fluent Bit clone directory created inside the workspace root. */
const WORKSPACE_NAME = "fluent-bit";
/* Upstream repository registered as `origin` when the workspace clone is first initialized. */
const FLUENT_REPO = "https://github.com/fluent/fluent-bit.git";
import mustache, { templateCache } from 'mustache';
import simpleGit from 'simple-git';
import { hash, timestamp, sleep } from "../../core/utils.js";
import { IPipelineSchema } from "../../core/rockefeller";
/*
 * Fluent Bit Wrapper
 * Builds (or restores from the build cache) and starts a Fluent Bit process during setup,
 * and stops it again during validation.
 */
/* Absolute location of WORKSPACE_PATH, resolved against the current working directory. Not referenced elsewhere in this file. */
const fullWorkspacePath = resolve(".", WORKSPACE_PATH);
/** A git reference: repository URL, branch and, optionally, an explicit commit. */
interface ICodeCommitReference {
    /** Repository URL; setup fills it from the default reference when omitted. */
    repository?: string;
    /** Branch name; setup fills it from the default reference when omitted. */
    branch?: string;
    /** Explicit commit hash; when absent, setup resolves the branch tip with `git ls-remote`. */
    commit?: string;
}
/** Where the Fluent Bit source comes from: a base reference plus references to cherry-pick on top. */
interface ICodeSource {
    base: ICodeCommitReference;
    cherryPicks: ICodeCommitReference[];
    /** Value for `git cherry-pick --strategy=`; only meaningful when cherryPicks is non-empty. */
    mergeStrategy: string;
}
/** Commit-pinned form of ICodeSource; its hash names the `source-lock-<hash>` git branch. */
interface ICodeSourceLock {
    baseCommit: string;
    cherryPickedCommits: string[];
    mergeStrategy: string;
}
/** Configuration accepted by the Fluent Bit executor wrapper. */
interface IFluentBitWrapperConfig {
    /** Sub-folder of ./output that receives lock and test records. */
    outputFolder: string;
    codeSource: ICodeSource;
    /** Path of the Fluent Bit config template; rendered with mustache before Fluent Bit starts. */
    fluentConfigFile: string;
    /** winston file-transport options for Fluent Bit's stdout/stderr; filenames are placed in the test's logs folder. */
    fluentLogTransports: winston.transports.FileTransportOptions[];
    /** Regex sources; every Fluent Bit output line matching one increments that pattern's count. */
    fluentLogCountOccurrences: string[];
    /** Not read in this file. */
    awaitValidators: boolean;
    /** Seconds to wait after SIGTERM before sending SIGKILL. */
    grace: number;
    /** Extra environment variables for the Fluent Bit process (merged over process.env). */
    environmentVariables: Record<string, any>;
    /** Names of empty directories created under <workspace>/tmp/paths for each run. */
    managedTemporaryPaths: string[];
    /** name -> source file path; each file is mustache-rendered into <workspace>/tmp/files/<name>. */
    managedTemporaryFiles: Record<string, string>;
    /** Passed to sleep() after spawning Fluent Bit; NOTE(review): unit depends on core/utils sleep — presumably seconds. */
    warmupTime: number;
}
/** Everything that identifies a build + config combination; its hash names the output folder. */
interface IFluentLock {
    sourceLock: ICodeSourceLock;
    /** Raw (un-rendered) Fluent Bit config template text. */
    configLock: string;
}
/* Reference used to fill any unset repository/branch: upstream Fluent Bit (same URL as FLUENT_REPO), master branch. */
const defaultCodeCommitReference: ICodeCommitReference = {
    repository: FLUENT_REPO,
    branch: "master",
}
/* Fallback `git cherry-pick --strategy=` value when cherry picks are configured without a merge strategy. */
const defaultCherryPickMergeStrategy = "ort";
/* Default code source: the default reference with no cherry picks, hence no merge strategy. */
const defaultCodeSource: ICodeSource = {
    base: defaultCodeCommitReference,
    cherryPicks: [],
    mergeStrategy: "",
}
/*
 * Default wrapper configuration.
 * NOTE: timestamp() below is evaluated once, when this module is loaded, so every run in the
 * same process uses the same default Fluent Bit log filename (each run writes it into its own
 * test logs folder).
 */
const defaultConfig: IFluentBitWrapperConfig = {
    outputFolder: "Unfiled",
    codeSource: defaultCodeSource,
    fluentConfigFile: "data-public/fluent-config/fluent.conf",
    awaitValidators: true,
    environmentVariables: {},
    grace: 10, /* seconds between SIGTERM and SIGKILL */
    fluentLogTransports: [
        {filename: `fluent-bit-${timestamp()}.log`, level: 'info'} /* supports file only right now */
    ],
    fluentLogCountOccurrences: [
        "warn",
        "error",
    ],
    managedTemporaryPaths: [],
    managedTemporaryFiles: {},
    warmupTime: 5,
}
const fluentBitWrapper: IWrapper = {
    name: "fluent-bit-executor",
    defaultConfig: defaultConfig,
    /* Pass-through: the subschema is returned unchanged (this hook could insert other BuiltStageWrappers into the subtree). */
    modifySubschema: (subschema: IPipelineSchema) => subschema,
    /**
     * Create a configured Fluent Bit wrapper instance.
     *
     * - `setup` prepares the workspace (fresh tmp folder, managed temporary paths and
     *   mustache-rendered managed files), pins the Fluent Bit source (base commit plus cherry
     *   picks) onto a `source-lock-<hash>` git branch, writes lock and test records under
     *   `./output/<outputFolder>`, builds Fluent Bit or restores it from the build cache, and
     *   starts it with the rendered config. Resolves false when the config template cannot be
     *   read or the build fails.
     * - `validation` stops Fluent Bit (SIGTERM, then SIGKILL after `config.grace` seconds) and
     *   reports how many Fluent Bit output lines matched each `fluentLogCountOccurrences` pattern.
     * - `breakdown` has nothing to clean up.
     *
     * @param config - wrapper configuration
     * The second argument is the framework context: logger, this stage's pipeline schema,
     * workspace root, and the managed-variable setter / values used for mustache rendering.
     */
    createConfiguredWrapper: function (config: IFluentBitWrapperConfig, {
        logger,
        localPipelineSchema,
        workspaceRoot,
        setManagedVariable,
        variables,
    }) {
        let fluentBitChildProcess: ChildProcess | undefined;
        /* Settles on the Fluent Bit process's "close" event; attached right after spawning so an early exit is never missed. */
        let fluentBitClosed: Promise<void> | undefined;
        let fluentBitProcessLogger: winston.Logger;
        const fluentLogCounts: {[key: string]: number} = {};
        const cacheFolder = `${workspaceRoot}/build-cache`;
        /* 864000 s = 10 days. NOTE(review): the original comment here said "1 day" (86400 s); the value is kept — confirm the intended lifetime. */
        const cacheExpiration = 864000;
        /* Upper bound on buffered cmake/make output. exec() defaults to 1 MiB and kills the child when a verbose build exceeds it. */
        const buildOutputMaxBuffer = 64 * 1024 * 1024;
        /* Compile each occurrence pattern once rather than once per log line. */
        const occurrencePatterns = (config.fluentLogCountOccurrences ?? []).map(find => ({
            find,
            pattern: new RegExp(find),
        }));

        /**
         * Write each non-empty line of `data` to `targetLogger`.
         * When `countOccurrences` is true, also increment `fluentLogCounts[find]` for every
         * configured pattern the line matches.
         */
        const loggerLogStd = (data: string | Buffer, targetLogger: winston.Logger, countOccurrences = true) => {
            for (const line of data.toString().split("\n")) {
                if (line.length === 0) {
                    continue;
                }
                if (countOccurrences) {
                    for (const { find, pattern } of occurrencePatterns) {
                        if (pattern.test(line)) {
                            fluentLogCounts[find] = (fluentLogCounts[find] ?? 0) + 1;
                        }
                    }
                }
                targetLogger.info(line);
            }
        };
        /* Fluent Bit stdout/stderr: written to the per-test Fluent Bit logger and counted. */
        const fluentLog = (data: string | Buffer) => loggerLogStd(data, fluentBitProcessLogger);

        /* Create a directory unless something already exists at that path. */
        const ensureDirectory = async (path: string) => {
            if (!await directoryExists(path)) {
                await directoryMake(path);
            }
        };
        /* Write a record file unless one already exists (records are write-once). */
        const ensureFile = async (path: string, contents: string) => {
            if (!await fileExists(path)) {
                await fileMake(path, contents);
            }
        };
        /*
         * Point `linkPath` at `linkTo`, removing whatever is there first. lstat is used rather than
         * directoryExists (fs.access follows links), so a dangling link whose target was deleted is
         * also removed instead of making symlink() fail with EEXIST.
         */
        const replaceSoftLink = async (linkTo: string, linkPath: string) => {
            const existing = await fs.promises.lstat(linkPath).catch(() => null);
            if (existing) {
                await directoryDelete(linkPath);
            }
            await softLinkMake(linkTo, linkPath);
        };
        /* Single-quote a value for /bin/sh, escaping embedded single quotes. */
        const shellQuote = (value: string) => `'${value.replace(/'/g, `'\\''`)}'`;
        /* Fill unset repository/branch fields from the default reference. */
        const withDefaultReference = (ref: ICodeCommitReference): ICodeCommitReference => ({
            ...defaultCodeCommitReference,
            ...ref,
        });
        /*
         * Merge strategy for the cherry picks: "" when there are none, otherwise the configured
         * strategy, falling back to the default when it is missing OR empty ("" would yield
         * `--strategy=`, which git cannot use).
         */
        const resolveMergeStrategy = (cherryPickCount: number, requested: string | undefined) =>
            (cherryPickCount === 0) ? "" : (requested || defaultCherryPickMergeStrategy);

        return {
            wrapperTemplate: this,
            setup: async (root: IBuiltStage, subtree: IBuiltStage) => {
                const fluentConfigPath = resolve(config.fluentConfigFile);
                const repoPath = `${workspaceRoot}/${WORKSPACE_NAME}`;
                const fullRepoPath = resolve(repoPath);
                const fluentConfigWorkspacePath = `${workspaceRoot}/fluent-config`;

                /* Make folders if needed */
                await ensureDirectory(workspaceRoot);
                await ensureDirectory(fullRepoPath);

                /* Start every run with an empty workspace temp folder */
                const tmpFolder = `${workspaceRoot}/tmp`;
                if (await directoryExists(tmpFolder)) {
                    await directoryDelete(tmpFolder);
                }
                await directoryMake(tmpFolder);
                setManagedVariable("workspaceTmp", resolve(tmpFolder));

                /* Managed temporary paths: one empty directory per configured name */
                const tmpPathsRoot = resolve(tmpFolder, "paths");
                await directoryMake(tmpPathsRoot);
                const tmpPaths = await Promise.all(config.managedTemporaryPaths.map(async name => {
                    const path = resolve(tmpPathsRoot, name);
                    await directoryMake(path);
                    return [name, path];
                }));
                setManagedVariable("temporaryPaths", Object.fromEntries(tmpPaths));

                /* Managed temporary files: mustache-rendered copies of the configured source files */
                const tmpFilesRoot = resolve(tmpFolder, "files");
                const managedFiles = Object.entries(config.managedTemporaryFiles);
                setManagedVariable("temporaryFiles", Object.fromEntries(
                    managedFiles.map(([name]) => [name, resolve(tmpFilesRoot, name)])));
                await directoryMake(tmpFilesRoot);
                await Promise.all(managedFiles.map(async ([name, location]) => {
                    let fileRaw: string;
                    try {
                        fileRaw = await fileRead(resolve(location));
                    } catch (e) {
                        /* best effort: an unreadable managed file is logged and skipped */
                        logger.error(`Unable to read file: ${location}`);
                        return;
                    }
                    await fileMake(resolve(tmpFilesRoot, name), mustache.render(fileRaw, variables));
                }));

                /* Read the config template (rendered later, once all managed variables are set) */
                let configTemplate: string;
                try {
                    configTemplate = await fileRead(fluentConfigPath);
                } catch (e) {
                    logger.error(`Unable to read file: ${fluentConfigPath}`);
                    return false;
                }

                /* Init repo if needed */
                const git = simpleGit(fullRepoPath);
                if (!await directoryExists(`${repoPath}/.git`)) {
                    await initializeRepo(git, FLUENT_REPO);
                    /* git.raw passes argv straight to git (no shell), so values must not contain literal quotes */
                    await git.raw(['config', 'user.email', 'firelens@amazon.com']);
                    await git.raw(['config', 'user.name', 'FireLens Datajet']);
                }
                await git.fetch();

                /* Fill repository/branch defaults for the base and every cherry pick */
                const baseRef = withDefaultReference(config.codeSource.base);
                const cherryPickRefs = (config.codeSource.cherryPicks ?? []).map(withDefaultReference);

                /*
                 * Fetch each distinct repository/branch pair once. Fetches run one after another:
                 * concurrent fetches into the same repository can contend for git's lock files.
                 */
                const fetchKeys = new Set([baseRef, ...cherryPickRefs].map(ref =>
                    JSON.stringify({ repository: ref.repository, branch: ref.branch })));
                for (const key of fetchKeys) {
                    const { repository, branch } = JSON.parse(key) as { repository: string, branch: string };
                    await git.fetch(repository, branch);
                }

                /* Resolve a branch to its tip commit (first ref reported by `git ls-remote`). */
                const resolveBranchTip = async (ref: ICodeCommitReference) => {
                    const remoteRefs = await git.raw(['ls-remote', `${ref.repository}`, `${ref.branch}`]);
                    const commit = remoteRefs.split("\n")[0].split("\t")[0].trim();
                    if (!commit) {
                        throw new Error(`Unable to resolve ${ref.repository} ${ref.branch} to a commit (git ls-remote returned no match)`);
                    }
                    return commit;
                };
                /* Pin a reference: keep an explicit commit, otherwise use the current branch tip. */
                const pinCommit = async (ref: ICodeCommitReference) => {
                    const pinned = withDefaultReference(ref);
                    const commit = pinned.commit || await resolveBranchTip(pinned);
                    return { ...pinned, commit };
                };
                const pinnedBase = await pinCommit(baseRef);
                const pinnedCherryPicks = await Promise.all(cherryPickRefs.map(pinCommit));
                const amendedCodeSource: ICodeSource = {
                    base: pinnedBase,
                    cherryPicks: pinnedCherryPicks,
                    mergeStrategy: resolveMergeStrategy(pinnedCherryPicks.length, config.codeSource.mergeStrategy),
                };

                /* Source lock (field order feeds the hash) */
                const sourceLock: ICodeSourceLock = {
                    baseCommit: pinnedBase.commit,
                    cherryPickedCommits: pinnedCherryPicks.map(ref => ref.commit),
                    mergeStrategy: amendedCodeSource.mergeStrategy,
                };
                const sourceLockHash = hash(sourceLock);

                /* Checkout the source-locked branch, creating it when it does not exist yet */
                const sourceLockedBranchName = `source-lock-${sourceLockHash}`;
                let makeSourceBranch = false;
                try {
                    /* git reset --hard to avoid gitignore induced problems */
                    await git.reset(["--hard"]);
                    await git.checkout(sourceLockedBranchName);
                }
                catch {
                    logger.info(`Creating new source locked branch: ${sourceLockedBranchName}`);
                    makeSourceBranch = true;
                }
                if (makeSourceBranch) {
                    await git.checkout(["-b", sourceLockedBranchName, sourceLock.baseCommit]);
                    try {
                        for (const cherry of sourceLock.cherryPickedCommits) {
                            await git.raw(['cherry-pick', `--strategy=${sourceLock.mergeStrategy}`, cherry]);
                        }
                    }
                    catch (e) {
                        /* Cherry picks failed: discard the half-built branch so the next run retries cleanly */
                        await git.reset(["--hard"]);
                        await git.checkout("-");
                        await git.deleteLocalBranch(sourceLockedBranchName, true);
                        logger.error(`Cherry picks failed. Deleted failed sourceLockedBranch ${sourceLockedBranchName}`);
                        throw e;
                    }
                }

                /* fluent-lock: source lock + raw config template; its hash names the output folder */
                const fluentLock: IFluentLock = {
                    sourceLock: sourceLock,
                    configLock: configTemplate,
                };
                const fluentLockHash = hash(fluentLock);

                /* Write source records */
                const outputParentPath = resolve("./output");
                await ensureDirectory(outputParentPath);
                const outputPath = resolve(`${outputParentPath}/${config.outputFolder}`);
                await ensureDirectory(outputPath);
                const outputFluentLockedPath = `${outputPath}/fluent-lock-${fluentLockHash}`;
                await ensureDirectory(outputFluentLockedPath);
                await replaceSoftLink(outputFluentLockedPath, `${outputPath}/current`);
                setManagedVariable("outputPath", outputFluentLockedPath);
                await ensureFile(`${outputFluentLockedPath}/fluent-lock.json`, JSON.stringify(fluentLock, null, 2));
                await ensureFile(`${outputFluentLockedPath}/source-lock-info.json`, JSON.stringify({
                    sourceLockBranch: sourceLockedBranchName,
                    sourceLockHash: sourceLockHash,
                    sourceLock: sourceLock,
                }, null, 2));
                await ensureFile(`${outputFluentLockedPath}/fluent-bit-template.conf`, configTemplate);

                /* Write test records */
                const outputTestPath = `${outputFluentLockedPath}/test-${timestamp()}`;
                await ensureDirectory(outputTestPath);
                await replaceSoftLink(outputTestPath, `${outputFluentLockedPath}/latest`);
                setManagedVariable("testPath", outputTestPath);
                const testByproductPath = `${outputTestPath}/byproduct`;
                await ensureDirectory(testByproductPath);
                setManagedVariable("testByproductPath", testByproductPath);
                await ensureFile(`${outputTestPath}/test-pipeline-schema.json`, JSON.stringify(localPipelineSchema, null, 2));
                await ensureFile(`${outputTestPath}/source.json`, JSON.stringify(amendedCodeSource, null, 2));
                const outputInstrumentationPath = `${outputTestPath}/instrumentation`;
                await ensureDirectory(outputInstrumentationPath);

                /* Render baked config file (now that all managed variables are set) */
                const configBaked = mustache.render(configTemplate, variables);
                await ensureDirectory(fluentConfigWorkspacePath);
                await ensureFile(`${fluentConfigWorkspacePath}/${hash(configBaked)}.conf`, configBaked);
                const fluentBakedConfigFilePath = `${outputTestPath}/fluent-bit.conf`;
                await ensureFile(fluentBakedConfigFilePath, configBaked);

                /* Fluent Bit process logger (make/cmake loggers are created by buildFluentBit) */
                const outputLogPath = `${outputTestPath}/logs`;
                await ensureDirectory(outputLogPath);
                fluentBitProcessLogger = winston.createLogger({
                    level: 'info',
                    defaultMeta: { service: 'fluent-bit' },
                    transports: config.fluentLogTransports.map(transport => new winston.transports.File({
                        ...transport,
                        filename: `${outputLogPath}/${transport?.filename ?? "fluent-bit.log"}`,
                    })),
                });

                /* Expire old cache entries, then restore from cache or build */
                await ensureDirectory(cacheFolder);
                await manageCacheExpirations(logger, cacheFolder, cacheExpiration);
                const executableWorkspacePath = `${tmpFolder}/executable`;
                await ensureDirectory(executableWorkspacePath);
                const isRecovered = await recoverFromCache(sourceLockHash, executableWorkspacePath);
                if (!isRecovered && !await buildFluentBit(sourceLockHash, outputLogPath, fullRepoPath, executableWorkspacePath)) {
                    /* buildFluentBit has already logged the failure */
                    return false;
                }

                /* Copy to byproduct path */
                await fileCopy(`${executableWorkspacePath}/fluent-bit`, `${testByproductPath}/fluent-bit`);

                /* Run Fluent Bit; `exec` makes the shell replace itself so signals reach Fluent Bit directly */
                logger.info("Running Fluent Bit.");
                const child = spawn(`ulimit -c unlimited; exec ./fluent-bit -c ${shellQuote(fluentBakedConfigFilePath)}`, {
                    cwd: `${executableWorkspacePath}`,
                    env: {
                        ...process.env,
                        ...config.environmentVariables,
                        "FLB_INSTRUMENTATION_OUT_PATH": outputInstrumentationPath,
                    },
                    shell: true
                });
                fluentBitChildProcess = child;
                fluentBitClosed = new Promise<void>(promiseResolve => child.once("close", () => promiseResolve()));
                child.stdout?.on('data', fluentLog);
                child.stderr?.on('data', fluentLog);
                child.on('error', (error) => logger.error(`Fluent Bit Process error: ${error.message}`));
                await sleep(config.warmupTime);
                return true;
            },
            validation: async (root: IBuiltStage, subtree: IBuiltStage) => {
                const child = fluentBitChildProcess;
                const closed = fluentBitClosed;
                if (!child || !closed) {
                    /* setup never reached the spawn step, so there is no process to stop */
                    logger.error("Fluent Bit was not started; nothing to stop");
                    return {
                        isValidationSuccess: false,
                        validationData: {
                            fluentLogCounts: fluentLogCounts,
                        },
                    };
                }
                /* request shutdown: SIGTERM */
                logger.info("Fluent Bit signalled");
                child.kill('SIGTERM');
                /* force termination after the grace period: SIGKILL */
                const graceTimer = setTimeout(() => {
                    child.kill('SIGKILL');
                    logger.info("Fluent Bit killed");
                }, config.grace * 1000);
                /*
                 * `closed` was attached at spawn time, so this also settles when Fluent Bit already
                 * exited (e.g. crashed) before validation; waiting for a new "close" event in that
                 * case would never resolve.
                 */
                await closed;
                clearTimeout(graceTimer);
                logger.info("Fluent Bit exited (stopped)");
                return {
                    isValidationSuccess: true,
                    // Other data can be added here for validation metric collection.
                    validationData: {
                        fluentLogCounts: fluentLogCounts,
                        /*fluentBitProcessLogger: fluentBitProcessLogger, */
                    },
                    /* May want to add hidden validation data */
                };
            },
            breakdown: async (root: IBuiltStage, subtree: IBuiltStage) => {
                return true;
            },
            isValidationAsync: false,
        };

        /**
         * Build Fluent Bit with cmake + make in `<fullRepoPath>/build`, then copy the executable to
         * `executableDestination` and into the build cache under `cacheKey`.
         * make only runs when cmake succeeded. Returns true on success, false on failure.
         */
        async function buildFluentBit(
            cacheKey: string,
            outputLogPath: string,
            fullRepoPath: string,
            executableDestination: string
        ): Promise<boolean> {
            const buildDirectory = `${fullRepoPath}/build`;
            const cmakeProcessLogger = createBuildLogger(`${outputLogPath}/cmake.log`);
            const makeProcessLogger = createBuildLogger(`${outputLogPath}/make.log`);
            logger.info("👷 Building Fluent Bit. Stand by...");
            const built = await runBuildStep("CMake", "cmake ..", buildDirectory, cmakeProcessLogger)
                && await runBuildStep("Make", "make", buildDirectory, makeProcessLogger);
            if (!built) {
                logger.error("Build failed.");
                return false;
            }
            logger.info("Build succeeded.");
            /* Archive Fluent Bit executable */
            await fileCopy(`${fullRepoPath}/build/bin/fluent-bit`, `${executableDestination}/fluent-bit`);
            /* Cache Fluent Bit executable */
            await fileCopy(`${fullRepoPath}/build/bin/fluent-bit`, `${cacheFolder}/${cacheKey}`);
            return true;
        }

        /**
         * Run one build command and resolve only after its output has been logged, with
         * true when it exited successfully. Build output is not counted into fluentLogCounts.
         */
        function runBuildStep(stepName: string, command: string, cwd: string, stepLogger: winston.Logger): Promise<boolean> {
            return new Promise((promiseResolve) => {
                exec(command, { cwd, maxBuffer: buildOutputMaxBuffer }, (error, stdout, stderr) => {
                    /* stdout is always logged, even when stderr is non-empty or the command failed */
                    loggerLogStd(stdout, stepLogger, false);
                    if (error) {
                        logger.error(`${stepName} failed`);
                        /* Node's exec error message carries the failure details, including the command's stderr */
                        loggerLogStd(error.message, stepLogger, false);
                        promiseResolve(false);
                        return;
                    }
                    loggerLogStd(stderr, stepLogger, false);
                    promiseResolve(true);
                });
            });
        }

        /* One file transport per build log (not one per configured Fluent Bit transport, which duplicated lines). */
        function createBuildLogger(filename: string) {
            return winston.createLogger({
                level: 'info',
                transports: [new winston.transports.File({ filename })],
            });
        }

        /* Copy the cached executable for `cacheKey` to `executableDestination`; false on a cache miss. */
        async function recoverFromCache(cacheKey: string, executableDestination: string) {
            if (await fileExists(`${cacheFolder}/${cacheKey}`)) {
                logger.info("Recovered build from cache 🐚");
                await fileCopy(`${cacheFolder}/${cacheKey}`, `${executableDestination}/fluent-bit`);
                return true;
            }
            return false;
        }
    }
}
/* Initialise an empty git repository, then register `url` as its `origin` remote; resolves with addRemote's result. */
async function initializeRepo(git, url: string) {
    await git.init();
    return git.addRemote('origin', url);
}
/**
 * Wait for a child process to finish.
 *
 * Resolves on "close" rather than "exit": "close" is emitted only after the child's stdio streams
 * have ended, and exec() registers its own "close" handler (which runs its completion callback)
 * before this one, so callers see any state that callback sets. Resolving on "exit" let callers
 * continue while that callback was still pending.
 *
 * @param childProcess - process to wait on; must not have closed yet
 * @returns the exit code (null when the process was terminated by a signal)
 * @throws rejects with the process's "error" event (e.g. spawn failure)
 */
async function execChildAsyncWrapper(childProcess: ChildProcess): Promise<number | null> {
    return new Promise((resolve, reject) => {
        childProcess.once("error", reject);
        childProcess.once("close", (code: number | null) => resolve(code));
    });
}
/*
 * Resolve true when `path` is accessible (fs access check, default mode), false otherwise;
 * never rejects. Follows symlinks and does not check that the entry is a directory.
 */
async function directoryExists(path: string) {
    try {
        await fs.promises.access(path);
        return true;
    } catch {
        return false;
    }
}
/*
 * Create a single directory (non-recursive) and resolve null.
 * Rejects with the fs error, e.g. EEXIST when it exists or ENOENT when the parent is missing.
 */
async function directoryMake(path: string) {
    await fs.promises.mkdir(path);
    return null;
}
/*
 * Remove `path` recursively (files, directories or links) and resolve null.
 * Rejects with the fs error, including when `path` does not exist.
 */
async function directoryDelete(path: string) {
    await fs.promises.rm(path, { recursive: true });
    return null;
}
/*
 * Create a symbolic link at `path` pointing to `linkTo` and resolve null.
 * Rejects with the fs error, e.g. EEXIST when `path` is already taken.
 */
async function softLinkMake(linkTo: string, path: string) {
    await fs.promises.symlink(linkTo, path);
    return null;
}
/**
 * Delete cached executables whose ctime is more than `cacheExpiration` seconds old.
 * Only regular files directly inside `cacheFolder` are considered; sub-directories are left alone.
 *
 * @param logger - receives one info line per deleted entry
 * @param cacheFolder - directory holding the cached executables
 * @param cacheExpiration - maximum age in seconds
 */
async function manageCacheExpirations(logger: winston.Logger, cacheFolder: string, cacheExpiration: number) {
    const timeNow = Date.now();
    const entries = await readdir(cacheFolder, { withFileTypes: true });
    /* isFile must be called: the bare property is a method reference and therefore always truthy */
    const cacheFiles = await Promise.all(entries
        .filter(dirent => dirent.isFile())
        .map(async dirent => {
            const path = `${cacheFolder}/${dirent.name}`;
            const { ctimeMs } = await stat(path);
            return { ctimeMs, path };
        }));
    const expired = cacheFiles.filter(f => (timeNow - f.ctimeMs) > cacheExpiration * 1000);
    await Promise.all(expired.map(async f => {
        /* ctimeMs is in milliseconds; the message reports seconds */
        logger.info(`🐚 Cache expired after ${cacheExpiration} seconds.`
            + ` Total lifespan of ${Math.floor((timeNow - f.ctimeMs) / 1000)} seconds`);
        await fileDelete(f.path);
    }));
}
/* Read the whole file at `path` and resolve its contents via Buffer#toString (UTF-8); rejects with the fs error. */
async function fileRead(path: string): Promise<string> {
    const contents = await fs.promises.readFile(path);
    return contents.toString();
}
/* Resolve true when something is accessible at `path` (fs access check, following symlinks), false otherwise. */
async function fileExists(path: string) {
    return fs.promises.access(path).then(() => true, () => false);
}
/**
 * Write `contents` to `path`, creating or truncating the file.
 * Resolves null on success and rejects with the fs error otherwise. This settles exactly once;
 * the callback form fell through to resolve() after reject().
 */
async function fileMake(path: string, contents: string) {
    await fs.promises.writeFile(path, contents);
    return null;
}
/* Remove `path` (recursively, so directories work too) and resolve null; rejects with the fs error, including when it is missing. */
async function fileDelete(path: string) {
    await fs.promises.rm(path, { recursive: true });
    return null;
}
/* Copy `source` to `destination` (overwriting it) and resolve null; rejects with the fs error. */
async function fileCopy(source: string, destination: string) {
    await fs.promises.copyFile(source, destination);
    return null;
}
/* The configured-wrapper factory object is this module's default export. */
export default fluentBitWrapper;