in unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala [82:244]
private def nioEventLoop(sel: Selector, log: LoggingAdapter)(implicit ec: ExecutionContext): Unit =
while (sel.isOpen) {
val nrOfKeysSelected = sel.select()
if (sel.isOpen) {
val keySelectable = nrOfKeysSelected > 0
val keys = if (keySelectable) sel.selectedKeys().iterator() else sel.keys().iterator()
while (keys.hasNext) {
val key = keys.next()
if (key != null) { // Observed as sometimes being null via sel.keys().iterator()
if (log.isDebugEnabled) {
val interestInfo = if (keySelectable) {
val interestSet = key.asInstanceOf[SelectionKey].interestOps()
val isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) != 0
val isInterestedInConnect = (interestSet & SelectionKey.OP_CONNECT) != 0
val isInterestedInRead = (interestSet & SelectionKey.OP_READ) != 0
val isInterestedInWrite = (interestSet & SelectionKey.OP_WRITE) != 0
f"(accept=$isInterestedInAccept%5s connect=$isInterestedInConnect%5s read=$isInterestedInRead%5s write=$isInterestedInWrite%5s)"
} else {
""
}
log.debug(
f"""ch=${key.channel().hashCode()}%10d
| at=${Option(key.attachment()).fold(0)(_.hashCode())}%10d
| selectable=$keySelectable%5s
| acceptable=${key.isAcceptable}%5s
| connectable=${key.isConnectable}%5s
| readable=${key.isReadable}%5s
| writable=${key.isWritable}%5s
| $interestInfo""".stripMargin.replaceAll("\n", ""))
}
if (keySelectable && (key.isAcceptable || key.isConnectable)) {
val newConnectionOp = key.attachment().asInstanceOf[(Selector, SelectionKey) => Unit]
newConnectionOp(sel, key)
}
key.attachment match {
case null =>
case sendReceiveContext: SendReceiveContext =>
sendReceiveContext.send match {
case SendRequested(buffer, sent) if keySelectable && key.isWritable =>
val channel = key.channel().asInstanceOf[UnixSocketChannel]
val written =
try {
channel.write(buffer)
} catch {
case e: IOException =>
key.cancel()
try {
key.channel.close()
} catch { case _: IOException => }
sent.failure(e)
-1
}
val remaining = buffer.remaining
log.debug("written: {} remaining: {}", written, remaining)
if (written >= 0 && remaining == 0) {
sendReceiveContext.send = SendAvailable(buffer)
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE)
sent.success(Done)
}
case _: SendRequested =>
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE)
case _: SendAvailable =>
case ShutdownRequested if key.isValid && !sendReceiveContext.isOutputShutdown =>
try {
if (sendReceiveContext.isInputShutdown) {
log.debug("Write-side is shutting down")
key.cancel()
key.channel.close()
} else {
log.debug("Write-side is shutting down further output")
sendReceiveContext.isOutputShutdown = true
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE)
key.channel.asInstanceOf[UnixSocketChannel].shutdownOutput()
}
} catch {
// socket could have been closed in the meantime, so shutdownOutput will throw this
case _: IOException =>
}
case ShutdownRequested =>
case CloseRequested =>
log.debug("Write-side is shutting down unconditionally")
key.cancel()
try {
key.channel.close()
} catch { case _: IOException => }
}
sendReceiveContext.receive match {
case ReceiveAvailable(queue, buffer) if keySelectable && key.isReadable =>
buffer.clear()
val channel = key.channel.asInstanceOf[UnixSocketChannel]
val read =
try {
channel.read(buffer)
} catch {
// socket could have been closed in the meantime, so read will throw this
case _: IOException => -1
}
log.debug("read: {}", read)
if (read >= 0) {
buffer.flip()
val pendingResult = queue.offer(ByteString(buffer))
pendingResult.onComplete(_ => sel.wakeup())
sendReceiveContext.receive = PendingReceiveAck(queue, buffer, pendingResult)
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ)
} else {
queue.complete()
try {
if (!sendReceiveContext.halfClose || sendReceiveContext.isOutputShutdown) {
queue.watchCompletion().onComplete { _ =>
log.debug("Read-side is shutting down")
key.cancel()
try {
key.channel().close()
} catch { case _: IOException => }
}
} else {
log.debug("Read-side is shutting down further input")
sendReceiveContext.isInputShutdown = true
channel.shutdownInput()
}
} catch {
// socket could have been closed in the meantime, so shutdownInput will throw this
case _: IOException =>
}
}
case _: ReceiveAvailable =>
case PendingReceiveAck(receiveQueue, receiveBuffer, pendingResult) if pendingResult.isCompleted =>
pendingResult.value.get match {
case Success(QueueOfferResult.Enqueued) =>
key.interestOps(key.interestOps() | SelectionKey.OP_READ)
sendReceiveContext.receive = ReceiveAvailable(receiveQueue, receiveBuffer)
case e =>
log.debug("Read-side is shutting down due to {}", e)
receiveQueue.complete()
key.cancel()
try {
key.channel.close()
} catch { case _: IOException => }
}
case _: PendingReceiveAck =>
}
case _: ((Selector, SelectionKey) => Unit) @unchecked =>
case other =>
log.warning("unexpected receive: [{}]", other)
}
}
if (keySelectable) keys.remove()
}
}
}