in Sources/TSFCASFileTree/FileTreeImport.swift [842:1136]
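/// Examines the filesystem object at `path` and translates it into the
/// next step of the import state machine: symlink targets are identified
/// inline, directories are deferred to the caller, and regular files are
/// segmented, checked for remote existence, and uploaded chunk by chunk.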
func makeNextStep(path: AbsolutePath, type pathObjectType: FilesystemObjectType, _ ctx: Context) throws -> NextStep {
let loop = self.loop
let stats = self.stats
let segmentDescriptors: [SegmentDescriptor] // Information about the file's segments, possibly after compression.
let type: LLBFileType
let allSegmentsUncompressedDataSize: Int
enum ObjectToImport {
case link(target: LLBFastData)
case file(file: FileSegmenter, posixDetails: LLBPosixFileDetails)
var posixDetails: LLBPosixFileDetails {
switch self {
case .link:
return LLBPosixFileDetails()
case .file(_, let posixDetails):
return posixDetails
}
}
}
let importObject: ObjectToImport
func relative(_ path: AbsolutePath) -> String {
return path.prettyPath(cwd: importPath)
}
// If this is a symbolic link, the "data" is the target.
switch pathObjectType {
case .LNK:
var buf = [Int8](repeating: 0, count: Int(PATH_MAX) + 1)
let count = TSCLibc.readlink(path.pathString, &buf, buf.count - 1)
guard count > 0 else {
throw ImportError.unreadableLink(path)
}
type = .symlink
// Reinterpret the raw bytes; UInt8($0) would trap on non-ASCII
// (negative) Int8 values in the link target.
let target = LLBFastData(buf[..<count].map { UInt8(bitPattern: $0) })
allSegmentsUncompressedDataSize = target.count
importObject = .link(target: target)
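// A link target is always a single uncompressed segment; `identify`
// only computes its id here, and the contains/put path below decides
// whether the bytes actually need to be uploaded.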
segmentDescriptors = [SegmentDescriptor(isCompressed: false, uncompressedSize: target.count, id: _db.identify(refs: [], data: target.toByteBuffer(), ctx))]
case .DIR:
let posixDetails: LLBPosixFileDetails?
// Read the permissions and ownership information, if requested.
if options.preservePosixDetails.preservationEnabled {
var sb = stat()
if lstat(path.pathString, &sb) == 0, (sb.st_mode & S_IFMT) == S_IFDIR {
posixDetails = LLBPosixFileDetails(from: sb)
} else {
posixDetails = nil
}
} else {
posixDetails = nil
}
// If this is a directory, defer its processing.
return .gotDirectory(path: path, posixDetails: posixDetails)
case .REG:
let file: FileSegmenter
do {
file = try FileSegmenter(importPath: importPath, path, segmentSize: options.fileChunkSize, minMmapSize: options.minMmapSize, allowInconsistency: options.relaxConsistencyChecks)
} catch {
guard options.skipUnreadable else {
throw ImportError.unreadableFile(path)
}
return .skipped
}
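// Any execute bit (owner/group/other) marks the file as executable.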
type = (file.statInfo.st_mode & 0o111 == 0) ? .plainFile : .executable
importObject = .file(file: file, posixDetails: LLBPosixFileDetails(from: file.statInfo))
segmentDescriptors = try describeAllSegments(of: file, ctx)
allSegmentsUncompressedDataSize = segmentDescriptors.reduce(0) {
$0 + $1.uncompressedSize
}
default:
return .skipped
}
// Count the extra chunks beyond the first toward the number of objects
// to import; the first chunk is covered by the file itself.
_ = stats.toImportObjects_.add(max(0, allSegmentsUncompressedDataSize - 1) / options.fileChunkSize)
_ = stats.toImportBytes_.add(allSegmentsUncompressedDataSize)
_ = stats.toImportFiles_.add(1)
// We check if the remote contains the object before ingesting.
//
// FIXME: This feels like it should be automatically handled by
// the database, not here. For now, the RemoteCASDatabase isn't
// doing this, though, so this is important for avoiding
// unnecessary uploads.
// FIXME: Double-scanning of files on disk.
// Whether the file is stored as a single chunk.
let isSingleChunk = segmentDescriptors.count == 1
// Whether the current object is the top level of the whole tree.
// This happens when the file is the sole object being imported.
let topLevel = isSingleChunk && importPath == path
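// Builds one pair of futures per segment: a cheap step that checks for
// remote existence and schedules the upload, and a partial step that
// resolves once the segment's final id is known.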
func assemblePartialNextSteps() -> [LLBFuture<NextStep>] {
let cheapNextAndPartialStepFutures: [(nextStepFuture: LLBFuture<NextStep>, partialStepFuture: LLBFuture<NextStep>)] = segmentDescriptors.enumerated().map { (segmentOffset, segm) in
// This partial step is the final step in the sequence
// of cheap/heavy steps for a single segment (chunk).
// We use a cancellable promise since the heavy next steps
// might never run at all.
let partialStepPromise = LLBCancellablePromise(promise: loop.makePromise(of: NextStep.self))
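// A single-chunk file resolves directly to its final file object;
// otherwise each segment becomes a partial chunk that is stitched
// together later in the .UploadingWait state.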
func encodeNextStep(for id: LLBDataID) -> NextStep {
if isSingleChunk {
return .singleFile(SingleFileInfo(path: path, id: id, type: type, size: UInt64(clamping: segm.uncompressedSize), posixDetails: importObject.posixDetails))
} else {
return .partialFileChunk(id)
}
}
// If the file needs a non-standard layout, upload a `FileInformation`
// wrapper after we upload the binary blob.
func uploadFileInfo(blobId: LLBDataID, importSize: Int? = nil) -> LLBFuture<NextStep> {
guard finalResultPromise.isCompleted == false else {
return loop.makeSucceededFuture(NextStep.skipped)
}
// We need to wrap the blob in a `FileInformation`:
// — When we compress the file.
// — When the file is top level (importing a single file).
guard segm.isCompressed || topLevel else {
if let size = importSize {
_ = stats.importedObjects_.add(1)
_ = stats.importedBytes_.add(size)
}
return loop.makeSucceededFuture(encodeNextStep(for: blobId))
}
var fileInfo = LLBFileInfo()
// Each segment (if not a single segment) is encoded as a plain
// file and doesn't have any other metadata (e.g. permissions).
if isSingleChunk {
fileInfo.type = type
fileInfo.update(posixDetails: importObject.posixDetails, options: self.options)
} else {
fileInfo.type = .plainFile
}
fileInfo.size = UInt64(segm.uncompressedSize)
// FIXME: no compression supported right now
// fileInfo.compression = segm.isCompressed ? ... : .none
assert(!segm.isCompressed)
fileInfo.compression = .none
fileInfo.fixedChunkSize = UInt64(segm.uncompressedSize)
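// A single segment spans the whole file, so the declared chunk size
// equals the segment's uncompressed size.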
do {
return dbPut(refs: [blobId], data: try fileInfo.toBytes(), importSize: importSize, ctx).map { id in
encodeNextStep(for: id)
}
} catch {
return loop.makeFailedFuture(error)
}
}
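// Cheap existence check against the database, throttled on the network
// queue; the request itself is only a few dozen bytes on the wire.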
let containsRequestWireSizeEstimate = 64
let throttledContainsFuture = self.execute(on: self.netQueue, size: containsRequestWireSizeEstimate, default: .skipped) { () -> LLBFuture<NextStep> in
let containsFuture = self.dbContains(segm, ctx)
let containsLoop = containsFuture.eventLoop
return containsFuture.flatMap { exists -> LLBFuture<NextStep> in
guard !exists else {
let existingIdFuture: LLBFuture<NextStep> = segm.id.flatMap { id in
return uploadFileInfo(blobId: id, importSize: segm.uncompressedSize)
}
existingIdFuture.cascade(to: partialStepPromise)
return existingIdFuture
}
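// The object is missing remotely: defer the heavy work (reading,
// optional compression, and upload) to the .UploadingFiles phase.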
return containsLoop.makeSucceededFuture(NextStep.execute(in: .UploadingFiles, run: {
let nextStepFuture: LLBFuture<NextStep> = self.execute(on: self.cpuQueue, default: nil) { () -> LLBFuture<NextStep>? in
let data: LLBFastData
switch importObject {
case let .link(target):
data = target
case let .file(file, _):
data = try self.prepareSingleSegment(of: file, segmentNumber: segmentOffset, useCompression: segm.isCompressed)
}
let slice = data.toByteBuffer()
// Make sure we wait until the netQueue is sufficiently
// free to take our load. This ensures that network
// concurrency is not limited by CPU parallelism.
return self.executeWithBackpressure(on: self.netQueue, loop: containsLoop, size: slice.readableBytes, default: .skipped) { () -> LLBFuture<NextStep> in
return self.dbPut(refs: [], data: slice, importSize: segm.uncompressedSize, ctx).flatMap { id -> LLBFuture<NextStep> in
withExtendedLifetime(importObject) { // keep the mmap'd file alive until the upload completes
uploadFileInfo(blobId: id)
}
}.hop(to: loop)
}
}.flatMap {
// This type of return ensures that cpuQueue does not
// wait for the netQueue operation to complete.
$0 ?? loop.makeSucceededFuture(NextStep.skipped)
}
nextStepFuture.cascade(to: partialStepPromise)
return nextStepFuture
}))
}
}
return (nextStepFuture: throttledContainsFuture, partialStepFuture: partialStepPromise.futureResult)
}
let cheapNextStepFutures: [LLBFuture<NextStep>] = cheapNextAndPartialStepFutures.map { $0.nextStepFuture }
// Sending a single segment, for which we don't need
// to wait until all of its subcomponents are uploaded.
guard isSingleChunk == false else {
return cheapNextStepFutures
}
let partialStepFutures = cheapNextAndPartialStepFutures.map { $0.partialStepFuture }
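// Multi-chunk file: once every chunk has been uploaded, stitch the
// chunk ids together into a single top-level `LLBFileInfo` object.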
let combinePartialResultsStep: NextStep = NextStep.execute(in: .UploadingWait, run: {
return self.whenAllSucceed(partialStepFutures).flatMapErrorThrowing { error -> [NextStep] in
// If any segment fails, we either forward the error or hide it.
if error is FileSystemError {
// Some kind of filesystem access error; not a
// consistency error, so hide it if requested.
if self.options.skipUnreadable {
return []
}
} else if let fsError = error as? FileSegmenter.Error,
case .resourceChanged = fsError,
self.options.relaxConsistencyChecks {
// Turn consistency check errors into skips.
// Location /EC1.
return []
}
throw error
}.flatMap { nextSteps in
// The next steps can only be empty if we reacted to
// a filesystem-related error on some of our chunks.
guard nextSteps.isEmpty == false else {
return loop.makeSucceededFuture(.skipped)
}
let chunkIds: [LLBDataID] = nextSteps.map {
guard case let .partialFileChunk(id) = $0 else {
preconditionFailure("Next step is not a partial chunk")
}
return id
}
var fileInfo = LLBFileInfo()
fileInfo.type = type
fileInfo.size = UInt64(allSegmentsUncompressedDataSize)
// The top-level object is not compressed when chunks are present.
fileInfo.compression = .none
fileInfo.fixedChunkSize = UInt64(chunkIds.count > 1 ? self.options.fileChunkSize : allSegmentsUncompressedDataSize)
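// With multiple chunks, every chunk except possibly the last is
// exactly fileChunkSize bytes; a sole chunk spans the entire file.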
let posixDetails = importObject.posixDetails
fileInfo.update(posixDetails: posixDetails, options: self.options)
do {
let fileInfoBytes = try fileInfo.toBytes()
return self.execute(on: self.netQueue, size: fileInfoBytes.readableBytes, default: .skipped) {
self.dbPut(refs: chunkIds, data: fileInfoBytes, importSize: nil, ctx).map { id in
return .singleFile(SingleFileInfo(path: path, id: id, type: type, size: UInt64(clamping: allSegmentsUncompressedDataSize), posixDetails: posixDetails))
}
}
} catch {
return loop.makeFailedFuture(error)
}
}
})
// Since uploading fragmented files requires waiting and churning
// the recursive next step machinery until the parts are properly
// uploaded, we can't just block until all the chunks have been
// uploaded. Therefore we wait for the huge files in their own
// state, .UploadingWait.
return cheapNextStepFutures + [loop.makeSucceededFuture(combinePartialResultsStep)]
}
// Once the .EstimatingSize phase arrives, schedule the per-segment
// steps and wait for them in the .CheckIfUploaded state.
return .execute(in: .EstimatingSize, run: {
loop.makeSucceededFuture(NextStep.wait(in: .CheckIfUploaded,
futures: assemblePartialNextSteps()))
})
}