in csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/StreamingContextIpcProxy.cs [288:372]
private void ProcessCallbackRequest(object socket)
{
logger.LogDebug("New thread (id={0}) created to process callback request", Thread.CurrentThread.ManagedThreadId);
try
{
using (var sock = (ISocketWrapper)socket)
using (var s = sock.GetStream())
{
while (true)
{
try
{
string cmd = SerDe.ReadString(s);
if (cmd == "close")
{
logger.LogDebug("receive close cmd from Scala side");
break;
}
if (cmd == "callback")
{
int numRDDs = SerDe.ReadInt(s);
var jrdds = new List<JvmObjectReference>();
for (int i = 0; i < numRDDs; i++)
{
jrdds.Add(new JvmObjectReference(SerDe.ReadObjectId(s)));
}
double time = SerDe.ReadDouble(s);
IFormatter formatter = new BinaryFormatter();
object func = formatter.Deserialize(new MemoryStream(SerDe.ReadBytes(s)));
string serializedMode = SerDe.ReadString(s);
RDD<dynamic> rdd = null;
if (jrdds[0].Id != null)
rdd = new RDD<dynamic>(new RDDIpcProxy(jrdds[0]), sparkContext, (SerializedMode)Enum.Parse(typeof(SerializedMode), serializedMode));
if (func is Func<double, RDD<dynamic>, RDD<dynamic>>)
{
JvmObjectReference jrdd = ((((Func<double, RDD<dynamic>, RDD<dynamic>>)func)(time, rdd)).RddProxy as RDDIpcProxy).JvmRddReference;
SerDe.Write(s, (byte)'j');
SerDe.Write(s, jrdd.Id);
}
else if (func is Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>>)
{
string serializedMode2 = SerDe.ReadString(s);
RDD<dynamic> rdd2 = new RDD<dynamic>(new RDDIpcProxy(jrdds[1]), sparkContext, (SerializedMode)Enum.Parse(typeof(SerializedMode), serializedMode2));
JvmObjectReference jrdd = ((((Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>>)func)(time, rdd, rdd2)).RddProxy as RDDIpcProxy).JvmRddReference;
SerDe.Write(s, (byte)'j');
SerDe.Write(s, jrdd.Id);
}
else
{
((Action<double, RDD<dynamic>>)func)(time, rdd);
SerDe.Write(s, (byte)'n');
}
s.Flush();
}
}
catch (Exception e)
{
//log exception only when callback socket is not shutdown explicitly
if (!callbackSocketShutdown)
{
logger.LogError("Exception processing call back request. Thread id {0}", Thread.CurrentThread.ManagedThreadId);
logger.LogException(e);
// exit when exception happens
logger.LogError("ProcessCallbackRequest fail, will exit ...");
Thread.Sleep(1000);
System.Environment.Exit(1);
}
}
}
}
}
catch (Exception e)
{
logger.LogError("Exception in callback. Thread id {0}", Thread.CurrentThread.ManagedThreadId);
logger.LogException(e);
}
logger.LogDebug("Thread (id={0}) to process callback request exiting", Thread.CurrentThread.ManagedThreadId);
}