in hot-reload-orchestration/src/main/kotlin/org/jetbrains/compose/reload/orchestration/OrchestrationServer.kt [70:173]
public fun OrchestrationServer(): OrchestrationServer {
val bind = Future<Unit>()
val port = Future<Int>()
val start = Future<Unit>()
val messages = Bus<OrchestrationPackage>()
val states = OrchestrationServerStates()
/**
* Represents an actor who attempts to connect to clients listening at a given port.
*/
val connectActor = Actor</* Port */ Int, Boolean>()
val task = launchTask<Unit>("OrchestrationServer") {
invokeOnFinish { bind.completeExceptionally(it.exceptionOrNull() ?: StoppedException()) }
invokeOnFinish { port.completeExceptionally(it.exceptionOrNull() ?: StoppedException()) }
val serverThread = WorkerThread("Orchestration Server")
launchOnFinish { serverThread.shutdown().await() }
subtask("Listen for incoming connections", serverThread.dispatcherImmediate) {
val serverSocket = ServerSocket()
invokeOnStop { serverSocket.close() }
bind.await()
serverSocket.bind(InetSocketAddress("127.0.0.1", 0))
port.complete(serverSocket.localPort)
start.await()
while (isActive()) {
val clientSocket = serverSocket.accept()
clientSocket.setOrchestrationDefaults()
val client = launchClient(OrchestrationIO(clientSocket), messages, states)
invokeOnFinish { client.stop() }
}
}
subtask("Connect to clients") {
val serverPort = port.awaitOrThrow()
connectActor.process { port ->
logger.debug("Connecting to client on port '$port'")
val reader = OrchestrationIO.newReaderThread()
val writer = OrchestrationIO.newWriterThread()
try {
withThread(writer) {
val socket = Socket("127.0.0.1", port)
socket.setOrchestrationDefaults()
val io = OrchestrationIO(socket, writer = writer, reader = reader)
io.writeInt(ORCHESTRATION_PROTOCOL_MAGIC_NUMBER)
io.writeInt(OrchestrationVersion.current.intValue)
io.writeInt(serverPort)
val client = launchClient(io, messages, states)
invokeOnFinish { client.stop() }
true
}
} catch (t: Throwable) {
logger.error("Failed to connect to client on port '$port'", t)
reader.close()
writer.close()
false
}
}
}
}
return object : OrchestrationServer, Task<Unit> by task {
override val messages = messages.withType<OrchestrationMessage>()
override val port: Future<Int> = port
override val states = states
override suspend fun connectClient(listenerPort: Int): Boolean {
return connectActor(listenerPort)
}
override suspend fun send(message: OrchestrationMessage) {
messages.send(message)
}
override suspend fun <T : OrchestrationState?> update(
key: OrchestrationStateKey<T>, update: (T) -> T
): Update<T> {
return states.update(key, update)
}
/* Server will always be able to update the state, as owner */
override suspend fun <T : OrchestrationState?> tryUpdate(
key: OrchestrationStateKey<T>, update: (T) -> T
): Update<T> = update(key, update)
override suspend fun bind() {
bind.complete(Unit)
port.await()
}
override suspend fun start() {
bind()
start.complete(Unit)
}
}
}