public static bool ProcessStream()

in csharp/Worker/Microsoft.Spark.CSharp/Worker.cs [124:211]


        public static bool ProcessStream(Stream inputStream, Stream outputStream, int splitIndex)
        {
            logger.LogInfo("Start of stream processing, splitIndex: {0}", splitIndex);
            bool readComplete = true;   // Whether all input data from the socket is read though completely

            try
            {
                DateTime bootTime = DateTime.UtcNow;

                string ver = SerDe.ReadString(inputStream);
                logger.LogDebug("version: " + ver);

                //// initialize global state
                //shuffle.MemoryBytesSpilled = 0
                //shuffle.DiskBytesSpilled = 0
	            SerDe.ReadInt(inputStream);
				SerDe.ReadInt(inputStream);
				SerDe.ReadInt(inputStream);
				SerDe.ReadLong(inputStream);

				// fetch name of workdir
				string sparkFilesDir = SerDe.ReadString(inputStream);
                logger.LogDebug("spark_files_dir: " + sparkFilesDir);
                //SparkFiles._root_directory = sparkFilesDir
                //SparkFiles._is_running_on_worker = True

                ProcessIncludesItems(inputStream);

                ProcessBroadcastVariables(inputStream);

	            Accumulator.threadLocalAccumulatorRegistry = new Dictionary<int, Accumulator>();

                var formatter = ProcessCommand(inputStream, outputStream, splitIndex, bootTime);

                // Mark the beginning of the accumulators section of the output
                SerDe.Write(outputStream, (int)SpecialLengths.END_OF_DATA_SECTION);

                WriteAccumulatorValues(outputStream, formatter);

                int end = SerDe.ReadInt(inputStream);

                // check end of stream
                if (end == (int)SpecialLengths.END_OF_STREAM)
                {
                    SerDe.Write(outputStream, (int)SpecialLengths.END_OF_STREAM);
                    logger.LogDebug("END_OF_STREAM: " + (int)SpecialLengths.END_OF_STREAM);
                }
                else
                {
                    // This may happen when the input data is not read completely, e.g., when take() operation is performed
                    logger.LogWarn("**** unexpected read: {0}, not all data is read", end);
                    // write a different value to tell JVM to not reuse this worker
                    SerDe.Write(outputStream, (int)SpecialLengths.END_OF_DATA_SECTION);
                    readComplete = false;
                }

                outputStream.Flush();

                // log bytes read and write
                logger.LogDebug("total read bytes: {0}", SerDe.totalReadNum);
                logger.LogDebug("total write bytes: {0}", SerDe.totalWriteNum);

                logger.LogDebug("Stream processing completed successfully");
            }
            catch (Exception e)
            {
                logger.LogError("ProcessStream failed with exception:");
                logger.LogError(e.ToString());
                try
                {
                    logger.LogError("Trying to write error to stream");
                    SerDe.Write(outputStream, e.ToString());
                }
                catch (IOException)
                {
                    // JVM close the socket
                }
                catch (Exception ex)
                {
                    logger.LogError("Writing exception to stream failed with exception:");
                    logger.LogException(ex);
                }
                throw;
            }

            logger.LogInfo("Stop of stream processing, splitIndex: {0}, readComplete: {1}", splitIndex, readComplete);
            return readComplete;
        }