private async streamPipe()

in src/common/persistence/FSExtentStore.ts [506:612]


  /**
   * Pipes all data from a readable stream into a writable stream with manual
   * backpressure handling, and reports how many bytes were forwarded.
   *
   * The readable stream is paused whenever `ws.write()` returns false and is
   * resumed on the writable stream's "drain" event. When the readable stream
   * emits "end" or "close", the writable stream is ended exactly once. When the
   * writable stream emits "finish", data is optionally flushed with
   * `fdatasync(fd)` before the promise resolves.
   *
   * @param rs Source stream to read from.
   * @param ws Destination stream to write to.
   * @param fd Optional file descriptor; when it is a number, `fdatasync(fd)` is
   *   invoked after the writable stream finishes and before resolving.
   * @param contextId Optional identifier passed through to every log call.
   * @returns Promise resolving to the sum of `length` over all forwarded chunks.
   *   Rejects with the error emitted by either stream, or with the error
   *   reported by `fdatasync`.
   */
  private async streamPipe(
    rs: NodeJS.ReadableStream,
    ws: Writable,
    fd?: number,
    contextId?: string
  ): Promise<number> {
    return new Promise<number>((resolve, reject) => {
      this.logger.debug(
        `FSExtentStore:streamPipe() Start piping data to write stream`,
        contextId
      );

      let count = 0;
      // True once the writable stream has been ended or destroyed by this method.
      let wsEnd = false;

      // JSON.stringify() skips Error's non-enumerable name/message, so a plain
      // Error would be logged as "{}". Add them explicitly while keeping any
      // enumerable fields (for example code/errno on system errors).
      const describeError = (err: unknown): string =>
        err instanceof Error
          ? JSON.stringify({ ...err, name: err.name, message: err.message })
          : JSON.stringify(err);

      // Ends the writable stream at most once; shared by "end" and "close"
      // because a readable stream may emit either one or both.
      const endWritableOnce = (): void => {
        if (wsEnd) {
          return;
        }
        this.logger.debug(
          `FSExtentStore:streamPipe() Invoke write stream end()`,
          contextId
        );
        wsEnd = true;
        ws.end();
      };

      rs.on("data", data => {
        count += data.length;
        // Backpressure: stop reading until the writable stream drains.
        if (!ws.write(data)) {
          rs.pause();
        }
      })
        .on("end", () => {
          this.logger.debug(
            `FSExtentStore:streamPipe() Readable stream triggers end event, ${count} bytes piped`,
            contextId
          );
          endWritableOnce();
        })
        .on("close", () => {
          this.logger.debug(
            `FSExtentStore:streamPipe() Readable stream triggers close event, ${count} bytes piped`,
            contextId
          );
          endWritableOnce();
        })
        .on("error", err => {
          this.logger.debug(
            `FSExtentStore:streamPipe() Readable stream triggers error event, error:${describeError(
              err
            )}, after ${count} bytes piped. Destroy write stream and reject streamPipe().`,
            contextId
          );

          reject(err);

          // Tear down the writable stream so that a later "close" event on the
          // readable stream does not end() it and flush a failed transfer.
          wsEnd = true;
          ws.destroy(err);
        });

      ws.on("drain", () => {
        rs.resume();
      })
        .on("finish", () => {
          if (typeof fd === "number") {
            this.logger.debug(
              `FSExtentStore:streamPipe() Writable stream triggers finish event, after ${count} bytes piped. Flush data to fd:${fd}.`,
              contextId
            );
            fdatasync(fd, err => {
              if (err) {
                this.logger.debug(
                  `FSExtentStore:streamPipe() Flush data to fd:${fd} failed with error:${describeError(
                    err
                  )}. Reject streamPipe().`,
                  contextId
                );
                reject(err);
              } else {
                this.logger.debug(
                  `FSExtentStore:streamPipe() Flush data to fd:${fd} successfully. Resolve streamPipe().`,
                  contextId
                );
                resolve(count);
              }
            });
          } else {
            this.logger.debug(
              `FSExtentStore:streamPipe() Resolve streamPipe() without flushing data.`,
              contextId
            );
            resolve(count);
          }
        })
        .on("error", err => {
          this.logger.debug(
            `FSExtentStore:streamPipe() Writable stream triggers error event, error:${describeError(
              err
            )}, after ${count} bytes piped. Reject streamPipe().`,
            contextId
          );
          reject(err);
        });
    });
  }