packages/@aws-cdk-testing/cli-integ/lib/xpmutex.ts (156 lines of code) (raw):

import { watch, promises as fs, mkdirSync } from 'fs';
import * as os from 'os';
import * as path from 'path';

/**
 * A directory that holds the lock files of a set of cross-process mutexes.
 *
 * The pool watches its directory with `fs.watch` and wakes up waiters
 * whenever a file disappears from it (which is how an unlock shows up).
 */
export class XpMutexPool {
  /**
   * Create a pool backed by the given directory (created if necessary).
   */
  public static fromDirectory(directory: string) {
    mkdirSync(directory, { recursive: true });
    return new XpMutexPool(directory);
  }

  /**
   * Create a pool backed by a directory with the given name under the OS temp dir.
   */
  public static fromName(name: string) {
    return XpMutexPool.fromDirectory(path.join(os.tmpdir(), name));
  }

  /** Callbacks to invoke on the next unlock event. */
  private readonly waitingResolvers = new Set<() => void>();
  private watcher: ReturnType<typeof watch> | undefined;

  private constructor(public readonly directory: string) {
    this.startWatch();
  }

  /**
   * Return a handle to the mutex with the given name in this pool.
   */
  public mutex(name: string) {
    return new XpMutex(this, name);
  }

  /**
   * Await an unlock event
   *
   * (An unlock event is when a file in the directory gets deleted, with a tiny
   * random sleep attached to it).
   *
   * @param maxWaitMs if given (and non-zero), resolve after this many
   * milliseconds even if no unlock event was observed.
   */
  public awaitUnlock(maxWaitMs?: number): Promise<void> {
    // The Promise executor runs synchronously, so `unregister` is always
    // assigned by the time we get past the constructor call.
    let unregister = () => { };
    const wait = new Promise<void>(ok => {
      const wake = async () => {
        await randomSleep(10);
        ok();
      };
      this.waitingResolvers.add(wake);
      unregister = () => { this.waitingResolvers.delete(wake); };
    });

    if (!maxWaitMs) {
      return wait;
    }

    // If the timeout wins, drop our resolver so that timed-out waits don't
    // pile up in the set while no unlock events are happening.
    const timeout = sleep(maxWaitMs).then(() => unregister());
    return Promise.race([wait, timeout]);
  }

  /**
   * Start watching the pool directory, restarting the watch if it errors out.
   */
  private startWatch() {
    const watcher = watch(this.directory);
    this.watcher = watcher;
    (watcher as any).unref(); // @types doesn't know about this but it exists

    watcher.on('change', async (eventType, fname) => {
      // Only trigger on 'deletes'.
      if (eventType !== 'rename') { return; }

      // fs.watch does not guarantee that a file name is supplied. Without one
      // we can't tell what happened, so wake the waiters up: a spurious wakeup
      // only costs them one failed acquire attempt.
      if (fname == null) {
        this.notifyWaiters();
        return;
      }

      // After receiving the event, we check if the file exists.
      // - If no: the file was deleted! Huzzah, this counts as a wakeup.
      // - If yes: either the file was just created (in which case we don't need to wakeup)
      //   or the event was due to a delete but someone raced us to it and claimed the
      //   file already (in which case we also don't need to wake up).
      if (!await fileExists(path.join(this.directory, fname.toString()))) {
        this.notifyWaiters();
      }
    });

    watcher.on('error', async (e) => {
      // eslint-disable-next-line no-console
      console.error(e);
      // Make sure the failed watcher is released before replacing it.
      watcher.close();
      await randomSleep(100);
      this.startWatch();
    });
  }

  /**
   * Wake up everybody currently waiting for an unlock event.
   */
  private notifyWaiters() {
    const resolvers = Array.from(this.waitingResolvers);
    this.waitingResolvers.clear();
    for (const resolve of resolvers) {
      resolve();
    }
  }
}

/**
 * Cross-process mutex
 *
 * Uses the presence of a file on disk and `fs.watch` to represent the mutex
 * and discover unlocks.
 *
 * The lock file contains the PID of the owning process followed by a period.
 */
export class XpMutex {
  private readonly fileName: string;

  constructor(private readonly pool: XpMutexPool, public readonly mutexName: string) {
    this.fileName = path.join(pool.directory, `${mutexName}.mutex`);
  }

