in scala/src/main/org/apache/spark/api/csharp/CSharpBackendHandler.scala [40:110]
/**
 * Decodes one serialized backend request and returns the serialized reply.
 *
 * The request is read in this order: a boolean `isStatic` flag, the target object id,
 * the method name and the argument count. Requests whose object id is "SparkCLRHandler"
 * are control commands handled here; every other request is forwarded to
 * `handleMethodCall` together with the remaining input and the reply stream.
 *
 * Control commands:
 *  - `stopBackend`: writes a success reply, then closes `server`.
 *  - `rm`: reads an object id and removes it from `JVMObjectTracker`; replies with -1 if
 *    an exception is thrown.
 *  - `connectCallback`: reads a port and stores it in `CSharpBackend.callbackPort`.
 *  - `closeCallback`: sends "close" on every socket queued in
 *    `CSharpBackend.callbackSockets`, closes each of them, then sets
 *    `CSharpBackend.callbackSocketShutdown`.
 *  - any other method name: replies with -1.
 *
 * @param msg raw bytes of the request
 * @return the bytes written to the reply stream
 */
def handleBackendRequest(msg: Array[Byte]): Array[Byte] = {
  val bis = new ByteArrayInputStream(msg)
  val dis = new DataInputStream(bis)
  val bos = new ByteArrayOutputStream()
  val dos = new DataOutputStream(bos)

  // Request header: isStatic flag, object id, method name, argument count.
  val isStatic = readBoolean(dis)
  val objId = readString(dis)
  val methodName = readString(dis)
  val numArgs = readInt(dis)

  if (objId == "SparkCLRHandler") {
    methodName match {
      case "stopBackend" =>
        // The reply is written before the server is closed.
        writeInt(dos, 0)
        writeType(dos, "void")
        server.close()

      case "rm" =>
        try {
          val t = readObjectType(dis)
          assert(t == 'c')
          val objToRemove = readString(dis)
          JVMObjectTracker.remove(objToRemove)
          writeInt(dos, 0)
          writeObject(dos, null)
        } catch {
          case e: Exception =>
            // NOTE(review): objId is always "SparkCLRHandler" in this branch, so this
            // message does not name the object that was being removed.
            logError(s"Removing $objId failed", e)
            writeInt(dos, -1)
        }

      case "connectCallback" =>
        val t = readObjectType(dis)
        assert(t == 'i')
        val port = readInt(dis)
        logInfo(s"Connecting to a callback server at port $port")
        CSharpBackend.callbackPort = port
        writeInt(dos, 0)
        writeType(dos, "void")

      case "closeCallback" =>
        // Send close to CSharp callback server.
        logInfo("Requesting to close all call back sockets.")
        // Drain the queue until poll() yields null. The previous loop cleared its loop
        // variable after the first successful close and therefore stopped after a
        // single socket, leaving the remaining ones open.
        Iterator
          .continually(CSharpBackend.callbackSockets.poll())
          .takeWhile(_ != null)
          .foreach { socket =>
            // A failure on one socket must not prevent the others from being closed,
            // so the notification and the close are attempted independently.
            try {
              val dataOutputStream = new DataOutputStream(socket.getOutputStream)
              SerDe.writeString(dataOutputStream, "close")
            } catch {
              case e: Exception =>
                logError("Exception when sending close to callback socket: ", e)
            }
            try {
              socket.close()
            } catch {
              case e: Exception => logError("Exception when closing socket: ", e)
            }
          }
        CSharpBackend.callbackSocketShutdown = true
        writeInt(dos, 0)
        writeType(dos, "void")

      case _ => dos.writeInt(-1)
    }
  } else {
    handleMethodCall(isStatic, objId, methodName, numArgs, dis, dos)
  }

  bos.toByteArray
}