in src/common/persistence/FSExtentStore.ts [506:612]
private async streamPipe(
rs: NodeJS.ReadableStream,
ws: Writable,
fd?: number,
contextId?: string
): Promise<number> {
return new Promise<number>((resolve, reject) => {
this.logger.debug(
`FSExtentStore:streamPipe() Start piping data to write stream`,
contextId
);
let count: number = 0;
let wsEnd = false;
rs.on("data", data => {
count += data.length;
if (!ws.write(data)) {
rs.pause();
}
})
.on("end", () => {
this.logger.debug(
`FSExtentStore:streamPipe() Readable stream triggers close event, ${count} bytes piped`,
contextId
);
if (!wsEnd) {
this.logger.debug(
`FSExtentStore:streamPipe() Invoke write stream end()`,
contextId
);
ws.end();
wsEnd = true;
}
})
.on("close", () => {
this.logger.debug(
`FSExtentStore:streamPipe() Readable stream triggers close event, ${count} bytes piped`,
contextId
);
if (!wsEnd) {
this.logger.debug(
`FSExtentStore:streamPipe() Invoke write stream end()`,
contextId
);
ws.end();
wsEnd = true;
}
})
.on("error", err => {
this.logger.debug(
`FSExtentStore:streamPipe() Readable stream triggers error event, error:${JSON.stringify(
err
)}, after ${count} bytes piped. Reject streamPipe().`,
contextId
);
reject(err);
});
ws.on("drain", () => {
rs.resume();
})
.on("finish", () => {
if (typeof fd === "number") {
this.logger.debug(
`FSExtentStore:streamPipe() Writable stream triggers finish event, after ${count} bytes piped. Flush data to fd:${fd}.`,
contextId
);
fdatasync(fd, err => {
if (err) {
this.logger.debug(
`FSExtentStore:streamPipe() Flush data to fd:${fd} failed with error:${JSON.stringify(
err
)}. Reject streamPipe().`,
contextId
);
reject(err);
} else {
this.logger.debug(
`FSExtentStore:streamPipe() Flush data to fd:${fd} successfully. Resolve streamPipe().`,
contextId
);
resolve(count);
}
});
} else {
this.logger.debug(
`FSExtentStore:streamPipe() Resolve streamPipe() without flushing data.`,
contextId
);
resolve(count);
}
})
.on("error", err => {
this.logger.debug(
`FSExtentStore:streamPipe() Writable stream triggers error event, error:${JSON.stringify(
err
)}, after ${count} bytes piped. Reject streamPipe().`,
contextId
);
reject(err);
});
});
}