  /**
   * Try to acquire the lock (may fail)
   *
   * @returns the lock if it was acquired, `undefined` if it is held by a
   * live process (or someone else is already busy taking over a stale lock).
   */
  public async tryAcquire(): Promise<ILock | undefined> {
    while (true) {
      // Acquire lock by being the one to create the file
      try {
        return await this.writePidFile('wx'); // Fails if the file already exists
      } catch (e: any) {
        if (e.code !== 'EEXIST') { throw e; }
      }

      // File already exists. Read the contents, see if it's an existent PID (if so, the lock is taken)
      const ownerPid = await this.readPidFile();
      if (ownerPid === undefined) {
        // File got deleted just now, maybe we can acquire it again
        continue;
      }
      if (processExists(ownerPid)) {
        return undefined;
      }

      // If not, the lock is stale and will never be released anymore. We may
      // delete it and acquire it anyway, but we may be racing someone else trying
      // to do the same. Solve this as follows:
      // - Try to acquire a lock that gives us permissions to declare the existing lock stale.
      // - Sleep a small random period to reduce contention on this operation
      await randomSleep(10);

      const innerMux = new XpMutex(this.pool, `${this.mutexName}.${ownerPid}`);
      const innerLock = await innerMux.tryAcquire();
      if (!innerLock) {
        return undefined;
      }

      // We may not release the 'inner lock' we used to acquire the rights to declare the other
      // lock stale until we release the actual lock itself. If we did, other contenders might
      // see it released while they're still in this fallback block and accidentally steal
      // from a new legitimate owner.
      try {
        return await this.writePidFile('w', innerLock); // Force write lock file, attach inner lock as well
      } catch (e) {
        // We failed to take over the stale lock; don't keep holding the inner
        // lock or nobody else will be able to take it over while we are alive.
        await innerLock.release();
        throw e;
      }
    }
  }

  /**
   * Acquire the lock, waiting until we can
   */
  public async acquire(): Promise<ILock> {
    while (true) {
      // Start the wait here, so we don't miss the signal if it comes after
      // we try but before we sleep.
      //
      // We also periodically retry anyway since we may have missed the delete
      // signal due to unfortunate timing.
      const wait = this.pool.awaitUnlock(5000);

      const lock = await this.tryAcquire();
      if (lock) {
        // Ignore the wait (count as handled)
        wait.then(() => { }, () => { });
        return lock;
      }

      await wait;
      await randomSleep(100);
    }
  }

  /**
   * Read the owner PID from the lock file.
   *
   * @returns the PID, or `undefined` if the lock file does not exist.
   * @throws if the file did not contain a complete PID record within 1 second.
   */
  private async readPidFile(): Promise<number | undefined> {
    const deadLine = Date.now() + 1000;
    while (Date.now() < deadLine) {
      let contents;
      try {
        contents = await fs.readFile(this.fileName, { encoding: 'utf-8' });
      } catch (e: any) {
        if (e.code === 'ENOENT') { return undefined; }
        throw e;
      }

      // Retry until we've seen the full contents
      if (contents.endsWith('.')) {
        return parseInt(contents.substring(0, contents.length - 1), 10);
      }
      await sleep(10);
    }

    throw new Error(`${this.fileName} was never completely written`);
  }

  /**
   * Write our PID to the lock file and return the lock that represents it.
   *
   * @param mode file open flags; `'wx'` fails with `EEXIST` if the file already exists.
   * @param additionalLock a lock to release after this lock has been released.
   */
  private async writePidFile(mode: string, additionalLock?: ILock): Promise<ILock> {
    const fd = await fs.open(this.fileName, mode); // May fail if the file already exists
    try {
      await fd.write(`${process.pid}.`); // Period guards against partial reads
    } finally {
      // Don't leak the file handle if the write fails
      await fd.close();
    }

    return {
      release: async () => {
        try {
          await fs.unlink(this.fileName);
        } finally {
          // Release the attached lock even if removing our own file failed
          await additionalLock?.release();
        }
      },
    };
  }
}

/**
 * A held lock; call `release()` to give it up.
 */
export interface ILock {
  release(): Promise<void>;
}

/**
 * Whether the given file exists (errors other than ENOENT are rethrown).
 */
async function fileExists(fileName: string) {
  try {
    await fs.stat(fileName);
    return true;
  } catch (e: any) {
    if (e.code === 'ENOENT') { return false; }
    throw e;
  }
}

/**
 * Whether a process with the given PID currently exists.
 *
 * Sends signal 0, which performs the existence/permission check without
 * delivering a signal.
 */
function processExists(pid: number) {
  try {
    process.kill(pid, 0);
    return true;
  } catch (e: any) {
    // EPERM means the process exists but we are not allowed to signal it
    // (for example because it belongs to another user). Its lock is not stale.
    return e?.code === 'EPERM';
  }
}

/**
 * Resolve after the given number of milliseconds.
 *
 * The timer is unref'ed so that a pending sleep does not keep the process alive.
 */
function sleep(ms: number): Promise<void> {
  return new Promise(ok => (setTimeout(ok, ms) as any).unref());
}

/**
 * Sleep for a random whole number of milliseconds in the range [0, ms).
 */
function randomSleep(ms: number) {
  return sleep(Math.floor(Math.random() * ms));
}