setup: async()

in wrappers/executors/fluent-bit-executor.ts [139:467]


            setup: async (root: IBuiltStage, subtree: IBuiltStage) => {

                /* Config path */
                const fluentConfigPath = resolve(config.fluentConfigFile);

                /* Init fluent bit repo if needed */
                const repoPath = `${workspaceRoot}/${WORKSPACE_NAME}`;
                const fullRepoPath = resolve(repoPath);
                const fluentConfigWorkspacePath = `${workspaceRoot}/fluent-config`

                /* Make folders if needed */
                if (!await directoryExists(workspaceRoot)) {
                    await directoryMake(workspaceRoot);
                }
                if (!await directoryExists(repoPath)) {
                    await directoryMake(fullRepoPath);
                }

                /* Clear workspace temp folder */
                const tmpFolder = `${workspaceRoot}/tmp`;
                if (await directoryExists(tmpFolder)) {
                    await directoryDelete(tmpFolder);
                    await directoryMake(tmpFolder);
                }
                else {
                    await directoryMake(tmpFolder);
                }
                setManagedVariable("workspaceTmp", resolve(tmpFolder));

                /* Make managed temporary paths */
                const tmpPathsRoot = resolve(tmpFolder, "paths");
                await directoryMake(tmpPathsRoot);
                const tmpPaths = await Promise.all(config.managedTemporaryPaths.map(async t => {
                    const path = resolve(tmpPathsRoot, t);
                    await directoryMake(path);
                    return [t, path];
                }));
                const tmpPathsObj = Object.fromEntries(tmpPaths);
                setManagedVariable("temporaryPaths", tmpPathsObj);

                /* Copy managed files */
                const tmpFilesRoot = resolve(tmpFolder, "files");
                const tmpFilesObj = Object.fromEntries(Object.entries(config.managedTemporaryFiles).map(([name, location]) => {
                    const tmpFilePath = resolve(tmpFilesRoot, name);
                    return [name, tmpFilePath];
                }));
                setManagedVariable("temporaryFiles", tmpFilesObj);

                await directoryMake(tmpFilesRoot);
                await Promise.all(Object.entries(config.managedTemporaryFiles).map(async ([name, location]) => {

                    /* aquire raw file data */
                    let fileRaw: string;
                    try {
                        fileRaw = await fileRead(resolve(location));
                    } catch (e) {
                        logger.error(`Unable to read file: ${location}`);
                        return false;
                    }

                    /* mustache render the files */
                    const fileBaked = mustache.render(fileRaw, variables);

                    /* output render */
                    const tmpFilePath = resolve(tmpFilesRoot, name);
                    await fileMake(tmpFilePath, fileBaked);
                }));

                /* Copy config to workspace */
                let configTemplate;
                try {
                    configTemplate = await fileRead(fluentConfigPath);
                } catch (e) {
                    logger.error(`Unable to read file: ${fluentConfigPath}`);
                    return false;
                }

                /* Init repo if needed */
                const git = simpleGit(fullRepoPath);
                const isRepo = await (directoryExists(`${repoPath}/.git`));
                if (!isRepo) {
                    await initializeRepo(git, FLUENT_REPO);
                    await git.raw(['config', `user.email`, `"firelens@amazon.com"`]);
                    await git.raw(['config', `user.name`, `"FireLens Datajet"`]);
                }
                await git.fetch();

                /* Amend code source repository and branch references */
                const amendCodeRepositoryAndBranchReference = async (ref: ICodeCommitReference) => ({
                    ...defaultCodeCommitReference,
                    ...ref,
                });
                const amendedCodeSourcePart: ICodeSource = {
                    base: await amendCodeRepositoryAndBranchReference(config.codeSource.base),
                    cherryPicks: (config.codeSource.cherryPicks) ? await Promise.all(config.codeSource.cherryPicks.map(amendCodeRepositoryAndBranchReference)) : [],
                    mergeStrategy: ((config.codeSource.cherryPicks?.length ?? 0) !== 0) ?
                        config.codeSource.mergeStrategy ?? defaultCherryPickMergeStrategy :
                        "", // no merge strategy for [] cherry picks list
                }

                /* Accumulate fetch list (for updated commits) */
                interface ICodeFetch {
                    repository: string,
                    branch: string,
                };
                const removeDuplicates = (a: Array<ICodeFetch>) => {
                    return Array.from(new Set(a.map(ftch => JSON.stringify(ftch))))
                        .map(strParse => JSON.parse(strParse)) as Array<ICodeFetch>;
                }
                const fetches: Array<ICodeFetch> = removeDuplicates(
                    [
                        amendedCodeSourcePart.base,
                        ...amendedCodeSourcePart.cherryPicks,
                    ].map(ref => ({
                        repository: ref.repository,
                        branch: ref.branch,
                    }))
                );

                const fetchResults = fetches.map(async fetch => git.fetch(fetch.repository, fetch.branch));
                await Promise.all(fetchResults);

                /* Amend code source commit reference */
                const amendCodeCommitReference = async (ref: ICodeCommitReference) => {
                    const amended = {
                        ...defaultCodeCommitReference,
                        ...ref,
                    };
                    if (!amended?.commit) {
                        const commitHash = await git.raw(['ls-remote', `${amended.repository}`, `${amended.branch}`]);
                        amended.commit = commitHash.split("\t")[0];
                    }
                    return amended;
                };
                const amendedCodeSource: ICodeSource = {
                    base: await amendCodeCommitReference(amendedCodeSourcePart.base),
                    cherryPicks: await Promise.all(amendedCodeSourcePart.cherryPicks.map(amendCodeCommitReference)),
                    mergeStrategy: (amendedCodeSourcePart.cherryPicks.length !== 0) ?
                        amendedCodeSourcePart.mergeStrategy ?? defaultCherryPickMergeStrategy :
                        "", // no merge strategy for [] cherry picks list
                }

                /* Source lock */
                const sourceLock: ICodeSourceLock = {
                    baseCommit: amendedCodeSource.base.commit,
                    cherryPickedCommits: amendedCodeSource.cherryPicks.map(ref => ref.commit),
                    mergeStrategy: amendedCodeSource.mergeStrategy,
                }
                const sourceLockHash = hash(sourceLock);

                /* Checkout branch */
                const sourceLockedBranchName = `source-lock-${sourceLockHash}`;
                let makeSourceBranch = false;
                try {
                    /* git reset --hard to avoid gitignore induced problems */
                    await git.reset(["--hard"]); 
                    await git.checkout(sourceLockedBranchName);
                }
                catch {
                    logger.info(`Creating new source locked branch: ${sourceLockedBranchName}`);
                    makeSourceBranch = true;
                }

                if (makeSourceBranch) {

                    /* Make branch */
                    await git.checkout(["-b", sourceLockedBranchName, sourceLock.baseCommit]);
                    
                    try {
                        /* Cherry picks */
                        for (const cherry of sourceLock.cherryPickedCommits) {
                            await git.raw(['cherry-pick', `--strategy=${sourceLock.mergeStrategy}`, cherry]);
                        }
                    }
                    catch (e) {
                        /* Cherry picks fail - delete branch */
                        await git.reset(["--hard"]);
                        await git.checkout("-");
                        await git.deleteLocalBranch(sourceLockedBranchName, true);
                        logger.error(`Cherry picks failed. Deleted failed sourceLockedBranch ${sourceLockedBranchName}`);
                        throw e;
                    }
                }

                /* Create fluent-lock.json */
                const fluentLock: IFluentLock = {
                    sourceLock: sourceLock,
                    configLock: configTemplate,
                }
                const fluentLockHash = hash(fluentLock);

                /* Write source records */
                const outputParentPath = resolve("./output");
                if (!await directoryExists(outputParentPath)) {
                    await directoryMake(outputParentPath);
                }
                const outputPath = resolve(`${outputParentPath}/${config.outputFolder}`);
                if (!await directoryExists(outputPath)) {
                    await directoryMake(outputPath);
                }
                const outputFluentLockedPath = `${outputPath}/fluent-lock-${fluentLockHash}`;
                if (!await directoryExists(outputFluentLockedPath)) {
                    await directoryMake(outputFluentLockedPath);
                }
                const outputFluentLockedPathCurrentSoftLink = `${outputPath}/current`
                if (await directoryExists(outputFluentLockedPathCurrentSoftLink)) {
                    await directoryDelete(outputFluentLockedPathCurrentSoftLink);
                }
                await softLinkMake(outputFluentLockedPath, outputFluentLockedPathCurrentSoftLink);
                setManagedVariable("outputPath", outputFluentLockedPath);
                const fluentLockFilePath = `${outputFluentLockedPath}/fluent-lock.json`;
                if (!await fileExists(fluentLockFilePath)) {
                    await fileMake(fluentLockFilePath, JSON.stringify(fluentLock, null, 2));
                }
                const sourceLockInfoFilePath = `${outputFluentLockedPath}/source-lock-info.json`;
                const sourceLockInfo = {
                    sourceLockBranch: sourceLockedBranchName,
                    sourceLockHash: sourceLockHash,
                    sourceLock: sourceLock,
                }
                if (!await fileExists(sourceLockInfoFilePath)) {
                    await fileMake(sourceLockInfoFilePath, JSON.stringify(sourceLockInfo, null, 2));
                }
                const fluentConfigFilePath = `${outputFluentLockedPath}/fluent-bit-template.conf`
                if (!await fileExists(fluentConfigFilePath)) {
                    await fileMake(fluentConfigFilePath, configTemplate);
                }

                /* Write test records */
                const outputTestPath = `${outputFluentLockedPath}/test-${timestamp()}`;
                if (!await directoryExists(outputTestPath)) {
                    await directoryMake(outputTestPath);
                }
                const outputTestPathLatestSoftLink = `${outputFluentLockedPath}/latest`
                if (await directoryExists(outputTestPathLatestSoftLink)) {
                    await directoryDelete(outputTestPathLatestSoftLink);
                }
                await softLinkMake(outputTestPath, outputTestPathLatestSoftLink);
                setManagedVariable("testPath", outputTestPath);
                const testByproductPath = `${outputTestPath}/byproduct`;
                if (!await directoryExists(testByproductPath)) {
                    await directoryMake(testByproductPath);
                }
                setManagedVariable("testByproductPath", testByproductPath);
                const fluentPipelineSchemaFilePath = `${outputTestPath}/test-pipeline-schema.json`
                if (!await fileExists(fluentPipelineSchemaFilePath)) {
                    await fileMake(fluentPipelineSchemaFilePath, JSON.stringify(localPipelineSchema, null, 2));
                }
                const fluentFullSourcePath = `${outputTestPath}/source.json`
                if (!await fileExists(fluentFullSourcePath)) {
                    await fileMake(fluentFullSourcePath, JSON.stringify(amendedCodeSource, null, 2));
                }
                const outputInstrumentationPath = `${outputTestPath}/instrumentation`;
                if (!await directoryExists(outputInstrumentationPath)) {
                    await directoryMake(outputInstrumentationPath);
                }

                /* Render baked config file (now that all managed variables are set) */
                const configBaked = mustache.render(configTemplate, variables);
                const configId = hash(configBaked);
                if (!await directoryExists(fluentConfigWorkspacePath)) {
                    await directoryMake(fluentConfigWorkspacePath)
                }
                const configBakedPath = `${fluentConfigWorkspacePath}/${configId}.conf`;
                if (!await fileExists(configBakedPath)) {
                    await fileMake(configBakedPath, configBaked);
                }
                const fluentBakedConfigFilePath = `${outputTestPath}/fluent-bit.conf`
                if (!await fileExists(fluentBakedConfigFilePath)) {
                    await fileMake(fluentBakedConfigFilePath, configBaked);
                }

                /* Configure Fluent Bit, make, and cmake loggers */
                const outputLogPath = `${outputTestPath}/logs`;
                if (!await directoryExists(outputLogPath)) {
                    await directoryMake(outputLogPath);
                }
                fluentBitProcessLogger = winston.createLogger({
                    level: 'info',
                    defaultMeta: { service: 'fluent-bit' },
                    transports: config.fluentLogTransports.map(transport => new winston.transports.File({
                        ...transport,
                        filename: `${outputLogPath}/${transport?.filename ?? "fluent-bit.log"}`,
                    })),
                });

                /* Manage cache */
                if (!await directoryExists(cacheFolder)) {
                    await directoryMake(cacheFolder);
                }
                await manageCacheExpirations(logger, cacheFolder, cacheExpiration);

                /* Recover from cache */
                const executableWorkspacePath = `${workspaceRoot}/tmp/executable`;
                if (!await directoryExists(executableWorkspacePath)) {
                    await directoryMake(executableWorkspacePath);
                }
                const isRecovered = await recoverFromCache(sourceLockHash, executableWorkspacePath);
                if (!isRecovered) {
                    /* Build Fluent Bit */
                    await buildFluentBit(sourceLockHash, outputLogPath, fullRepoPath, executableWorkspacePath);
                }

                /* Copy to byproduct path */
                await fileCopy(`${executableWorkspacePath}/fluent-bit`, `${testByproductPath}/fluent-bit`);

                /* Run Fluent Bit */
                logger.info("Running Fluent Bit.");
                fluentBitChildProcess = spawn(`ulimit -c unlimited; exec ./fluent-bit -c '${fluentBakedConfigFilePath}'`, {
                    cwd: `${executableWorkspacePath}`,
                    env: {
                        ...process.env,
                        ...config.environmentVariables,
                        "FLB_INSTRUMENTATION_OUT_PATH": outputInstrumentationPath,
                    },
                    shell: true
                });
                fluentBitChildProcess.stdout.on('data', (data) => {
                    fluentLog(data);
                });
                fluentBitChildProcess.stderr.on('data', (data) => {
                    fluentLog(data);
                });
                fluentBitChildProcess.on('error', (error) => logger.error(`Fluent Bit Process error: ${error.message}`))

                await sleep(config.warmupTime);

                return true;
            },