packages/@aws-cdk/toolkit-lib/lib/api/rwlock.ts (138 lines of code) (raw):

import { promises as fs } from 'fs';
import * as path from 'path';
import { ToolkitError } from './toolkit-error';

/**
 * A single-writer/multi-reader lock on a directory
 *
 * It uses marker files with PIDs in them as a locking marker; the PIDs will be
 * checked for liveness, so that if the process exits without cleaning up the
 * files the lock is implicitly released.
 *
 * Marker files live inside the locked directory itself:
 *
 * - `synth.lock` holds the PID of the current writer.
 * - `read.<pid>.<counter>.lock` holds the PID of one reader.
 *
 * This class is not 100% race safe, but in practice it should be a lot
 * better than the 0 protection we have today.
 */
/* c8 ignore start */ // code paths are unpredictable
export class RWLock {
  private readonly pidString: string;
  private readonly writerFile: string;
  private readCounter = 0;

  constructor(public readonly directory: string) {
    this.pidString = `${process.pid}`;
    this.writerFile = path.join(this.directory, 'synth.lock');
  }

  /**
   * Acquire a writer lock.
   *
   * No other readers or writers must exist for the given directory.
   *
   * @throws ToolkitError if a live writer or any live reader is found.
   */
  public async acquireWrite(): Promise<IWriteLock> {
    await this.assertNoOtherWriters();

    const readers = await this._currentReaders();
    if (readers.length > 0) {
      throw new ToolkitError(`Other CLIs (PID=${readers}) are currently reading from ${this.directory}. Invoke the CLI in sequence, or use '--output' to synth into different directories.`);
    }

    await writeFileAtomic(this.writerFile, this.pidString);

    // Shared by `release` and `convertToReaderLock`: once either of them has
    // removed our writer marker, neither may remove it again, otherwise we
    // might delete a file that some other lock has created in the mean time.
    let released = false;
    return {
      release: async () => {
        if (!released) {
          await deleteFile(this.writerFile);
          released = true;
        }
      },
      convertToReaderLock: async () => {
        // Acquire the read lock before releasing the write lock. Slightly less
        // chance of racing!
        const ret = await this.doAcquireRead();
        if (!released) {
          await deleteFile(this.writerFile);
          // Mark the write lock as gone so a later `release()` on it is a no-op.
          released = true;
        }
        return ret;
      },
    };
  }

  /**
   * Acquire a read lock
   *
   * Will fail if there are any writers.
   *
   * @throws ToolkitError if a live writer is found.
   */
  public async acquireRead(): Promise<IReadLock> {
    await this.assertNoOtherWriters();
    return this.doAcquireRead();
  }

  /**
   * Obtains the name of a (new) `readerFile` to use. This includes a counter so
   * that if multiple threads of the same PID attempt to concurrently acquire
   * the same lock, they're guaranteed to use a different reader file name (only
   * one thread will ever execute JS code at once, guaranteeing the readCounter
   * is incremented "atomically" from the point of view of this PID.).
   */
  private readerFile(): string {
    return path.join(this.directory, `read.${this.pidString}.${++this.readCounter}.lock`);
  }

  /**
   * Do the actual acquiring of a read lock.
   *
   * Writes a fresh reader marker file and returns a handle that removes it.
   */
  private async doAcquireRead(): Promise<IReadLock> {
    const readerFile = this.readerFile();
    await writeFileAtomic(readerFile, this.pidString);

    let released = false;
    return {
      release: async () => {
        // Releasing needs a flag, otherwise we might delete a file that some other lock has created in the mean time.
        if (!released) {
          await deleteFile(readerFile);
          released = true;
        }
      },
    };
  }

  /**
   * Throw if a live writer currently holds the directory.
   */
  private async assertNoOtherWriters(): Promise<void> {
    const writer = await this._currentWriter();
    if (writer) {
      throw new ToolkitError(`Another CLI (PID=${writer}) is currently synthing to ${this.directory}. Invoke the CLI in sequence, or use '--output' to synth into different directories.`);
    }
  }

