core/aws-lsp-core/src/util/processUtils.ts (348 lines of code) (raw):
// Ported from VSC: https://github.com/aws/aws-toolkit-vscode/blob/91859e29b26ef1c58cbd957d81e1a0deb01a7880/packages/core/src/shared/utilities/processUtils.ts#L1
// Does not support Timeout Object since that is VSC specific.
import * as proc from 'child_process'
import * as crossSpawn from 'cross-spawn'
import { Logging } from '@aws/language-server-runtimes/server-interface'
import { PollingSet } from './pollingSet'
import { waitUntil } from './timeoutUtils'
export interface RunParameterContext {
/** Reports an error parsed from the stdin/stdout streams. */
reportError(err: string | Error): void
/** Attempts to stop the running process. See {@link ChildProcess.stop}. */
stop(force?: boolean, signal?: string): void
/** Send string to stdin */
send(text: string): Promise<void>
/** The logger being used by the process. */
readonly logger: Logging
}
export interface ChildProcessOptions {
/** Sets the logging behavior. (default: 'yes') */
logging?: 'yes' | 'no' | 'noparams'
/** Controls whether stdout/stderr is collected and returned in the `ChildProcessResult`. (default: true) */
collect?: boolean
/** Wait until streams close to resolve the process result. (default: true) */
waitForStreams?: boolean
/** Forcefully kill the process on an error. (default: false) */
useForceStop?: boolean
/** Rejects the Promise on any error. Can also use a callback for custom errors. (default: false) */
rejectOnError?: boolean | ((error: Error) => Error)
/** Rejects the Promise on non-zero exit codes. Can also use a callback for custom errors. (default: false) */
rejectOnErrorCode?: boolean | ((code: number) => Error)
/** Options sent to the `spawn` command. This is merged in with the base options if they exist. */
spawnOptions?: proc.SpawnOptions
/** Callback for intercepting text from the stdout stream. */
onStdout?: (text: string, context: RunParameterContext) => void
/** Callback for intercepting text from the stderr stream. */
onStderr?: (text: string, context: RunParameterContext) => void
}
export interface ChildProcessRunOptions extends Omit<ChildProcessOptions, 'logging'> {
/** Arguments applied in addition to the ones used in construction. */
extraArgs?: string[]
}
export interface ChildProcessResult {
exitCode: number
error: Error | undefined
/** All output emitted by the process, if it was started with `collect=true`, else empty. */
stdout: string
/** All stderr data emitted by the process, if it was started with `collect=true`, else empty. */
stderr: string
signal?: string
}
export const eof = Symbol('EOF')
export interface ProcessStats {
memory: number
cpu: number
}
export class ChildProcessTracker {
static #instance: ChildProcessTracker
static readonly pollingInterval: number = 10000 // Check usage every 10 seconds
static readonly thresholds: ProcessStats = {
memory: 100 * 1024 * 1024, // 100 MB
cpu: 50,
}
#processByPid: Map<number, ChildProcess> = new Map<number, ChildProcess>()
#pids: PollingSet<number>
private constructor(private logging: Logging) {
this.#pids = new PollingSet(ChildProcessTracker.pollingInterval, () => this.monitor())
}
public static getInstance(logging: Logging) {
// Overwrite logging if it exists
if (this.#instance) {
this.#instance.logging = logging
return this.#instance
}
return (this.#instance ??= new this(logging))
}
private cleanUp() {
const terminatedProcesses = Array.from(this.#pids.values()).filter(
(pid: number) => this.#processByPid.get(pid)?.stopped
)
for (const pid of terminatedProcesses) {
this.delete(pid)
}
}
private async monitor() {
this.cleanUp()
for (const pid of this.#pids.values()) {
await this.checkProcessUsage(pid)
}
}
private async checkProcessUsage(pid: number): Promise<void> {
if (!this.#pids.has(pid)) {
this.logging.warn(`Missing process with id ${pid}`)
return
}
const stats = this.getUsage(pid)
if (stats) {
this.logIfExceeds(pid, stats)
}
}
public logIfExceeds(pid: number, stats: ProcessStats) {
if (stats.memory > ChildProcessTracker.thresholds.memory) {
this.logging.warn(`Process ${pid} exceeded memory threshold: ${stats.memory}`)
}
if (stats.cpu > ChildProcessTracker.thresholds.cpu) {
this.logging.warn(`Process ${pid} exceeded cpu threshold: ${stats.cpu}`)
}
}
public add(childProcess: ChildProcess) {
const pid = childProcess.pid()
this.#processByPid.set(pid, childProcess)
this.#pids.add(pid)
}
public delete(childProcessId: number) {
this.#processByPid.delete(childProcessId)
this.#pids.delete(childProcessId)
}
public get size() {
return this.#pids.size
}
public has(childProcess: ChildProcess) {
return this.#pids.has(childProcess.pid())
}
public clear() {
for (const childProcess of this.#processByPid.values()) {
childProcess.stop(true)
}
this.#pids.clear()
this.#processByPid.clear()
}
private getUsage(pid: number): ProcessStats {
try {
return process.platform === 'win32' ? getWindowsUsage() : getUnixUsage()
} catch (e) {
this.logging.warn(`Failed to get process stats for ${pid}: ${e}`)
return { cpu: 0, memory: 0 }
}
function getWindowsUsage() {
const cpuOutput = proc
.execFileSync('wmic', [
'path',
'Win32_PerfFormattedData_PerfProc_Process',
'where',
`IDProcess=${pid}`,
'get',
'PercentProcessorTime',
])
.toString()
const memOutput = proc
.execFileSync('wmic', ['process', 'where', `ProcessId=${pid}`, 'get', 'WorkingSetSize'])
.toString()
const cpuPercentage = parseFloat(cpuOutput.split('\n')[1])
const memoryBytes = parseInt(memOutput.split('\n')[1]) * 1024
return {
cpu: isNaN(cpuPercentage) ? 0 : cpuPercentage,
memory: memoryBytes,
}
}
function getUnixUsage() {
const cpuMemOutput = proc.execFileSync('ps', ['-p', pid.toString(), '-o', '%cpu,%mem']).toString()
const rssOutput = proc.execFileSync('ps', ['-p', pid.toString(), '-o', 'rss']).toString()
const cpuMemLines = cpuMemOutput.split('\n')[1].trim().split(/\s+/)
const cpuPercentage = parseFloat(cpuMemLines[0])
const memoryBytes = parseInt(rssOutput.split('\n')[1]) * 1024
return {
cpu: isNaN(cpuPercentage) ? 0 : cpuPercentage,
memory: memoryBytes,
}
}
}
}
/**
* Convenience class to manage a child process
* To use:
* - instantiate
* - call and await run to get the results (pass or fail)
*/
export class ChildProcess {
#runningProcesses: ChildProcessTracker
static stopTimeout = 3000
#childProcess: proc.ChildProcess | undefined
#processErrors: Error[] = []
#processResult: ChildProcessResult | undefined
#log: Logging
/** Collects stdout data if the process was started with `collect=true`. */
#stdoutChunks: string[] = []
/** Collects stderr data if the process was started with `collect=true`. */
#stderrChunks: string[] = []
#command: string
#args: string[]
#baseOptions: ChildProcessOptions
#makeResult(code: number, signal?: NodeJS.Signals): ChildProcessResult {
return {
exitCode: code,
stdout: this.#stdoutChunks.join('').trim(),
stderr: this.#stderrChunks.join('').trim(),
error: this.#processErrors[0], // Only use the first since that one usually cascades.
signal,
}
}
public constructor(logging: Logging, command: string, args: string[] = [], baseOptions: ChildProcessOptions = {}) {
this.#command = command
this.#args = args
this.#baseOptions = baseOptions
this.#runningProcesses = ChildProcessTracker.getInstance(logging)
// TODO: allow caller to use the various loggers instead of just the single one
this.#log = logging
}
public static async run(
logging: Logging,
command: string,
args: string[] = [],
options?: ChildProcessOptions
): Promise<ChildProcessResult> {
return await new ChildProcess(logging, command, args, options).run()
}
// Inspired by 'got'
/**
* Creates a one-off {@link ChildProcess} class that always uses the specified options.
*/
public static extend(logging: Logging, options: ChildProcessOptions) {
return class extends this {
public constructor(command: string, args: string[] = []) {
super(logging, command, args, options)
}
}
}
/**
* Runs the child process. Options passed here are merged with the options passed in during construction.
* Priority is given to `run` options, overriding the previous value.
*/
public async run(params: ChildProcessRunOptions = {}): Promise<ChildProcessResult> {
if (this.#childProcess) {
throw new Error('process already started')
}
const options = {
collect: true,
waitForStreams: true,
...this.#baseOptions,
...params,
spawnOptions: { ...this.#baseOptions.spawnOptions, ...params.spawnOptions },
}
const { rejectOnError, rejectOnErrorCode } = options
const args = this.#args.concat(options.extraArgs ?? [])
this.#log.info(`Command: ${this.toString(options.logging === 'noparams')}`)
this.#log.debug(`running processes: ${this.#runningProcesses.size}`)
const cleanup = () => {
this.#childProcess?.stdout?.removeAllListeners()
this.#childProcess?.stderr?.removeAllListeners()
}
return new Promise<ChildProcessResult>((resolve, reject) => {
const errorHandler = (error: Error, force = options.useForceStop) => {
this.#processErrors.push(error)
if (!this.stopped) {
this.stop(force)
}
if (rejectOnError) {
if (typeof rejectOnError === 'function') {
reject(rejectOnError(error))
} else {
reject(error)
}
}
}
const paramsContext: RunParameterContext = {
logger: this.#log,
stop: this.stop.bind(this),
send: this.send.bind(this),
reportError: err => errorHandler(err instanceof Error ? err : new Error(err)),
}
// Async.
// See also crossSpawn.spawnSync().
// Arguments are forwarded[1] to node `child_process` module, see its documentation[2].
// [1] https://github.com/moxystudio/node-cross-spawn/blob/master/index.js
// [2] https://nodejs.org/api/child_process.html
try {
this.#childProcess = crossSpawn.spawn(this.#command, args, options.spawnOptions)
this.#registerLifecycleListeners(this.#childProcess)
} catch (err) {
return reject(err)
}
// Emitted whenever:
// 1. Process could not be spawned, or
// 2. Process could not be killed, or
// 3. Sending a message to the child process failed.
// https://nodejs.org/api/child_process.html#child_process_class_childprocess
// We also register error event handlers on the output/error streams in case a lower level library fails
this.#childProcess.on('error', errorHandler)
this.#childProcess.stdout?.on('error', errorHandler)
this.#childProcess.stderr?.on('error', errorHandler)
this.#childProcess.stdout?.on('data', (data: { toString(): string }) => {
if (options.collect) {
this.#stdoutChunks.push(data.toString())
}
options.onStdout?.(data.toString(), paramsContext)
})
this.#childProcess.stderr?.on('data', (data: { toString(): string }) => {
if (options.collect) {
this.#stderrChunks.push(data.toString())
}
options.onStderr?.(data.toString(), paramsContext)
})
// Emitted when streams are closed.
// This will not be fired if `waitForStreams` is false
this.#childProcess.once('close', (code, signal) => {
this.#processResult = this.#makeResult(code ?? -1, signal ?? undefined)
resolve(this.#processResult)
})
// Emitted when process exits or terminates.
// https://nodejs.org/api/child_process.html#child_process_class_childprocess
// - If the process exited, `code` is the final exit code of the process, else null.
// - If the process terminated because of a signal, `signal` is the name of the signal, else null.
// - One of `code` or `signal` will always be non-null.
this.#childProcess.once('exit', (code, signal) => {
this.#processResult = this.#makeResult(
typeof code === 'number' ? code : -1,
typeof signal === 'string' ? signal : undefined
)
if (code && rejectOnErrorCode) {
if (typeof rejectOnErrorCode === 'function') {
reject(rejectOnErrorCode(code))
} else {
reject(new Error(`Command exited with non-zero code: ${code}`))
}
}
if (options.waitForStreams === false) {
resolve(this.#processResult)
}
})
}).finally(() => cleanup())
}
/**
* Gets the `run()` result after the child process has finished.
*
* stdout/stderr will be empty unless the process was started with `collect=true`.
*
* @returns `run()` result, or undefined if the process has not yet started or is still running.
*/
public result(): ChildProcessResult | undefined {
return this.#processResult
}
public proc(): proc.ChildProcess | undefined {
return this.#childProcess
}
public pid(): number {
return this.#childProcess?.pid ?? -1
}
public exitCode(): number {
return this.#childProcess?.exitCode ?? -1
}
/**
* Stops the process.
*
* SIGTERM won't kill a terminal process, use SIGHUP instead.
*
* @param force Tries SIGKILL if the process is not stopped after a few seconds.
* @param signal Signal to send, defaults to SIGTERM (node default).
*
*/
public stop(force: boolean = false, signal?: NodeJS.Signals): void {
const child = this.#childProcess
if (!child || child.stdin?.destroyed) {
return
}
const command = this.#command
const pid = this.pid()
if (!this.stopped) {
child.kill(signal)
if (force === true) {
waitUntil(async () => this.stopped, { timeout: ChildProcess.stopTimeout, interval: 200, truthy: true })
.then(stopped => {
if (!stopped) {
child.kill('SIGKILL')
}
})
.catch(e => {
this.#log.warn(`stop(): SIGKILL failed: pid=${pid} command=${command}`)
})
}
} else {
throw new Error('Attempting to kill a process that has already been killed')
}
}
#registerLifecycleListeners(process: proc.ChildProcess): void {
const pid = process.pid
if (pid === undefined) {
return
}
this.#runningProcesses.add(this)
const dispose = () => {
this.#runningProcesses.delete(this.pid())
}
process.on('exit', dispose)
process.on('error', dispose)
}
/**
* Sends data to the process
*
* This throws if the process hasn't started or if the write fails.
*/
public async send(input: string | Buffer | typeof eof) {
if (this.#childProcess === undefined) {
throw new Error('Cannot write to non-existent process')
}
const stdin = this.#childProcess.stdin
if (!stdin) {
throw new Error('Cannot write to non-existent stdin')
}
if (input === eof) {
return new Promise<void>(resolve => stdin.end('', resolve))
}
return new Promise<void>((resolve, reject) => stdin.write(input, e => (e ? reject(e) : resolve())))
}
/**
* Returns true if the process has ended, or false if the process was not
* started or is still running.
*
* "Ended" means any of:
* - error prevented start, or
* - streams closed, or
* - process exited, or
* - exit-code was set.
*/
public get stopped(): boolean {
if (!this.#childProcess) {
return false // Not started yet.
}
return !!this.#processResult
}
/**
* Gets a string representation of the process invocation.
*
* @param noparams Omit parameters in the result (to protect sensitive info).
*/
public toString(noparams = false): string {
const pid = this.pid() > 0 ? `PID ${this.pid()}:` : '(not started)'
return `${pid} [${this.#command} ${noparams ? '...' : this.#args.join(' ')}]`
}
}