in Sources/TSFCASFileTree/FileTreeImport.swift [463:654]
func run(_ ctx: Context) -> LLBFuture<LLBDataID> {
let loop = self.loop
let importPath = self.importPath
let stats = self.stats
ssdQueue.execute({ () -> ConcurrentFilesystemScanner in
self.set(phase: .AssemblingPaths)
return try ConcurrentFilesystemScanner(importPath, pathFilter: self.makePathFilter())
}).map { scanner -> [LLBFuture<[ConcurrentFilesystemScanner.Element]>] in
if TSCBasic.localFileSystem.isFile(importPath) {
// We can import individual files just fine.
_ = stats.toImportObjects_.add(1)
return [loop.makeSucceededFuture([(importPath, .REG)])]
} else {
// Scan the filesystem tree using multiple threads.
return (0..<self.ssdQueue.maxOpCount).map { _ in
self.execute(on: self.ssdQueue, default: []) { () -> [ConcurrentFilesystemScanner.Element] in
// Gather all the paths up front.
var pathInfos = [ConcurrentFilesystemScanner.Element]()
for pathInfo in scanner {
pathInfos.append(pathInfo)
_ = stats.toImportObjects_.add(1)
}
return pathInfos
}
}
}
}.flatMap { pathsFutures -> LLBFuture<[[ConcurrentFilesystemScanner.Element]]> in
self.whenAllSucceed(pathsFutures)
}.map { (pathInfos: [[ConcurrentFilesystemScanner.Element]]) -> [LLBFuture<NextStep>] in
self.set(phase: .EstimatingSize)
// Immediately slurp/verify the blobs.
return pathInfos.joined().map { pathInfo -> LLBFuture<NextStep> in
self.execute(on: self.ssdQueue, default: .skipped) {
do {
switch try self.makeNextStep(path: pathInfo.path, type: pathInfo.type, ctx) {
case let .execute(in: .EstimatingSize, run):
return NextStep.wait(in: .EstimatingSize, futures: [run()])
case let step:
return step
}
} catch {
_ = self.finalResultPromise.fail(error)
throw error
}
}
}
}.flatMap { nextStepFutures -> LLBFuture<[NextStep]> in
self.whenAllSucceed(nextStepFutures)
}.flatMap { nextStepFutures -> LLBFuture<[NextStep]> in
self.set(phase: .CheckIfUploaded)
return self.recursivelyPerformSteps(currentPhase: .CheckIfUploaded, currentPhaseSteps: nextStepFutures)
}.map { nextSteps -> (directoryPaths: [(AbsolutePath, LLBPosixFileDetails?)], completeFiles: [AbsolutePath: SingleFileInfo]) in
self.set(phase: .UploadingDirs)
var completeFiles = [AbsolutePath: SingleFileInfo]()
var directoryPaths = [(AbsolutePath, LLBPosixFileDetails?)]()
for step in nextSteps {
switch step {
case .skipped, .partialFileChunk:
continue
case let .gotDirectory(path, posixDetails):
directoryPaths.append((path, posixDetails))
case .singleFile(let info):
completeFiles[info.path] = info
case .execute, .wait:
fatalError("Impossible step: \(step)")
}
}
return (directoryPaths, completeFiles)
}.flatMap { args -> LLBFuture<LLBDataID> in
// Account for the importPath which we add here last.
let directoryPaths = args.directoryPaths.sorted(by: { $1.0 < $0.0 })
let completeFiles = args.completeFiles
/// If imported a single file, return it.
if directoryPaths.isEmpty,
let (_, firstFile) = completeFiles.first, completeFiles.count == 1 {
self.set(phase: .ImportSucceeded)
return loop.makeSucceededFuture(firstFile.id)
}
let udpLock = NIOConcurrencyHelpers.Lock()
var uploadedDirectoryPaths_ = [AbsolutePath: LLBFuture<(LLBDataID, LLBDirectoryEntry)?>]()
// Now we have to add all the directories; we do so serially and in
// reverse order of depth, so we can guarantee the children are resolved
// when they need to be.
let dirFutures: [LLBFuture<Void>] = directoryPaths.map { arguments in
let (path, pathPosixDetails) = arguments
let dirLoop = self._db.group.next()
let directoryPromise: LLBPromise<(LLBDataID, LLBDirectoryEntry)?>
directoryPromise = dirLoop.makePromise()
udpLock.withLockVoid {
uploadedDirectoryPaths_[path] = directoryPromise.futureResult
}
let dirFuture: LLBFuture<(LLBDataID, LLBDirectoryEntry)?>
dirFuture = self.execute(on: self.netQueue, loop: dirLoop, size: 1024, default: nil) {
// Get the list of all subpaths.
let directoryListing: [String]
do {
directoryListing = try TSCBasic.localFileSystem.getDirectoryContents(path).sorted()
} catch {
if self.options.skipUnreadable {
return dirLoop.makeSucceededFuture(nil)
}
return dirLoop.makeFailedFuture(ImportError.unreadableDirectory(path))
}
// Build the finalized directory file list.
let subpathsFutures: [LLBFuture<(LLBDataID, LLBDirectoryEntry)?>]
subpathsFutures = directoryListing.compactMap { filename -> LLBFuture<(LLBDataID, LLBDirectoryEntry)?> in
let subpath = path.appending(component: filename)
if let info = completeFiles[subpath] {
var dirEntry = LLBDirectoryEntry()
dirEntry.name = filename
dirEntry.type = info.type
dirEntry.size = info.size
dirEntry.update(posixDetails: info.posixDetails, options: self.options)
return dirLoop.makeSucceededFuture((info.id, dirEntry))
} else if let dirInfoFuture = udpLock.withLock({uploadedDirectoryPaths_[subpath]}) {
return dirInfoFuture.map { idInfo in
guard let (id, info) = idInfo else { return nil }
var dirEntry = LLBDirectoryEntry()
dirEntry.name = filename
dirEntry.type = info.type
dirEntry.size = info.size
dirEntry.update(posixDetails: info.posixDetails, options: self.options)
return (id, dirEntry)
}
} else {
return dirLoop.makeSucceededFuture(nil)
}
}
return self.whenAllSucceed(subpathsFutures, on: dirLoop).flatMap { subpaths in
do {
let (refs, dirData, aggregateSize) = try self.constructDirectoryContents(subpaths, wireFormat: self.options.wireFormat)
_ = stats.toImportBytes_.add(dirData.readableBytes)
return self.dbPut(refs: refs, data: dirData, importSize: dirData.readableBytes, ctx).map { id in
_ = stats.uploadedMetadataBytes_.add(dirData.readableBytes)
var dirEntry = LLBDirectoryEntry()
dirEntry.name = path.pathString
dirEntry.type = .directory
dirEntry.size = aggregateSize
if let pd = pathPosixDetails {
dirEntry.update(posixDetails: pd, options: self.options)
}
return (id, dirEntry)
}
} catch {
return loop.makeFailedFuture(error)
}
}
}
dirFuture.cascade(to: directoryPromise)
return dirFuture.map({ _ in () })
}
guard let topDirFuture = udpLock.withLock({uploadedDirectoryPaths_[importPath]}) else {
return loop.makeFailedFuture(ImportError.unreadableDirectory(importPath))
}
return self.whenAllSucceed(dirFutures).flatMap { _ -> LLBFuture<LLBDataID> in
return topDirFuture.flatMapThrowing { idInfo -> LLBDataID in
guard let (id, info) = idInfo else {
throw ImportError.unreadableDirectory(importPath)
}
if self.options.pathFilter != nil {
assert(stats.importedBytes - stats.uploadedMetadataBytes == info.size, "bytesImported: \(stats.importedBytes) != aggregateSize: \(info.size)")
}
self.set(phase: .ImportSucceeded)
return id
}
}
}.flatMapErrorThrowing { error in
_ = self.finalResultPromise.fail(error)
throw error
}.cascade(to: finalResultPromise)
return finalResultPromise.futureResult.flatMapErrorThrowing { error in
stats.phase = .ImportFailed
throw error
}.map { result in
stats.phase = .ImportSucceeded
return result
}
}