in wrappers/executors/fluent-bit-executor.ts [139:467]
setup: async (root: IBuiltStage, subtree: IBuiltStage) => {
/* Config path */
const fluentConfigPath = resolve(config.fluentConfigFile);
/* Init fluent bit repo if needed */
const repoPath = `${workspaceRoot}/${WORKSPACE_NAME}`;
const fullRepoPath = resolve(repoPath);
const fluentConfigWorkspacePath = `${workspaceRoot}/fluent-config`
/* Make folders if needed */
if (!await directoryExists(workspaceRoot)) {
await directoryMake(workspaceRoot);
}
if (!await directoryExists(repoPath)) {
await directoryMake(fullRepoPath);
}
/* Clear workspace temp folder */
const tmpFolder = `${workspaceRoot}/tmp`;
if (await directoryExists(tmpFolder)) {
await directoryDelete(tmpFolder);
await directoryMake(tmpFolder);
}
else {
await directoryMake(tmpFolder);
}
setManagedVariable("workspaceTmp", resolve(tmpFolder));
/* Make managed temporary paths */
const tmpPathsRoot = resolve(tmpFolder, "paths");
await directoryMake(tmpPathsRoot);
const tmpPaths = await Promise.all(config.managedTemporaryPaths.map(async t => {
const path = resolve(tmpPathsRoot, t);
await directoryMake(path);
return [t, path];
}));
const tmpPathsObj = Object.fromEntries(tmpPaths);
setManagedVariable("temporaryPaths", tmpPathsObj);
/* Copy managed files */
const tmpFilesRoot = resolve(tmpFolder, "files");
const tmpFilesObj = Object.fromEntries(Object.entries(config.managedTemporaryFiles).map(([name, location]) => {
const tmpFilePath = resolve(tmpFilesRoot, name);
return [name, tmpFilePath];
}));
setManagedVariable("temporaryFiles", tmpFilesObj);
await directoryMake(tmpFilesRoot);
await Promise.all(Object.entries(config.managedTemporaryFiles).map(async ([name, location]) => {
/* aquire raw file data */
let fileRaw: string;
try {
fileRaw = await fileRead(resolve(location));
} catch (e) {
logger.error(`Unable to read file: ${location}`);
return false;
}
/* mustache render the files */
const fileBaked = mustache.render(fileRaw, variables);
/* output render */
const tmpFilePath = resolve(tmpFilesRoot, name);
await fileMake(tmpFilePath, fileBaked);
}));
/* Copy config to workspace */
let configTemplate;
try {
configTemplate = await fileRead(fluentConfigPath);
} catch (e) {
logger.error(`Unable to read file: ${fluentConfigPath}`);
return false;
}
/* Init repo if needed */
const git = simpleGit(fullRepoPath);
const isRepo = await (directoryExists(`${repoPath}/.git`));
if (!isRepo) {
await initializeRepo(git, FLUENT_REPO);
await git.raw(['config', `user.email`, `"firelens@amazon.com"`]);
await git.raw(['config', `user.name`, `"FireLens Datajet"`]);
}
await git.fetch();
/* Amend code source repository and branch references */
const amendCodeRepositoryAndBranchReference = async (ref: ICodeCommitReference) => ({
...defaultCodeCommitReference,
...ref,
});
const amendedCodeSourcePart: ICodeSource = {
base: await amendCodeRepositoryAndBranchReference(config.codeSource.base),
cherryPicks: (config.codeSource.cherryPicks) ? await Promise.all(config.codeSource.cherryPicks.map(amendCodeRepositoryAndBranchReference)) : [],
mergeStrategy: ((config.codeSource.cherryPicks?.length ?? 0) !== 0) ?
config.codeSource.mergeStrategy ?? defaultCherryPickMergeStrategy :
"", // no merge strategy for [] cherry picks list
}
/* Accumulate fetch list (for updated commits) */
interface ICodeFetch {
repository: string,
branch: string,
};
const removeDuplicates = (a: Array<ICodeFetch>) => {
return Array.from(new Set(a.map(ftch => JSON.stringify(ftch))))
.map(strParse => JSON.parse(strParse)) as Array<ICodeFetch>;
}
const fetches: Array<ICodeFetch> = removeDuplicates(
[
amendedCodeSourcePart.base,
...amendedCodeSourcePart.cherryPicks,
].map(ref => ({
repository: ref.repository,
branch: ref.branch,
}))
);
const fetchResults = fetches.map(async fetch => git.fetch(fetch.repository, fetch.branch));
await Promise.all(fetchResults);
/* Amend code source commit reference */
const amendCodeCommitReference = async (ref: ICodeCommitReference) => {
const amended = {
...defaultCodeCommitReference,
...ref,
};
if (!amended?.commit) {
const commitHash = await git.raw(['ls-remote', `${amended.repository}`, `${amended.branch}`]);
amended.commit = commitHash.split("\t")[0];
}
return amended;
};
const amendedCodeSource: ICodeSource = {
base: await amendCodeCommitReference(amendedCodeSourcePart.base),
cherryPicks: await Promise.all(amendedCodeSourcePart.cherryPicks.map(amendCodeCommitReference)),
mergeStrategy: (amendedCodeSourcePart.cherryPicks.length !== 0) ?
amendedCodeSourcePart.mergeStrategy ?? defaultCherryPickMergeStrategy :
"", // no merge strategy for [] cherry picks list
}
/* Source lock */
const sourceLock: ICodeSourceLock = {
baseCommit: amendedCodeSource.base.commit,
cherryPickedCommits: amendedCodeSource.cherryPicks.map(ref => ref.commit),
mergeStrategy: amendedCodeSource.mergeStrategy,
}
const sourceLockHash = hash(sourceLock);
/* Checkout branch */
const sourceLockedBranchName = `source-lock-${sourceLockHash}`;
let makeSourceBranch = false;
try {
/* git reset --hard to avoid gitignore induced problems */
await git.reset(["--hard"]);
await git.checkout(sourceLockedBranchName);
}
catch {
logger.info(`Creating new source locked branch: ${sourceLockedBranchName}`);
makeSourceBranch = true;
}
if (makeSourceBranch) {
/* Make branch */
await git.checkout(["-b", sourceLockedBranchName, sourceLock.baseCommit]);
try {
/* Cherry picks */
for (const cherry of sourceLock.cherryPickedCommits) {
await git.raw(['cherry-pick', `--strategy=${sourceLock.mergeStrategy}`, cherry]);
}
}
catch (e) {
/* Cherry picks fail - delete branch */
await git.reset(["--hard"]);
await git.checkout("-");
await git.deleteLocalBranch(sourceLockedBranchName, true);
logger.error(`Cherry picks failed. Deleted failed sourceLockedBranch ${sourceLockedBranchName}`);
throw e;
}
}
/* Create fluent-lock.json */
const fluentLock: IFluentLock = {
sourceLock: sourceLock,
configLock: configTemplate,
}
const fluentLockHash = hash(fluentLock);
/* Write source records */
const outputParentPath = resolve("./output");
if (!await directoryExists(outputParentPath)) {
await directoryMake(outputParentPath);
}
const outputPath = resolve(`${outputParentPath}/${config.outputFolder}`);
if (!await directoryExists(outputPath)) {
await directoryMake(outputPath);
}
const outputFluentLockedPath = `${outputPath}/fluent-lock-${fluentLockHash}`;
if (!await directoryExists(outputFluentLockedPath)) {
await directoryMake(outputFluentLockedPath);
}
const outputFluentLockedPathCurrentSoftLink = `${outputPath}/current`
if (await directoryExists(outputFluentLockedPathCurrentSoftLink)) {
await directoryDelete(outputFluentLockedPathCurrentSoftLink);
}
await softLinkMake(outputFluentLockedPath, outputFluentLockedPathCurrentSoftLink);
setManagedVariable("outputPath", outputFluentLockedPath);
const fluentLockFilePath = `${outputFluentLockedPath}/fluent-lock.json`;
if (!await fileExists(fluentLockFilePath)) {
await fileMake(fluentLockFilePath, JSON.stringify(fluentLock, null, 2));
}
const sourceLockInfoFilePath = `${outputFluentLockedPath}/source-lock-info.json`;
const sourceLockInfo = {
sourceLockBranch: sourceLockedBranchName,
sourceLockHash: sourceLockHash,
sourceLock: sourceLock,
}
if (!await fileExists(sourceLockInfoFilePath)) {
await fileMake(sourceLockInfoFilePath, JSON.stringify(sourceLockInfo, null, 2));
}
const fluentConfigFilePath = `${outputFluentLockedPath}/fluent-bit-template.conf`
if (!await fileExists(fluentConfigFilePath)) {
await fileMake(fluentConfigFilePath, configTemplate);
}
/* Write test records */
const outputTestPath = `${outputFluentLockedPath}/test-${timestamp()}`;
if (!await directoryExists(outputTestPath)) {
await directoryMake(outputTestPath);
}
const outputTestPathLatestSoftLink = `${outputFluentLockedPath}/latest`
if (await directoryExists(outputTestPathLatestSoftLink)) {
await directoryDelete(outputTestPathLatestSoftLink);
}
await softLinkMake(outputTestPath, outputTestPathLatestSoftLink);
setManagedVariable("testPath", outputTestPath);
const testByproductPath = `${outputTestPath}/byproduct`;
if (!await directoryExists(testByproductPath)) {
await directoryMake(testByproductPath);
}
setManagedVariable("testByproductPath", testByproductPath);
const fluentPipelineSchemaFilePath = `${outputTestPath}/test-pipeline-schema.json`
if (!await fileExists(fluentPipelineSchemaFilePath)) {
await fileMake(fluentPipelineSchemaFilePath, JSON.stringify(localPipelineSchema, null, 2));
}
const fluentFullSourcePath = `${outputTestPath}/source.json`
if (!await fileExists(fluentFullSourcePath)) {
await fileMake(fluentFullSourcePath, JSON.stringify(amendedCodeSource, null, 2));
}
const outputInstrumentationPath = `${outputTestPath}/instrumentation`;
if (!await directoryExists(outputInstrumentationPath)) {
await directoryMake(outputInstrumentationPath);
}
/* Render baked config file (now that all managed variables are set) */
const configBaked = mustache.render(configTemplate, variables);
const configId = hash(configBaked);
if (!await directoryExists(fluentConfigWorkspacePath)) {
await directoryMake(fluentConfigWorkspacePath)
}
const configBakedPath = `${fluentConfigWorkspacePath}/${configId}.conf`;
if (!await fileExists(configBakedPath)) {
await fileMake(configBakedPath, configBaked);
}
const fluentBakedConfigFilePath = `${outputTestPath}/fluent-bit.conf`
if (!await fileExists(fluentBakedConfigFilePath)) {
await fileMake(fluentBakedConfigFilePath, configBaked);
}
/* Configure Fluent Bit, make, and cmake loggers */
const outputLogPath = `${outputTestPath}/logs`;
if (!await directoryExists(outputLogPath)) {
await directoryMake(outputLogPath);
}
fluentBitProcessLogger = winston.createLogger({
level: 'info',
defaultMeta: { service: 'fluent-bit' },
transports: config.fluentLogTransports.map(transport => new winston.transports.File({
...transport,
filename: `${outputLogPath}/${transport?.filename ?? "fluent-bit.log"}`,
})),
});
/* Manage cache */
if (!await directoryExists(cacheFolder)) {
await directoryMake(cacheFolder);
}
await manageCacheExpirations(logger, cacheFolder, cacheExpiration);
/* Recover from cache */
const executableWorkspacePath = `${workspaceRoot}/tmp/executable`;
if (!await directoryExists(executableWorkspacePath)) {
await directoryMake(executableWorkspacePath);
}
const isRecovered = await recoverFromCache(sourceLockHash, executableWorkspacePath);
if (!isRecovered) {
/* Build Fluent Bit */
await buildFluentBit(sourceLockHash, outputLogPath, fullRepoPath, executableWorkspacePath);
}
/* Copy to byproduct path */
await fileCopy(`${executableWorkspacePath}/fluent-bit`, `${testByproductPath}/fluent-bit`);
/* Run Fluent Bit */
logger.info("Running Fluent Bit.");
fluentBitChildProcess = spawn(`ulimit -c unlimited; exec ./fluent-bit -c '${fluentBakedConfigFilePath}'`, {
cwd: `${executableWorkspacePath}`,
env: {
...process.env,
...config.environmentVariables,
"FLB_INSTRUMENTATION_OUT_PATH": outputInstrumentationPath,
},
shell: true
});
fluentBitChildProcess.stdout.on('data', (data) => {
fluentLog(data);
});
fluentBitChildProcess.stderr.on('data', (data) => {
fluentLog(data);
});
fluentBitChildProcess.on('error', (error) => logger.error(`Fluent Bit Process error: ${error.message}`))
await sleep(config.warmupTime);
return true;
},