in src/common/persistence/FSExtentStore.ts [168:322]
public async appendExtent(
data: NodeJS.ReadableStream | Buffer,
contextId?: string
): Promise<IExtentChunk> {
const op = () =>
new Promise<IExtentChunk>((resolve, reject) => {
(async (): Promise<IExtentChunk> => {
let appendExtentIdx = 0;
for (let i = 1; i < this.activeWriteExtentsNumber; i++) {
if (
this.activeWriteExtents[i].appendStatus === AppendStatusCode.Idle
) {
appendExtentIdx = i;
break;
}
}
this.activeWriteExtents[appendExtentIdx].appendStatus =
AppendStatusCode.Appending;
this.logger.info(
`FSExtentStore:appendExtent() Select extent from idle location for extent append operation. LocationId:${appendExtentIdx} extentId:${this.activeWriteExtents[appendExtentIdx].id} offset:${this.activeWriteExtents[appendExtentIdx].offset} MAX_EXTENT_SIZE:${MAX_EXTENT_SIZE} `,
contextId
);
if (
this.activeWriteExtents[appendExtentIdx].offset >= MAX_EXTENT_SIZE
) {
this.logger.info(
`FSExtentStore:appendExtent() Size of selected extent offset is larger than maximum extent size ${MAX_EXTENT_SIZE} bytes, try appending to new extent.`,
contextId
);
const selectedFd = this.activeWriteExtents[appendExtentIdx].fd;
if (selectedFd) {
this.logger.info(
`FSExtentStore:appendExtent() Close unused fd:${selectedFd}.`,
contextId
);
try {
await closeAsync(selectedFd);
} catch (err) {
this.logger.error(
`FSExtentStore:appendExtent() Close unused fd:${selectedFd} error:${JSON.stringify(
err
)}.`,
contextId
);
}
}
await this.getNewExtent(this.activeWriteExtents[appendExtentIdx]);
this.logger.info(
`FSExtentStore:appendExtent() Allocated new extent LocationID:${appendExtentIdx} extentId:${this.activeWriteExtents[appendExtentIdx].id} offset:${this.activeWriteExtents[appendExtentIdx].offset} MAX_EXTENT_SIZE:${MAX_EXTENT_SIZE} `,
contextId
);
}
let rs: NodeJS.ReadableStream;
if (data instanceof Buffer) {
rs = new BufferStream(data);
} else {
rs = data;
}
const appendExtent = this.activeWriteExtents[appendExtentIdx];
const id = appendExtent.id;
const path = this.generateExtentPath(appendExtent.locationId, id);
let fd = appendExtent.fd;
this.logger.debug(
`FSExtentStore:appendExtent() Get fd:${fd} for extent:${id} from cache.`,
contextId
);
if (fd === undefined) {
fd = await openAsync(path, "a");
appendExtent.fd = fd;
this.logger.debug(
`FSExtentStore:appendExtent() Open file:${path} for extent:${id}, get new fd:${fd}`,
contextId
);
}
const ws = createWriteStream(path, {
fd,
autoClose: false
});
this.logger.debug(
`FSExtentStore:appendExtent() Created write stream for fd:${fd}`,
contextId
);
let count = 0;
this.logger.debug(
`FSExtentStore:appendExtent() Start writing to extent ${id}`,
contextId
);
try {
count = await this.streamPipe(rs, ws, fd, contextId);
const offset = appendExtent.offset;
appendExtent.offset += count;
const extent: IExtentModel = {
id,
locationId: appendExtent.locationId,
path: id,
size: count + offset,
lastModifiedInMS: Date.now()
};
this.logger.debug(
`FSExtentStore:appendExtent() Write finish, start updating extent metadata. extent:${JSON.stringify(
extent
)}`,
contextId
);
await this.metadataStore.updateExtent(extent);
this.logger.debug(
`FSExtentStore:appendExtent() Update extent metadata done. Resolve()`,
contextId
);
appendExtent.appendStatus = AppendStatusCode.Idle;
return {
id,
offset,
count
};
} catch (err) {
// Reset cursor position to the current offset. On Windows, truncating a file open in append mode doesn't
// work, so we need to close the file descriptor first.
try {
appendExtent.fd = undefined;
await closeAsync(fd);
await truncateAsync(path, appendExtent.offset);
// Indicate that the extent is ready for the next append operation.
appendExtent.appendStatus = AppendStatusCode.Idle;
} catch (truncate_err) {
this.logger.error(
`FSExtentStore:appendExtent() Truncate path:${path} len: ${appendExtent.offset} error:${JSON.stringify(
truncate_err
)}.`,
contextId
);
}
throw err;
}
})()
.then(resolve)
.catch(reject);
});
return this.appendQueue.operate(op, contextId);
}