in csharp/Worker/Microsoft.Spark.CSharp/Worker.cs [124:211]
/// <summary>
/// Handles one task request on the worker connection.
/// </summary>
/// <remarks>
/// Steps, in wire order:
/// <list type="number">
/// <item>Reads the request header from <paramref name="inputStream"/>.</item>
/// <item>Lets ProcessIncludesItems, ProcessBroadcastVariables and ProcessCommand consume the rest of the request.</item>
/// <item>Writes the accumulator section to <paramref name="outputStream"/>.</item>
/// <item>Answers the trailing end-of-stream marker.</item>
/// </list>
/// If anything throws, the exception text is logged and a best-effort attempt is made to
/// write it to <paramref name="outputStream"/>. The original exception is then rethrown.
/// </remarks>
/// <param name="inputStream">Stream the request is read from.</param>
/// <param name="outputStream">Stream results, accumulators and markers are written to.</param>
/// <param name="splitIndex">Split index; passed to ProcessCommand and used in log messages.</param>
/// <returns>
/// <c>true</c> when the int read after the accumulator section equals END_OF_STREAM.
/// Otherwise <c>false</c>; in that case END_OF_DATA_SECTION is written back instead.
/// </returns>
public static bool ProcessStream(Stream inputStream, Stream outputStream, int splitIndex)
{
    logger.LogInfo("Start of stream processing, splitIndex: {0}", splitIndex);

    // Whether all input data from the socket was read through completely.
    bool allInputConsumed = true;

    try
    {
        // Captured before the first protocol read and handed to ProcessCommand.
        DateTime startedAt = DateTime.UtcNow;

        string protocolVersion = SerDe.ReadString(inputStream);
        logger.LogDebug("version: " + protocolVersion);

        // Header values that are read only to advance the stream: three ints, then one long.
        // NOTE(review): presumably per-task context fields sent by the JVM; confirm against the writer.
        for (int skipped = 0; skipped < 3; skipped++)
        {
            SerDe.ReadInt(inputStream);
        }
        SerDe.ReadLong(inputStream);

        // Name of the work directory; only logged here.
        string workDir = SerDe.ReadString(inputStream);
        logger.LogDebug("spark_files_dir: " + workDir);

        ProcessIncludesItems(inputStream);
        ProcessBroadcastVariables(inputStream);

        // Start the task with an empty accumulator registry.
        Accumulator.threadLocalAccumulatorRegistry = new Dictionary<int, Accumulator>();
        var resultFormatter = ProcessCommand(inputStream, outputStream, splitIndex, startedAt);

        // END_OF_DATA_SECTION marks the beginning of the accumulators section of the output.
        SerDe.Write(outputStream, (int)SpecialLengths.END_OF_DATA_SECTION);
        WriteAccumulatorValues(outputStream, resultFormatter);

        int trailer = SerDe.ReadInt(inputStream);
        allInputConsumed = trailer == (int)SpecialLengths.END_OF_STREAM;
        if (!allInputConsumed)
        {
            // This may happen when the input data is not read completely (e.g. after a take()).
            logger.LogWarn("**** unexpected read: {0}, not all data is read", trailer);
            // Answering with a different marker tells the JVM not to reuse this worker.
            SerDe.Write(outputStream, (int)SpecialLengths.END_OF_DATA_SECTION);
        }
        else
        {
            SerDe.Write(outputStream, (int)SpecialLengths.END_OF_STREAM);
            logger.LogDebug("END_OF_STREAM: " + (int)SpecialLengths.END_OF_STREAM);
        }

        outputStream.Flush();

        logger.LogDebug("total read bytes: {0}", SerDe.totalReadNum);
        logger.LogDebug("total write bytes: {0}", SerDe.totalWriteNum);
        logger.LogDebug("Stream processing completed successfully");
    }
    catch (Exception e)
    {
        logger.LogError("ProcessStream failed with exception:");
        string failureText = e.ToString();
        logger.LogError(failureText);
        try
        {
            logger.LogError("Trying to write error to stream");
            // NOTE(review): the message is written without a preceding marker int;
            // confirm the JVM reader expects it in this form.
            SerDe.Write(outputStream, failureText);
        }
        catch (IOException)
        {
            // Most likely the JVM has already closed the socket; nothing more can be reported.
        }
        catch (Exception writeFailure)
        {
            logger.LogError("Writing exception to stream failed with exception:");
            logger.LogException(writeFailure);
        }
        throw;
    }

    logger.LogInfo("Stop of stream processing, splitIndex: {0}, readComplete: {1}", splitIndex, allInputConsumed);
    return allInputConsumed;
}