  /**
   * Check the current writer (if any)
   *
   * Returns the PID recorded in the writer marker file if that process is
   * alive. A marker that points at a dead process is deleted and treated as
   * absent.
   *
   * Publicly accessible for testing purposes. Do not use.
   *
   * @internal
   */
  public async _currentWriter(): Promise<number | undefined> {
    const contents = await readFileIfExists(this.writerFile);
    if (!contents) {
      return undefined;
    }

    const pid = parseInt(contents, 10);
    if (!processExists(pid)) {
      // Do cleanup of a stray file now
      await deleteFile(this.writerFile);
      return undefined;
    }

    return pid;
  }

  /**
   * Check the current readers (if any)
   *
   * Returns one PID per reader marker file whose process is alive (so a PID
   * may appear more than once). Markers that point at dead processes are
   * deleted. A missing directory yields an empty list.
   *
   * Publicly accessible for testing purposes. Do not use.
   *
   * @internal
   */
  public async _currentReaders(): Promise<number[]> {
    const re = /^read\.([^.]+)\.[^.]+\.lock$/;
    const ret = new Array<number>();

    let children: string[];
    try {
      children = await fs.readdir(this.directory, { encoding: 'utf-8' });
    } catch (e: any) {
      // Can't be locked if the directory doesn't exist
      if (e.code === 'ENOENT') {
        return [];
      }
      throw e;
    }

    for (const fname of children) {
      const m = fname.match(re);
      if (m) {
        const pid = parseInt(m[1], 10);
        if (processExists(pid)) {
          ret.push(pid);
        } else {
          // Do cleanup of a stray file now
          await deleteFile(path.join(this.directory, fname));
        }
      }
    }
    return ret;
  }
}
/* c8 ignore stop */

/**
 * An acquired lock
 */
export interface IReadLock {
  /**
   * Release the lock. Can be called more than once.
   */
  release(): Promise<void>;
}

/**
 * An acquired writer lock
 */
export interface IWriteLock extends IReadLock {
  /**
   * Convert the writer lock to a reader lock
   */
  convertToReaderLock(): Promise<IReadLock>;
}

/**
 * Read a UTF-8 file, returning `undefined` if it does not exist.
 *
 * Any error other than `ENOENT` is rethrown.
 */
/* c8 ignore start */ // code paths are unpredictable
async function readFileIfExists(filename: string): Promise<string | undefined> {
  try {
    return await fs.readFile(filename, { encoding: 'utf-8' });
  } catch (e: any) {
    if (e.code === 'ENOENT') {
      return undefined;
    }
    throw e;
  }
}
/* c8 ignore stop */

// Makes temporary file names unique within this process.
let tmpCounter = 0;

/**
 * Write a file by first writing to a uniquely named temporary sibling and
 * then renaming it over the destination.
 *
 * The parent directory is created (recursively) if necessary.
 */
/* c8 ignore start */ // code paths are unpredictable
async function writeFileAtomic(filename: string, contents: string): Promise<void> {
  await fs.mkdir(path.dirname(filename), { recursive: true });
  const tmpFile = `${filename}.${process.pid}_${++tmpCounter}`;
  await fs.writeFile(tmpFile, contents, { encoding: 'utf-8' });
  await fs.rename(tmpFile, filename);
}
/* c8 ignore stop */

/**
 * Delete a file, silently succeeding if it is already gone.
 *
 * Any error other than `ENOENT` is rethrown.
 */
/* c8 ignore start */ // code paths are unpredictable
async function deleteFile(filename: string): Promise<void> {
  try {
    await fs.unlink(filename);
  } catch (e: any) {
    if (e.code === 'ENOENT') {
      return;
    }
    throw e;
  }
}
/* c8 ignore stop */

/**
 * Whether a process with the given PID currently exists.
 *
 * Sends signal `0`, which performs the existence/permission check without
 * actually delivering a signal.
 */
/* c8 ignore start */ // code paths are unpredictable
function processExists(pid: number): boolean {
  // A PID parsed from a corrupt marker file may be NaN, zero or negative.
  // Zero and negative values would probe a process group rather than a single
  // process, so treat anything that is not a positive integer as "not alive".
  if (!Number.isInteger(pid) || pid <= 0) {
    return false;
  }

  try {
    process.kill(pid, 0);
    return true;
  } catch (e: any) {
    // EPERM means the process exists but we are not allowed to signal it
    // (for example, it belongs to another user). Its lock is still held.
    return e.code === 'EPERM';
  }
}
/* c8 ignore stop */