func run()

in Sources/TSFCASFileTree/FileTreeImport.swift [463:654]


    /// Imports `importPath` and returns a future for the resulting data ID.
    ///
    /// The import is one future chain that advances through the phases
    /// reported via `set(phase:)`:
    ///
    /// 1. `.AssemblingPaths` — a `ConcurrentFilesystemScanner` is created on
    ///    `ssdQueue` and drained by `ssdQueue.maxOpCount` scanning tasks
    ///    (a plain file short-circuits into a single `.REG` element).
    /// 2. `.EstimatingSize` — each path is turned into a `NextStep` by
    ///    `makeNextStep`; `.execute(in: .EstimatingSize, …)` steps are started
    ///    right away and converted into `.wait` steps.
    /// 3. `.CheckIfUploaded` — `recursivelyPerformSteps` drives the remaining
    ///    steps to completion.
    /// 4. `.UploadingDirs` — directory listings are built with
    ///    `constructDirectoryContents` and stored through `dbPut`.
    ///
    /// Errors raised along the chain are forwarded to `finalResultPromise`.
    ///
    /// - Parameter ctx: Context handed to `makeNextStep` and `dbPut`.
    /// - Returns: A future derived from `finalResultPromise` that also records
    ///   `.ImportSucceeded` or `.ImportFailed` in `stats.phase`.
    func run(_ ctx: Context) -> LLBFuture<LLBDataID> {
        let loop = self.loop
        let importPath = self.importPath
        let stats = self.stats

        ssdQueue.execute({ () -> ConcurrentFilesystemScanner in
            self.set(phase: .AssemblingPaths)
            return try ConcurrentFilesystemScanner(importPath, pathFilter: self.makePathFilter())
        }).map { scanner -> [LLBFuture<[ConcurrentFilesystemScanner.Element]>] in
            if TSCBasic.localFileSystem.isFile(importPath) {
                // We can import individual files just fine.
                _ = stats.toImportObjects_.add(1)
                return [loop.makeSucceededFuture([(importPath, .REG)])]
            } else {
                // Scan the filesystem tree using multiple threads: every task
                // iterates the same shared scanner and collects what it gets.
                return (0..<self.ssdQueue.maxOpCount).map { _ in
                  self.execute(on: self.ssdQueue, default: []) { () -> [ConcurrentFilesystemScanner.Element] in
                    // Gather all the paths up front.
                    var pathInfos = [ConcurrentFilesystemScanner.Element]()
                    for pathInfo in scanner {
                        pathInfos.append(pathInfo)
                        _ = stats.toImportObjects_.add(1)
                    }
                    return pathInfos
                  }
                }
            }
        }.flatMap { pathsFutures -> LLBFuture<[[ConcurrentFilesystemScanner.Element]]> in
            self.whenAllSucceed(pathsFutures)
        }.map { (pathInfos: [[ConcurrentFilesystemScanner.Element]]) -> [LLBFuture<NextStep>] in
            self.set(phase: .EstimatingSize)

            // Immediately slurp/verify the blobs.
            return pathInfos.joined().map { pathInfo -> LLBFuture<NextStep> in
                self.execute(on: self.ssdQueue, default: .skipped) {
                    do {
                        switch try self.makeNextStep(path: pathInfo.path, type: pathInfo.type, ctx) {
                        case let .execute(in: .EstimatingSize, run):
                            // Start the size-estimation work now and wait on it later.
                            return NextStep.wait(in: .EstimatingSize, futures: [run()])
                        case let step:
                            return step
                        }
                    } catch {
                        // Surface the failure to the caller right away, without
                        // waiting for the rest of the chain to unwind.
                        _ = self.finalResultPromise.fail(error)
                        throw error
                    }
                }
            }
        }.flatMap { nextStepFutures -> LLBFuture<[NextStep]> in
            self.whenAllSucceed(nextStepFutures)
        }.flatMap { nextStepFutures -> LLBFuture<[NextStep]> in
            self.set(phase: .CheckIfUploaded)
            return self.recursivelyPerformSteps(currentPhase: .CheckIfUploaded, currentPhaseSteps: nextStepFutures)
        }.map { nextSteps -> (directoryPaths: [(AbsolutePath, LLBPosixFileDetails?)], completeFiles: [AbsolutePath: SingleFileInfo]) in
            self.set(phase: .UploadingDirs)
            // Split the terminal steps into directories still to be uploaded
            // and files that are already complete.
            var completeFiles = [AbsolutePath: SingleFileInfo]()
            var directoryPaths = [(AbsolutePath, LLBPosixFileDetails?)]()
            for step in nextSteps {
                switch step {
                case .skipped, .partialFileChunk:
                    continue
                case let .gotDirectory(path, posixDetails):
                    directoryPaths.append((path, posixDetails))
                case .singleFile(let info):
                    completeFiles[info.path] = info
                case .execute, .wait:
                    fatalError("Impossible step: \(step)")
                }
            }

            return (directoryPaths, completeFiles)
        }.flatMap { args -> LLBFuture<LLBDataID> in
            // Descending path order: a directory sorts after everything nested
            // in it (assuming a path compares less than its descendants), which
            // the promise registration below relies on.
            let directoryPaths = args.directoryPaths.sorted(by: { $1.0 < $0.0 })
            let completeFiles = args.completeFiles

            // If imported a single file, return it.
            if directoryPaths.isEmpty,
               let (_, firstFile) = completeFiles.first, completeFiles.count == 1 {
                self.set(phase: .ImportSucceeded)
                return loop.makeSucceededFuture(firstFile.id)
            }

            // Guards `uploadedDirectoryPaths_`, which is written here and read
            // from the closures dispatched to `netQueue`.
            let udpLock = NIOConcurrencyHelpers.Lock()
            var uploadedDirectoryPaths_ = [AbsolutePath: LLBFuture<(LLBDataID, LLBDirectoryEntry)?>]()

            // Now we have to add all the directories. Their promises are
            // registered serially and in reverse order of depth, so a directory
            // always finds the futures of its subdirectories already registered
            // when it looks them up; the upload work itself is dispatched to
            // `netQueue`.
            let dirFutures: [LLBFuture<Void>] = directoryPaths.map { arguments in
              let (path, pathPosixDetails) = arguments
              let dirLoop = self._db.group.next()
              let directoryPromise: LLBPromise<(LLBDataID, LLBDirectoryEntry)?>
              directoryPromise = dirLoop.makePromise()
              udpLock.withLockVoid {
                uploadedDirectoryPaths_[path] = directoryPromise.futureResult
              }

              let dirFuture: LLBFuture<(LLBDataID, LLBDirectoryEntry)?>
              dirFuture = self.execute(on: self.netQueue, loop: dirLoop, size: 1024, default: nil) {
                // Get the list of all subpaths.
                let directoryListing: [String]
                do {
                    directoryListing = try TSCBasic.localFileSystem.getDirectoryContents(path).sorted()
                } catch {
                    // An unreadable directory is either dropped (nil) or fails the import.
                    if self.options.skipUnreadable {
                        return dirLoop.makeSucceededFuture(nil)
                    }
                    return dirLoop.makeFailedFuture(ImportError.unreadableDirectory(path))
                }

                // Build the finalized directory file list. Every listed name
                // yields exactly one future; names that were not imported
                // resolve to nil.
                let subpathsFutures: [LLBFuture<(LLBDataID, LLBDirectoryEntry)?>]
                subpathsFutures = directoryListing.map { filename -> LLBFuture<(LLBDataID, LLBDirectoryEntry)?> in
                    let subpath = path.appending(component: filename)

                    if let info = completeFiles[subpath] {
                        var dirEntry = LLBDirectoryEntry()
                        dirEntry.name = filename
                        dirEntry.type = info.type
                        dirEntry.size = info.size
                        dirEntry.update(posixDetails: info.posixDetails, options: self.options)
                        return dirLoop.makeSucceededFuture((info.id, dirEntry))
                    } else if let dirInfoFuture = udpLock.withLock({uploadedDirectoryPaths_[subpath]}) {
                        // A subdirectory: re-label its entry with the name it
                        // has inside this directory once its upload finishes.
                        return dirInfoFuture.map { idInfo in
                            guard let (id, info) = idInfo else { return nil }
                            var dirEntry = LLBDirectoryEntry()
                            dirEntry.name = filename
                            dirEntry.type = info.type
                            dirEntry.size = info.size
                            dirEntry.update(posixDetails: info.posixDetails, options: self.options)
                            return (id, dirEntry)
                        }
                    } else {
                        return dirLoop.makeSucceededFuture(nil)
                    }
                }

                return self.whenAllSucceed(subpathsFutures, on: dirLoop).flatMap { subpaths in
                    do {
                        let (refs, dirData, aggregateSize) = try self.constructDirectoryContents(subpaths, wireFormat: self.options.wireFormat)

                        _ = stats.toImportBytes_.add(dirData.readableBytes)
                        return self.dbPut(refs: refs, data: dirData, importSize: dirData.readableBytes, ctx).map { id in
                            _ = stats.uploadedMetadataBytes_.add(dirData.readableBytes)
                            var dirEntry = LLBDirectoryEntry()
                            dirEntry.name = path.pathString
                            dirEntry.type = .directory
                            dirEntry.size = aggregateSize
                            if let pd = pathPosixDetails {
                                dirEntry.update(posixDetails: pd, options: self.options)
                            }
                            return (id, dirEntry)
                        }
                    } catch {
                        // Use `dirLoop` like every other future created in this closure.
                        return dirLoop.makeFailedFuture(error)
                    }
                }
              }
              // Publish the outcome to whichever parent directory looked us up.
              dirFuture.cascade(to: directoryPromise)
              return dirFuture.map({ _ in () })
            }

            guard let topDirFuture = udpLock.withLock({uploadedDirectoryPaths_[importPath]}) else {
                return loop.makeFailedFuture(ImportError.unreadableDirectory(importPath))
            }

            return self.whenAllSucceed(dirFutures).flatMap { _ -> LLBFuture<LLBDataID> in
                return topDirFuture.flatMapThrowing { idInfo -> LLBDataID in
                    guard let (id, info) = idInfo else {
                        throw ImportError.unreadableDirectory(importPath)
                    }
                    // NOTE(review): this size cross-check only runs when a path
                    // filter is set; confirm `!= nil` (rather than `== nil`) is
                    // the intended condition.
                    if self.options.pathFilter != nil {
                        assert(stats.importedBytes - stats.uploadedMetadataBytes == info.size, "bytesImported: \(stats.importedBytes) != aggregateSize: \(info.size)")
                    }
                    self.set(phase: .ImportSucceeded)
                    return id
                }
            }
        }.flatMapErrorThrowing { error in
            _ = self.finalResultPromise.fail(error)
            throw error
        }.cascade(to: finalResultPromise)

        // Mirror the final outcome into the stats before handing it to the caller.
        return finalResultPromise.futureResult.flatMapErrorThrowing { error in
            stats.phase = .ImportFailed
            throw error
        }.map { result in
            stats.phase = .ImportSucceeded
            return result
        }
    }