in unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala [392:461]
protected def bind(path: Path,
backlog: Int = 128,
halfClose: Boolean = false): Source[IncomingConnection, Future[ServerBinding]] = {
val bind: () => Source[IncomingConnection, Future[ServerBinding]] = { () =>
val (incomingConnectionQueue, incomingConnectionSource) =
Source
.queue[IncomingConnection](2, OverflowStrategy.backpressure)
.prefixAndTail(0)
.map {
case (_, source) =>
source
.watchTermination() { (mat, done) =>
done
.andThen {
case _ =>
try {
Files.delete(path)
} catch {
case NonFatal(_) =>
}
}
mat
}
}
.toMat(Sink.head)(Keep.both)
.run()
val serverBinding = Promise[ServerBinding]()
val channel = UnixServerSocketChannel.open()
channel.configureBlocking(false)
val address = new JnrUnixSocketAddress(path.toFile)
val registeredKey =
channel.register(sel,
SelectionKey.OP_ACCEPT,
acceptKey(address, incomingConnectionQueue, halfClose, receiveBufferSize, sendBufferSize) _)
try {
channel.socket().bind(address, backlog)
sel.wakeup()
serverBinding.success(
ServerBinding(UnixSocketAddress(Paths.get(address.path))) { () =>
registeredKey.cancel()
channel.close()
incomingConnectionQueue.complete()
incomingConnectionQueue.watchCompletion().map(_ => ())
})
} catch {
case e: IOException =>
val withAddress = new IOException(e.getMessage + s" ($address)", e)
registeredKey.cancel()
channel.close()
incomingConnectionQueue.fail(withAddress)
serverBinding.failure(withAddress)
case NonFatal(e) =>
registeredKey.cancel()
channel.close()
incomingConnectionQueue.fail(e)
serverBinding.failure(e)
}
Source
.futureSource(incomingConnectionSource)
.mapMaterializedValue(_ => serverBinding.future)
}
Source.lazySource(bind).mapMaterializedValue(_.flatMap(identity))
}