func assemblePartialNextSteps()

in Sources/TSFCASFileTree/FileTreeImport.swift [939:1129]


        /// Builds the futures that drive the upload of every segment (chunk)
        /// described by `segmentDescriptors`.
        ///
        /// For each segment a "cheap" step is scheduled on `netQueue` which
        /// asks the database whether the segment already exists:
        ///
        /// - If it exists, the step resolves with the result of
        ///   `uploadFileInfo(blobId:importSize:)` for the existing blob id.
        /// - Otherwise the step resolves to an `.execute(in: .UploadingFiles)`
        ///   step which, when run, prepares the segment data on `cpuQueue` and
        ///   uploads it through `netQueue`.
        ///
        /// In both cases the final outcome for the segment is cascaded to a
        /// per-segment "partial step" promise.
        ///
        /// - Returns: The cheap per-segment step futures. When `isSingleChunk`
        ///   is `false`, one more future is appended: an `.UploadingWait` step
        ///   which waits for all partial steps and then stores a top-level
        ///   `LLBFileInfo` referencing the uploaded chunks.
        func assemblePartialNextSteps() -> [LLBFuture<NextStep>] {
          let cheapNextAndPartialStepFutures: [(nextStepFuture: LLBFuture<NextStep>, partialStepFuture: LLBFuture<NextStep>)] = segmentDescriptors.enumerated().map { (segmentOffset, segm) in

              // This partial step is a final step for the sequence
              // of cheap/heavy steps, for a single segment (chunk).
              // We use a cancellable promise since our heavy next steps might
              // not even run at all.
              let partialStepPromise = LLBCancellablePromise(promise: loop.makePromise(of: NextStep.self))

              // Translates a stored object id into the step reported for
              // this segment: a complete file when the file consists of a
              // single chunk, a partial chunk otherwise.
              func encodeNextStep(for id: LLBDataID) -> NextStep {
                if isSingleChunk {
                    return .singleFile(SingleFileInfo(path: path, id: id, type: type, size: UInt64(clamping: segm.uncompressedSize), posixDetails: importObject.posixDetails))
                } else {
                    return .partialFileChunk(id)
                }
              }

              // If the file has non-standard layout, upload it after we upload
              // the binary blob.
              //
              // `importSize` is only passed when the blob itself has not been
              // accounted for yet (i.e. it was already present in the
              // database and no `dbPut` for the blob was issued here).
              func uploadFileInfo(blobId: LLBDataID, importSize: Int? = nil) -> LLBFuture<NextStep> {
                // Nothing left to do once the overall import has finished.
                guard finalResultPromise.isCompleted == false else {
                    return loop.makeSucceededFuture(NextStep.skipped)
                }

                // We need to wrap the blob in a `FileInformation`:
                // — When we compress the file.
                // — When the file is top level (importing a single file).
                guard segm.isCompressed || topLevel else {
                    if let size = importSize {
                        _ = stats.importedObjects_.add(1)
                        _ = stats.importedBytes_.add(size)
                    }
                    return loop.makeSucceededFuture(encodeNextStep(for: blobId))
                }

                var fileInfo = LLBFileInfo()
                // Each segment (if not a single segment) is encoded as a plain
                // file and doesn't have any other metadata (e.g. permissions).
                if isSingleChunk {
                    fileInfo.type = type
                    fileInfo.update(posixDetails: importObject.posixDetails, options: self.options)
                } else {
                    fileInfo.type = .plainFile
                }
                fileInfo.size = UInt64(segm.uncompressedSize)
                // FIXME: no compression supported right now
                // fileInfo.compression = segm.isCompressed ? ... : .none
                assert(!segm.isCompressed)
                fileInfo.compression = .none
                fileInfo.fixedChunkSize = UInt64(segm.uncompressedSize)
                do {
                    return dbPut(refs: [blobId], data: try fileInfo.toBytes(), importSize: importSize, ctx).map { id in
                        encodeNextStep(for: id)
                    }
                } catch {
                    return loop.makeFailedFuture(error)
                }
              }

              // Size passed to `netQueue` when scheduling the "contains"
              // request.
              let containsRequestWireSizeEstimate = 64

              let throttledContainsFuture = self.execute(on: self.netQueue, size: containsRequestWireSizeEstimate, default: .skipped) { () -> LLBFuture<NextStep> in
                let containsFuture = self.dbContains(segm, ctx)
                let containsLoop = containsFuture.eventLoop
                return containsFuture.flatMap { exists -> LLBFuture<NextStep> in

                  // The segment is already stored: no data upload is needed,
                  // only (possibly) the wrapping file info.
                  guard !exists else {
                    let existingIdFuture: LLBFuture<NextStep> = segm.id.flatMap { id in
                        return uploadFileInfo(blobId: id, importSize: segm.uncompressedSize)
                    }
                    existingIdFuture.cascade(to: partialStepPromise)
                    return existingIdFuture
                  }

                  // The segment is missing: hand back a heavy step that
                  // reads and uploads the data when it is run.
                  return containsLoop.makeSucceededFuture(NextStep.execute(in: .UploadingFiles, run: {
                    let nextStepFuture: LLBFuture<NextStep> = self.execute(on: self.cpuQueue, default: nil) { () -> LLBFuture<NextStep>? in
                        let data: LLBFastData

                        switch importObject {
                        case let .link(target):
                            data = target
                        case let .file(file, _):
                            data = try self.prepareSingleSegment(of: file, segmentNumber: segmentOffset, useCompression: segm.isCompressed)
                        }

                        let slice = data.toByteBuffer()

                        // Make sure we wait until the netQueue is sufficiently
                        // free to take our load. This ensures that we're not
                        // limited by CPU parallelism for network concurrency.
                        return self.executeWithBackpressure(on: self.netQueue, loop: containsLoop, size: slice.readableBytes, default: .skipped) { () -> LLBFuture<NextStep> in
                            return self.dbPut(refs: [], data: slice, importSize: segm.uncompressedSize, ctx).flatMap { id -> LLBFuture<NextStep> in
                                withExtendedLifetime(importObject) { // for mmap
                                    uploadFileInfo(blobId: id)
                                }
                            }.hop(to: loop)
                        }
                    }.flatMap {
                        // This type of return ensures that cpuQueue does not
                        // wait for the netQueue operation to complete.
                        $0 ?? loop.makeSucceededFuture(NextStep.skipped)
                    }
                    nextStepFuture.cascade(to: partialStepPromise)
                    return nextStepFuture
                  }))
                }
              }

              return (nextStepFuture: throttledContainsFuture, partialStepFuture: partialStepPromise.futureResult)
          }

          let cheapNextStepFutures: [LLBFuture<NextStep>] = cheapNextAndPartialStepFutures.map { $0.nextStepFuture }

          // Sending a single segment, for which we don't need
          // to wait until all of its subcomponents are uploaded.
          guard isSingleChunk == false else {
            return cheapNextStepFutures
          }

          let partialStepFutures = cheapNextAndPartialStepFutures.map { $0.partialStepFuture }
          let combinePartialResultsStep: NextStep = NextStep.execute(in: .UploadingWait, run: {
            return self.whenAllSucceed(partialStepFutures).flatMapErrorThrowing { error -> [NextStep] in
                // If reading any segment fails with something, we either
                // forward the error or hide it. Hiding is expressed by
                // returning an empty list of steps.

                if error is FileSystemError {
                    // Some kind of filesystem access error.
                    if self.options.skipUnreadable {
                        return []
                    }
                } else if let fsError = error as? FileSegmenter.Error {
                    if case .resourceChanged = fsError {
                        if self.options.relaxConsistencyChecks {
                            // Turn consistency checks errors into skips.
                            // Location /EC1.
                            return []
                        }
                    }
                }

                // Everything else (including the remaining
                // `FileSegmenter.Error` cases) is forwarded to the caller.
                throw error

              }.flatMap { nextSteps in
                // The next steps can only be empty if we're reacting to
                // a filesystem-related error with some of our chunks.
                guard nextSteps.isEmpty == false else {
                    return loop.makeSucceededFuture(.skipped)
                }

                let chunkIds: [LLBDataID] = nextSteps.map {
                    guard case let .partialFileChunk(id) = $0 else {
                        preconditionFailure("Next step is not a partial chunk")
                    }
                    return id
                }

                var fileInfo = LLBFileInfo()
                fileInfo.type = type
                fileInfo.size = UInt64(allSegmentsUncompressedDataSize)
                // The top is not compressed when chunks are present.
                fileInfo.compression = .none
                fileInfo.fixedChunkSize = UInt64(chunkIds.count > 1 ? self.options.fileChunkSize : allSegmentsUncompressedDataSize)
                let posixDetails = importObject.posixDetails
                fileInfo.update(posixDetails: posixDetails, options: self.options)
                do {
                    let fileInfoBytes = try fileInfo.toBytes()
                    return self.execute(on: self.netQueue, size: fileInfoBytes.readableBytes, default: .skipped) {
                        self.dbPut(refs: chunkIds, data: fileInfoBytes, importSize: nil, ctx).map { id in
                            return .singleFile(SingleFileInfo(path: path, id: id, type: type, size: UInt64(clamping: allSegmentsUncompressedDataSize), posixDetails: posixDetails))
                        }
                    }
                } catch {
                    return loop.makeFailedFuture(error)
                }
              }
            })

          // Since uploading fragmented files requires waiting and churning
          // the recursive next step machinery until the parts are properly
          // uploaded, we can't just block on waiting until all the chunks
          // have been uploaded. Therefore we wait for the huge files in
          // its own state, .UploadingWait.
          return cheapNextStepFutures + [loop.makeSucceededFuture(combinePartialResultsStep)]
        }