in Sources/NIOPosix/BaseSocketChannel.swift [486:544]
func flushNow() -> IONotificationState {
self.eventLoop.assertInEventLoop()
// Guard against re-entry as data that will be put into `pendingWrites` will just be picked up by
// `writeToSocket`.
guard !self.inFlushNow else {
return .unregister
}
assert(!self.inFlushNow)
self.inFlushNow = true
defer {
self.inFlushNow = false
}
var newWriteRegistrationState: IONotificationState = .unregister
do {
while newWriteRegistrationState == .unregister && self.hasFlushedPendingWrites() && self.isOpen {
assert(self.lifecycleManager.isActive)
let writeResult = try self.writeToSocket()
switch writeResult.writeResult {
case .couldNotWriteEverything:
newWriteRegistrationState = .register
case .writtenCompletely:
newWriteRegistrationState = .unregister
}
if writeResult.writabilityChange {
// We went from not writable to writable.
self.pipeline.syncOperations.fireChannelWritabilityChanged()
}
}
} catch let err {
// If there is a write error we should try drain the inbound before closing the socket as there may be some data pending.
// We ignore any error that is thrown as we will use the original err to close the channel and notify the user.
if self.readIfNeeded0() {
assert(self.lifecycleManager.isActive)
// We need to continue reading until there is nothing more to be read from the socket as we will not have another chance to drain it.
var readAtLeastOnce = false
while let read = try? self.readFromSocket(), read == .some {
readAtLeastOnce = true
}
if readAtLeastOnce && self.lifecycleManager.isActive {
self.pipeline.fireChannelReadComplete()
}
}
self.close0(error: err, mode: .all, promise: nil)
// we handled all writes
return .unregister
}
assert((newWriteRegistrationState == .register && self.hasFlushedPendingWrites()) ||
(newWriteRegistrationState == .unregister && !self.hasFlushedPendingWrites()),
"illegal flushNow decision: \(newWriteRegistrationState) and \(self.hasFlushedPendingWrites())")
return newWriteRegistrationState
}