func makeNextStep()

in Sources/TSFCASFileTree/FileTreeImport.swift [842:1136]


    /// Classifies the filesystem object at `path` and returns the next step
    /// of the import state machine for it.
    ///
    /// - Symbolic links: the link target (read with `readlink`) is imported
    ///   as the object's data.
    /// - Directories: processing is deferred by returning `.gotDirectory`;
    ///   POSIX details are attached only when preservation is enabled and
    ///   `lstat` still reports a directory.
    /// - Regular files: the file is split into segments by `FileSegmenter`.
    ///   If the file cannot be opened, `.skipped` is returned when
    ///   `options.skipUnreadable` is set.
    /// - Any other object type is skipped.
    ///
    /// For links and regular files the "to import" statistics are updated
    /// immediately, and an `.EstimatingSize` step is returned. When run, that
    /// step produces a `.CheckIfUploaded` wait over one future per segment.
    /// Each of those futures checks whether the segment already exists in the
    /// database and, if not, schedules an `.UploadingFiles` step. Multi-segment
    /// files get an extra `.UploadingWait` step that combines the uploaded
    /// chunk ids into a single `LLBFileInfo` object.
    ///
    /// - Parameters:
    ///   - path: Absolute path of the object to import.
    ///   - pathObjectType: Filesystem object type of `path`.
    ///   - ctx: Context forwarded to database operations.
    /// - Returns: The next step for this object.
    /// - Throws: `ImportError.unreadableLink` if the link target cannot be
    ///   read; `ImportError.unreadableFile` if a regular file cannot be opened
    ///   and `options.skipUnreadable` is not set; any error thrown while
    ///   describing the file's segments.
    func makeNextStep(path: AbsolutePath, type pathObjectType: FilesystemObjectType, _ ctx: Context) throws -> NextStep {
        let loop = self.loop
        let stats = self.stats

        let segmentDescriptors: [SegmentDescriptor]  // Information about segments of file, possibly after compression.
        let type: LLBFileType
        let allSegmentsUncompressedDataSize: Int
        // The object whose bytes are uploaded: a symlink target held in
        // memory, or a segmented regular file together with its POSIX details.
        enum ObjectToImport {
            case link(target: LLBFastData)
            case file(file: FileSegmenter, posixDetails: LLBPosixFileDetails)
            // Symlinks carry default-initialized POSIX details.
            var posixDetails: LLBPosixFileDetails {
                switch self {
                case .link:
                    return LLBPosixFileDetails()
                case .file(_, let posixDetails):
                    return posixDetails
                }
            }
        }
        let importObject: ObjectToImport

        // If this is a symbolic link, the "data" is the target.
        switch pathObjectType {
        case .LNK:
            var buf = [Int8](repeating: 0, count: Int(PATH_MAX) + 1)
            let count = TSCLibc.readlink(path.pathString, &buf, buf.count - 1)
            guard count > 0 else {
                throw ImportError.unreadableLink(path)
            }
            type = .symlink

            // Reinterpret the raw bytes instead of converting them numerically.
            // Bytes >= 0x80 (e.g. in UTF-8 encoded targets) are negative as
            // `Int8`, and `UInt8(_:)` would trap on them.
            let target = LLBFastData(buf[..<count].map { UInt8(bitPattern: $0) })
            allSegmentsUncompressedDataSize = target.count
            importObject = .link(target: target)
            segmentDescriptors = [SegmentDescriptor(isCompressed: false, uncompressedSize: target.count, id: _db.identify(refs: [], data: target.toByteBuffer(), ctx))]
        case .DIR:
            let posixDetails: LLBPosixFileDetails?

            // Read the permissions and ownership information, if requested.
            if options.preservePosixDetails.preservationEnabled {
                var sb = stat()
                // Only use the stat result if the path is still a directory.
                if lstat(path.pathString, &sb) == 0, (sb.st_mode & S_IFMT) == S_IFDIR {
                    posixDetails = LLBPosixFileDetails(from: sb)
                } else {
                    posixDetails = nil
                }
            } else {
                posixDetails = nil
            }

            // If this is a directory, defer its processing.
            return .gotDirectory(path: path, posixDetails: posixDetails)
        case .REG:
            let file: FileSegmenter
            do {
                file = try FileSegmenter(importPath: importPath, path, segmentSize: options.fileChunkSize, minMmapSize: options.minMmapSize, allowInconsistency: options.relaxConsistencyChecks)
            } catch {
                guard options.skipUnreadable else {
                    throw ImportError.unreadableFile(path)
                }
                return .skipped
            }

            // Any execute bit (user, group or other) marks the file executable.
            type = (file.statInfo.st_mode & 0o111 == 0) ? .plainFile : .executable
            importObject = .file(file: file, posixDetails: LLBPosixFileDetails(from: file.statInfo))
            segmentDescriptors = try describeAllSegments(of: file, ctx)
            allSegmentsUncompressedDataSize = segmentDescriptors.reduce(0) {
                $0 + $1.uncompressedSize
            }
        default:
            return .skipped
        }

        // Add the rest of the chunks to the number of objects to import.
        _ = stats.toImportObjects_.add(max(0, allSegmentsUncompressedDataSize - 1) / options.fileChunkSize)
        _ = stats.toImportBytes_.add(allSegmentsUncompressedDataSize)
        _ = stats.toImportFiles_.add(1)

        // We check if the remote contains the object before ingesting.
        //
        // FIXME: This feels like it should be automatically handled by
        // the database, not here. For now, the RemoteCASDatabase isn't
        // doing this, though, so this is important for avoiding
        // unnecessary uploads.
        // FIXME: Double-scanning of files on disk.

        // Whether the file is stored as a single chunk.
        let isSingleChunk = segmentDescriptors.count == 1

        // Whether the current object is top level of the whole tree.
        // Can happen if the file is a sole object to import.
        let topLevel = isSingleChunk && importPath == path

        // Builds one "contains, then maybe upload" future per segment, plus,
        // for multi-segment files, a future yielding the step that assembles
        // the uploaded chunks into a single file object.
        func assemblePartialNextSteps() -> [LLBFuture<NextStep>] {
          let cheapNextAndPartialStepFutures: [(nextStepFuture: LLBFuture<NextStep>, partialStepFuture: LLBFuture<NextStep>)] = segmentDescriptors.enumerated().map { (segmentOffset, segm) in

              // This partial step is a final step for the sequence
              // of cheap/heavy steps, for a single segment (chunk).
              // We use cancellable promise since our heavy next steps might
              // not even run at all.
              let partialStepPromise = LLBCancellablePromise(promise: loop.makePromise(of: NextStep.self))

              // A single-segment object is reported as a complete file;
              // otherwise the id is just one chunk of a multi-chunk file.
              func encodeNextStep(for id: LLBDataID) -> NextStep {
                if isSingleChunk {
                    return .singleFile(SingleFileInfo(path: path, id: id, type: type, size: UInt64(clamping: segm.uncompressedSize), posixDetails: importObject.posixDetails))
                } else {
                    return .partialFileChunk(id)
                }
              }

              // If the file has non-standard layout, upload it after we upload
              // the binary blob.
              func uploadFileInfo(blobId: LLBDataID, importSize: Int? = nil) -> LLBFuture<NextStep> {
                guard finalResultPromise.isCompleted == false else {
                    return loop.makeSucceededFuture(NextStep.skipped)
                }

                // We need to wrap the blob in a `FileInformation`:
                // — When we compress the file.
                // — When the file is top level (importing a single file).
                guard segm.isCompressed || topLevel else {
                    if let size = importSize {
                        _ = stats.importedObjects_.add(1)
                        _ = stats.importedBytes_.add(size)
                    }
                    return loop.makeSucceededFuture(encodeNextStep(for: blobId))
                }

                var fileInfo = LLBFileInfo()
                // Each segment (if not a single segment) is encoded as a plain
                // file and doesn't have any other metadata (e.g. permissions).
                if isSingleChunk {
                    fileInfo.type = type
                    fileInfo.update(posixDetails: importObject.posixDetails, options: self.options)
                } else {
                    fileInfo.type = .plainFile
                }
                fileInfo.size = UInt64(segm.uncompressedSize)
                // FIXME: no compression supported right now
                // fileInfo.compression = segm.isCompressed ? ... : .none
                assert(!segm.isCompressed)
                fileInfo.compression = .none
                fileInfo.fixedChunkSize = UInt64(segm.uncompressedSize)
                do {
                    return dbPut(refs: [blobId], data: try fileInfo.toBytes(), importSize: importSize, ctx).map { id in
                        encodeNextStep(for: id)
                    }
                } catch {
                    return loop.makeFailedFuture(error)
                }
              }

              let containsRequestWireSizeEstimate = 64

              let throttledContainsFuture = self.execute(on: self.netQueue, size: containsRequestWireSizeEstimate, default: .skipped) { () -> LLBFuture<NextStep> in
                let containsFuture = self.dbContains(segm, ctx)
                let containsLoop = containsFuture.eventLoop
                return containsFuture.flatMap { exists -> LLBFuture<NextStep> in

                  // Already stored: skip the data upload, only (maybe) wrap it.
                  guard !exists else {
                    let existingIdFuture: LLBFuture<NextStep> = segm.id.flatMap { id in
                        return uploadFileInfo(blobId: id, importSize: segm.uncompressedSize)
                    }
                    existingIdFuture.cascade(to: partialStepPromise)
                    return existingIdFuture
                  }

                  return containsLoop.makeSucceededFuture(NextStep.execute(in: .UploadingFiles, run: {
                    let nextStepFuture: LLBFuture<NextStep> = self.execute(on: self.cpuQueue, default: nil) { () -> LLBFuture<NextStep>? in
                        let data: LLBFastData

                        switch importObject {
                        case let .link(target):
                            data = target
                        case let .file(file, _):
                            data = try self.prepareSingleSegment(of: file, segmentNumber: segmentOffset, useCompression: segm.isCompressed)
                        }

                        let slice = data.toByteBuffer()

                        // Make sure we wait until the netQueue is sufficiently
                        // free to take our load. This ensures that we're not
                        // limited by CPU parallelism for network concurrency.
                        return self.executeWithBackpressure(on: self.netQueue, loop: containsLoop, size: slice.readableBytes, default: .skipped) { () -> LLBFuture<NextStep> in
                            return self.dbPut(refs: [], data: slice, importSize: segm.uncompressedSize, ctx).flatMap { id -> LLBFuture<NextStep> in
                                withExtendedLifetime(importObject) { // for mmap
                                    uploadFileInfo(blobId: id)
                                }
                            }.hop(to: loop)
                        }
                    }.flatMap {
                        // This type of return ensures that cpuQueue does not
                        // wait for the netQueue operation to complete.
                        $0 ?? loop.makeSucceededFuture(NextStep.skipped)
                    }
                    nextStepFuture.cascade(to: partialStepPromise)
                    return nextStepFuture
                  }))
                }
              }

              return (nextStepFuture: throttledContainsFuture, partialStepFuture: partialStepPromise.futureResult)
          }

          let cheapNextStepFutures: [LLBFuture<NextStep>] = cheapNextAndPartialStepFutures.map { $0.nextStepFuture }

          // Sending a single segment, for which we don't need
          // to wait until all of its subcomponents are uploaded.
          guard isSingleChunk == false else {
            return cheapNextStepFutures
          }

          let partialStepFutures = cheapNextAndPartialStepFutures.map { $0.partialStepFuture }
          let combinePartialResultsStep: NextStep = NextStep.execute(in: .UploadingWait, run: {
            return self.whenAllSucceed(partialStepFutures).flatMapErrorThrowing { error -> [NextStep] in
                // If any segment fails, we either forward the error or hide
                // it, depending on the import options.

                if error is FileSystemError {
                    // Some kind of filesystem access error.
                    if self.options.skipUnreadable {
                        return []
                    }
                } else if let fsError = error as? FileSegmenter.Error {
                    if case .resourceChanged = fsError {
                        if self.options.relaxConsistencyChecks {
                            // Turn consistency checks errors into skips.
                            // Location /EC1.
                            return []
                        }
                    }
                    // NOTE(review): FileSegmenter errors other than
                    // `.resourceChanged` are always forwarded here, even when
                    // `skipUnreadable` is set — confirm that is intended.
                }

                throw error

              }.flatMap { nextSteps in
                // The next steps can only be empty if we're reacting to
                // a filesystem-related error with some of our chunks
                // (or, presumably, if there were no segments at all —
                // TODO confirm how `describeAllSegments` treats empty files).
                guard nextSteps.isEmpty == false else {
                    return loop.makeSucceededFuture(.skipped)
                }

                let chunkIds: [LLBDataID] = nextSteps.map {
                    guard case let .partialFileChunk(id) = $0 else {
                        preconditionFailure("Next step is not a partial chunk")
                    }
                    return id
                }

                var fileInfo = LLBFileInfo()
                fileInfo.type = type
                fileInfo.size = UInt64(allSegmentsUncompressedDataSize)
                // The top is not compressed when chunks are present.
                fileInfo.compression = .none
                fileInfo.fixedChunkSize = UInt64(chunkIds.count > 1 ? self.options.fileChunkSize : allSegmentsUncompressedDataSize)
                let posixDetails = importObject.posixDetails
                fileInfo.update(posixDetails: posixDetails, options: self.options)
                do {
                    let fileInfoBytes = try fileInfo.toBytes()
                    return self.execute(on: self.netQueue, size: fileInfoBytes.readableBytes, default: .skipped) {
                        self.dbPut(refs: chunkIds, data: fileInfoBytes, importSize: nil, ctx).map { id in
                            return .singleFile(SingleFileInfo(path: path, id: id, type: type, size: UInt64(clamping: allSegmentsUncompressedDataSize), posixDetails: posixDetails))
                        }
                    }
                } catch {
                    return loop.makeFailedFuture(error)
                }
              }
            })

          // Since uploading fragmented files requires waiting and churning
          // the recursive next step machinery until the parts are properly
          // uploaded, we can't just block on waiting until all the chunks
          // have been uploaded. Therefore we wait for the huge files in
          // its own state, .UploadingWait.
          return cheapNextStepFutures + [loop.makeSucceededFuture(combinePartialResultsStep)]
        }

        // When the .EstimatingSize phase comes, this can be executed.
        return .execute(in: .EstimatingSize, run: {
            loop.makeSucceededFuture(NextStep.wait(in: .CheckIfUploaded,
                futures: assemblePartialNextSteps()))
        })
    }