public async appendExtent()

in src/common/persistence/FSExtentStore.ts [168:322]


  /**
   * Append a payload to one of the active write extents and record the
   * extent's new size in the metadata store.
   *
   * The work is packaged as an operation and handed to
   * `this.appendQueue.operate`, which decides when it actually runs.
   *
   * Steps performed by the operation:
   * 1. Pick a write slot: the first slot with index >= 1 whose status is
   *    `Idle`, otherwise slot 0. The slot is marked `Appending`.
   * 2. If the slot's offset has reached `MAX_EXTENT_SIZE`, close its cached
   *    file descriptor (best effort) and ask `getNewExtent` for a fresh extent.
   * 3. Reuse the slot's cached descriptor or open the extent file in append
   *    mode, then pipe the payload into it through `streamPipe`.
   * 4. Advance the slot offset, persist the extent model via
   *    `metadataStore.updateExtent`, and mark the slot `Idle` again.
   *
   * Failure handling:
   * - If allocating a new extent or opening the file fails, nothing has been
   *   written yet, so the slot is returned to `Idle` and the error is rethrown.
   * - If writing or the metadata update fails, the descriptor is closed and
   *   the file is truncated back to the slot offset. The slot only becomes
   *   `Idle` again when that clean-up succeeds; a clean-up failure is logged
   *   and the slot is left in `Appending` state.
   *
   * @param data Payload to append. A `Buffer` is wrapped in a `BufferStream`;
   *   a readable stream is consumed as is.
   * @param contextId Optional identifier forwarded to every log call and to
   *   the append queue.
   * @returns The extent id, the slot offset before this append, and the count
   *   reported by `streamPipe` (presumably the number of bytes written).
   * @throws Rethrows any error raised while allocating the extent, opening the
   *   file, writing the payload or updating the metadata.
   */
  public async appendExtent(
    data: NodeJS.ReadableStream | Buffer,
    contextId?: string
  ): Promise<IExtentChunk> {
    const op = async (): Promise<IExtentChunk> => {
      // Slot 0 is the fallback: the scan starts at 1 and only overrides the
      // default when it finds an idle slot.
      let appendExtentIdx = 0;

      for (let i = 1; i < this.activeWriteExtentsNumber; i++) {
        if (this.activeWriteExtents[i].appendStatus === AppendStatusCode.Idle) {
          appendExtentIdx = i;
          break;
        }
      }
      this.activeWriteExtents[appendExtentIdx].appendStatus =
        AppendStatusCode.Appending;

      this.logger.info(
        `FSExtentStore:appendExtent() Select extent from idle location for extent append operation. LocationId:${appendExtentIdx} extentId:${this.activeWriteExtents[appendExtentIdx].id} offset:${this.activeWriteExtents[appendExtentIdx].offset} MAX_EXTENT_SIZE:${MAX_EXTENT_SIZE} `,
        contextId
      );

      if (this.activeWriteExtents[appendExtentIdx].offset >= MAX_EXTENT_SIZE) {
        this.logger.info(
          `FSExtentStore:appendExtent() Size of selected extent offset is larger than maximum extent size ${MAX_EXTENT_SIZE} bytes, try appending to new extent.`,
          contextId
        );

        const selectedFd = this.activeWriteExtents[appendExtentIdx].fd;
        // Compare against undefined rather than truthiness so that a valid
        // descriptor value of 0 is not mistaken for "no descriptor".
        if (selectedFd !== undefined) {
          this.logger.info(
            `FSExtentStore:appendExtent() Close unused fd:${selectedFd}.`,
            contextId
          );
          // Forget the descriptor before closing it so the slot can never
          // hand out a closed (and possibly reused) descriptor number, even
          // if allocating the next extent fails below.
          this.activeWriteExtents[appendExtentIdx].fd = undefined;
          try {
            await closeAsync(selectedFd);
          } catch (err) {
            // Best effort: a failed close must not block the rollover.
            this.logger.error(
              `FSExtentStore:appendExtent() Close unused fd:${selectedFd} error:${JSON.stringify(
                err
              )}.`,
              contextId
            );
          }
        }

        try {
          await this.getNewExtent(this.activeWriteExtents[appendExtentIdx]);
        } catch (err) {
          // Nothing has been written yet; release the slot instead of leaving
          // it marked as Appending forever.
          this.activeWriteExtents[appendExtentIdx].appendStatus =
            AppendStatusCode.Idle;
          throw err;
        }
        this.logger.info(
          `FSExtentStore:appendExtent() Allocated new extent LocationID:${appendExtentIdx} extentId:${this.activeWriteExtents[appendExtentIdx].id} offset:${this.activeWriteExtents[appendExtentIdx].offset} MAX_EXTENT_SIZE:${MAX_EXTENT_SIZE} `,
          contextId
        );
      }

      const rs: NodeJS.ReadableStream =
        data instanceof Buffer ? new BufferStream(data) : data;

      const appendExtent = this.activeWriteExtents[appendExtentIdx];
      const id = appendExtent.id;
      const path = this.generateExtentPath(appendExtent.locationId, id);
      let fd = appendExtent.fd;
      this.logger.debug(
        `FSExtentStore:appendExtent() Get fd:${fd} for extent:${id} from cache.`,
        contextId
      );
      if (fd === undefined) {
        try {
          fd = await openAsync(path, "a");
        } catch (err) {
          // The file could not be opened, so no data was written; release the
          // slot before propagating the error.
          appendExtent.appendStatus = AppendStatusCode.Idle;
          throw err;
        }
        appendExtent.fd = fd;
        this.logger.debug(
          `FSExtentStore:appendExtent() Open file:${path} for extent:${id}, get new fd:${fd}`,
          contextId
        );
      }

      // autoClose is disabled because the descriptor is cached on the slot
      // and reused by later appends.
      const ws = createWriteStream(path, {
        fd,
        autoClose: false
      });

      this.logger.debug(
        `FSExtentStore:appendExtent() Created write stream for fd:${fd}`,
        contextId
      );

      let count = 0;

      this.logger.debug(
        `FSExtentStore:appendExtent() Start writing to extent ${id}`,
        contextId
      );

      try {
        count = await this.streamPipe(rs, ws, fd, contextId);
        // Remember where this payload starts before advancing the slot.
        const offset = appendExtent.offset;
        appendExtent.offset += count;

        const extent: IExtentModel = {
          id,
          locationId: appendExtent.locationId,
          path: id,
          size: count + offset,
          lastModifiedInMS: Date.now()
        };
        this.logger.debug(
          `FSExtentStore:appendExtent() Write finish, start updating extent metadata. extent:${JSON.stringify(
            extent
          )}`,
          contextId
        );
        await this.metadataStore.updateExtent(extent);

        this.logger.debug(
          `FSExtentStore:appendExtent() Update extent metadata done. Resolve()`,
          contextId
        );
        appendExtent.appendStatus = AppendStatusCode.Idle;
        return {
          id,
          offset,
          count
        };
      } catch (err) {
        // Reset cursor position to the current offset. On Windows, truncating a file open in append mode doesn't
        // work, so we need to close the file descriptor first.
        try {
          appendExtent.fd = undefined;
          await closeAsync(fd);
          await truncateAsync(path, appendExtent.offset);
          // Indicate that the extent is ready for the next append operation.
          appendExtent.appendStatus = AppendStatusCode.Idle;
        } catch (truncateErr) {
          // The slot is intentionally left non-idle here: the file may hold
          // data beyond the recorded offset.
          this.logger.error(
            `FSExtentStore:appendExtent() Truncate path:${path} len: ${appendExtent.offset} error:${JSON.stringify(
              truncateErr
            )}.`,
            contextId
          );
        }
        throw err;
      }
    };

    return this.appendQueue.operate(op, contextId);
  }