private void ProcessCallbackRequest()

in csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/StreamingContextIpcProxy.cs [288:372]


        /// <summary>
        /// Serves callback requests arriving on one accepted socket until a "close" command
        /// is read or the connection fails.
        /// </summary>
        /// <remarks>
        /// For every "callback" command the method reads, in this order: the number of RDDs,
        /// that many JVM object ids, a time value, a BinaryFormatter-serialized delegate and the
        /// name of a <see cref="SerializedMode"/> value. The delegate is then invoked and the reply
        /// written to the stream is either the byte 'j' followed by the id of the resulting JVM RDD,
        /// or the byte 'n' when the delegate is an action that returns nothing.
        /// Commands other than "close" and "callback" are ignored.
        /// If an exception occurs while <c>callbackSocketShutdown</c> is false, it is logged and the
        /// process exits with code 1. If the flag is true, the loop ends quietly.
        /// </remarks>
        /// <param name="socket">
        /// The accepted connection; it is cast to <see cref="ISocketWrapper"/> and disposed when
        /// this method returns.
        /// </param>
        private void ProcessCallbackRequest(object socket)
        {
            logger.LogDebug("New thread (id={0}) created to process callback request", Thread.CurrentThread.ManagedThreadId);

            try
            {
                using (var sock = (ISocketWrapper)socket)
                using (var s = sock.GetStream())
                {
                    while (true)
                    {
                        try
                        {
                            string cmd = SerDe.ReadString(s);
                            if (cmd == "close")
                            {
                                logger.LogDebug("receive close cmd from Scala side");
                                break;
                            }

                            if (cmd == "callback")
                            {
                                int numRDDs = SerDe.ReadInt(s);
                                var jrdds = new List<JvmObjectReference>();
                                for (int i = 0; i < numRDDs; i++)
                                {
                                    jrdds.Add(new JvmObjectReference(SerDe.ReadObjectId(s)));
                                }
                                double time = SerDe.ReadDouble(s);

                                // NOTE(review): BinaryFormatter will materialize whatever object graph the peer
                                // sends; this is only safe if the other end of the socket is trusted - confirm.
                                IFormatter formatter = new BinaryFormatter();
                                object func;
                                using (var funcStream = new MemoryStream(SerDe.ReadBytes(s)))
                                {
                                    func = formatter.Deserialize(funcStream);
                                }

                                string serializedMode = SerDe.ReadString(s);

                                // NOTE(review): indexing jrdds[0] (and jrdds[1] below) assumes the peer sent enough
                                // RDD ids; if it did not, the indexer throws and the catch below handles it.
                                RDD<dynamic> rdd = null;
                                if (jrdds[0].Id != null)
                                {
                                    rdd = new RDD<dynamic>(new RDDIpcProxy(jrdds[0]), sparkContext, (SerializedMode)Enum.Parse(typeof(SerializedMode), serializedMode));
                                }

                                if (func is Func<double, RDD<dynamic>, RDD<dynamic>>)
                                {
                                    // Single-input transformation: reply with the id of the produced RDD.
                                    JvmObjectReference jrdd = ((((Func<double, RDD<dynamic>, RDD<dynamic>>)func)(time, rdd)).RddProxy as RDDIpcProxy).JvmRddReference;
                                    SerDe.Write(s, (byte)'j');
                                    SerDe.Write(s, jrdd.Id);
                                }
                                else if (func is Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>>)
                                {
                                    // Two-input transformation: the serialized mode of the second RDD follows on the wire.
                                    string serializedMode2 = SerDe.ReadString(s);
                                    RDD<dynamic> rdd2 = new RDD<dynamic>(new RDDIpcProxy(jrdds[1]), sparkContext, (SerializedMode)Enum.Parse(typeof(SerializedMode), serializedMode2));
                                    JvmObjectReference jrdd = ((((Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>>)func)(time, rdd, rdd2)).RddProxy as RDDIpcProxy).JvmRddReference;
                                    SerDe.Write(s, (byte)'j');
                                    SerDe.Write(s, jrdd.Id);
                                }
                                else
                                {
                                    // Anything else is expected to be an action with no result; reply with 'n'.
                                    ((Action<double, RDD<dynamic>>)func)(time, rdd);
                                    SerDe.Write(s, (byte)'n');
                                }
                                s.Flush();
                            }
                        }
                        catch (Exception e)
                        {
                            //log exception only when callback socket is not shutdown explicitly
                            if (!callbackSocketShutdown)
                            {
                                logger.LogError("Exception processing call back request. Thread id {0}", Thread.CurrentThread.ManagedThreadId);
                                logger.LogException(e);

                                // exit when exception happens
                                logger.LogError("ProcessCallbackRequest fail, will exit ...");
                                Thread.Sleep(1000);
                                System.Environment.Exit(1);
                            }

                            // The callback socket was shut down on purpose. Leave the loop instead of
                            // retrying the read: the stream is unusable and every further attempt would
                            // throw again, keeping this thread spinning forever.
                            break;
                        }
                    }
                }
            }
            catch (Exception e)
            {
                logger.LogError("Exception in callback. Thread id {0}", Thread.CurrentThread.ManagedThreadId);
                logger.LogException(e);
            }

            logger.LogDebug("Thread (id={0}) to process callback request exiting", Thread.CurrentThread.ManagedThreadId);
        }