in Sources/TSFCASFileTree/FileTreeImport.swift [939:1129]
func assemblePartialNextSteps() -> [LLBFuture<NextStep>] {
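// For every segment we produce two futures: a cheap "next step"
// future that resolves once the existence check (and any deferred
// upload step) has been scheduled, and a "partial step" future
// that resolves only when the segment's data is actually in the
// database. For multi-chunk files the partial futures are combined
// at the end under the .UploadingWait state.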
let cheapNextAndPartialStepFutures: [(nextStepFuture: LLBFuture<NextStep>, partialStepFuture: LLBFuture<NextStep>)] = segmentDescriptors.enumerated().map { (segmentOffset, segm) in
// This partial step is the final step in the sequence of
// cheap/heavy steps for a single segment (chunk). We use
// a cancellable promise since our heavy next steps might
// not run at all.
let partialStepPromise = LLBCancellablePromise(promise: loop.makePromise(of: NextStep.self))
func encodeNextStep(for id: LLBDataID) -> NextStep {
if isSingleChunk {
return .singleFile(SingleFileInfo(path: path, id: id, type: type, size: UInt64(clamping: segm.uncompressedSize), posixDetails: importObject.posixDetails))
} else {
return .partialFileChunk(id)
}
}
// If the file has a non-standard layout, upload its metadata
// after we upload the binary blob.
func uploadFileInfo(blobId: LLBDataID, importSize: Int? = nil) -> LLBFuture<NextStep> {
guard finalResultPromise.isCompleted == false else {
return loop.makeSucceededFuture(NextStep.skipped)
}
// We need to wrap the blob in a `FileInformation`:
// - when we compress the file;
// - when the file is top level (importing a single file).
guard segm.isCompressed || topLevel else {
if let size = importSize {
_ = stats.importedObjects_.add(1)
_ = stats.importedBytes_.add(size)
}
return loop.makeSucceededFuture(encodeNextStep(for: blobId))
}
var fileInfo = LLBFileInfo()
// Unless it is the only segment, each segment is encoded as a
// plain file and carries no other metadata (e.g. permissions).
if isSingleChunk {
fileInfo.type = type
fileInfo.update(posixDetails: importObject.posixDetails, options: self.options)
} else {
fileInfo.type = .plainFile
}
fileInfo.size = UInt64(segm.uncompressedSize)
// FIXME: no compression supported right now
// fileInfo.compression = segm.isCompressed ? ... : .none
assert(!segm.isCompressed)
fileInfo.compression = .none
fileInfo.fixedChunkSize = UInt64(segm.uncompressedSize)
do {
return dbPut(refs: [blobId], data: try fileInfo.toBytes(), importSize: importSize, ctx).map { id in
encodeNextStep(for: id)
}
} catch {
return loop.makeFailedFuture(error)
}
}
let containsRequestWireSizeEstimate = 64
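// First, cheaply ask the database whether this segment's blob is
// already present. If it is, we reuse the existing blob instead of
// re-uploading the data; otherwise we defer the actual upload to
// the .UploadingFiles phase below.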
let throttledContainsFuture = self.execute(on: self.netQueue, size: containsRequestWireSizeEstimate, default: .skipped) { () -> LLBFuture<NextStep> in
let containsFuture = self.dbContains(segm, ctx)
let containsLoop = containsFuture.eventLoop
return containsFuture.flatMap { exists -> LLBFuture<NextStep> in
guard !exists else {
let existingIdFuture: LLBFuture<NextStep> = segm.id.flatMap { id in
return uploadFileInfo(blobId: id, importSize: segm.uncompressedSize)
}
existingIdFuture.cascade(to: partialStepPromise)
return existingIdFuture
}
return containsLoop.makeSucceededFuture(NextStep.execute(in: .UploadingFiles, run: {
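// The blob is not in the database yet. Prepare the segment's
// bytes on the cpuQueue, then upload them on the netQueue
// under backpressure.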
let nextStepFuture: LLBFuture<NextStep> = self.execute(on: self.cpuQueue, default: nil) { () -> LLBFuture<NextStep>? in
let data: LLBFastData
switch importObject {
case let .link(target):
data = target
case let .file(file, _):
data = try self.prepareSingleSegment(of: file, segmentNumber: segmentOffset, useCompression: segm.isCompressed)
}
let slice = data.toByteBuffer()
// Make sure we wait until the netQueue is sufficiently
// free to take our load. This ensures that network
// concurrency is not limited by CPU parallelism.
return self.executeWithBackpressure(on: self.netQueue, loop: containsLoop, size: slice.readableBytes, default: .skipped) { () -> LLBFuture<NextStep> in
return self.dbPut(refs: [], data: slice, importSize: segm.uncompressedSize, ctx).flatMap { id -> LLBFuture<NextStep> in
withExtendedLifetime(importObject) { // for mmap
uploadFileInfo(blobId: id)
}
}.hop(to: loop)
}
}.flatMap {
// Returning the inner future here ensures that the cpuQueue
// does not wait for the netQueue operation to complete.
$0 ?? loop.makeSucceededFuture(NextStep.skipped)
}
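// Fulfilling the partial promise lets the .UploadingWait
// combiner below observe this chunk's result.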
nextStepFuture.cascade(to: partialStepPromise)
return nextStepFuture
}))
}
}
return (nextStepFuture: throttledContainsFuture, partialStepFuture: partialStepPromise.futureResult)
}
let cheapNextStepFutures: [LLBFuture<NextStep>] = cheapNextAndPartialStepFutures.map { $0.nextStepFuture }
// When sending a single segment there are no subcomponents
// to wait for, so the cheap steps are all we need.
guard isSingleChunk == false else {
return cheapNextStepFutures
}
let partialStepFutures = cheapNextAndPartialStepFutures.map { $0.partialStepFuture }
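// Once every per-chunk upload has completed, combine the chunk
// ids into a single top-level file object and upload its metadata.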
let combinePartialResultsStep: NextStep = NextStep.execute(in: .UploadingWait, run: {
return self.whenAllSucceed(partialStepFutures).flatMapErrorThrowing { error -> [NextStep] in
// If any segment fails, we either forward the error
// or suppress it.
if error is FileSystemError {
// Some kind of filesystem access error.
if self.options.skipUnreadable {
return []
}
} else if let fsError = error as? FileSegmenter.Error {
if case .resourceChanged = fsError {
if self.options.relaxConsistencyChecks {
// Turn consistency check errors into skips.
// Location /EC1.
return []
}
} else if self.options.skipUnreadable {
// Not a consistency error, hide it.
return []
}
}
throw error
}.flatMap { nextSteps in
// The next steps can only be empty if we reacted to
// a filesystem-related error in one of our chunks.
guard nextSteps.isEmpty == false else {
return loop.makeSucceededFuture(.skipped)
}
let chunkIds: [LLBDataID] = nextSteps.map {
guard case let .partialFileChunk(id) = $0 else {
preconditionFailure("Next step is not a partial chunk")
}
return id
}
var fileInfo = LLBFileInfo()
fileInfo.type = type
fileInfo.size = UInt64(allSegmentsUncompressedDataSize)
// The top-level object is not compressed when chunks are present.
fileInfo.compression = .none
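// With multiple chunks, each chunk spans options.fileChunkSize
// bytes (except possibly the last); a single chunk spans the
// entire file.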
fileInfo.fixedChunkSize = UInt64(chunkIds.count > 1 ? self.options.fileChunkSize : allSegmentsUncompressedDataSize)
let posixDetails = importObject.posixDetails
fileInfo.update(posixDetails: posixDetails, options: self.options)
do {
let fileInfoBytes = try fileInfo.toBytes()
return self.execute(on: self.netQueue, size: fileInfoBytes.readableBytes, default: .skipped) {
self.dbPut(refs: chunkIds, data: fileInfoBytes, importSize: nil, ctx).map { id in
return .singleFile(SingleFileInfo(path: path, id: id, type: type, size: UInt64(clamping: allSegmentsUncompressedDataSize), posixDetails: posixDetails))
}
}
} catch {
return loop.makeFailedFuture(error)
}
}
})
// Since uploading fragmented files requires churning the
// recursive next-step machinery until all parts are properly
// uploaded, we can't simply block until every chunk has been
// uploaded. Therefore we wait for the huge files in their own
// state, .UploadingWait.
return cheapNextStepFutures + [loop.makeSucceededFuture(combinePartialResultsStep)]
}