in csharp/Worker/Microsoft.Spark.CSharp/TaskRunner.cs [53:122]
public void Run()
{
Logger.LogInfo("TaskRunner [{0}] is running ...", TaskId);
try
{
while (!stop)
{
using (var inputStream = socket.GetInputStream())
using (var outputStream = socket.GetOutputStream())
{
if (!string.IsNullOrEmpty(secret))
{
SerDe.Write(outputStream, secret);
outputStream.Flush();
var reply = SerDe.ReadString(inputStream);
Logger.LogDebug("Connect back to JVM: " + reply);
secret = null;
}
byte[] bytes = SerDe.ReadBytes(inputStream, sizeof(int));
if (bytes != null)
{
int splitIndex = SerDe.ToInt(bytes);
bool readComplete = Worker.ProcessStream(inputStream, outputStream, splitIndex);
outputStream.Flush();
if (!readComplete) // if the socket is not read through completely, then it can't be reused
{
stop = true;
// wait for server to complete, otherwise server may get 'connection reset' exception
Logger.LogInfo("Sleep 500 millisecond to close socket ...");
Thread.Sleep(500);
}
else if (!socketReuse)
{
stop = true;
// wait for server to complete, otherwise server gets 'connection reset' exception
// Use SerDe.ReadBytes() to detect java side has closed socket properly
// ReadBytes() will block until the socket is closed
Logger.LogInfo("waiting JVM side to close socket...");
SerDe.ReadBytes(inputStream);
Logger.LogInfo("JVM side has closed socket");
}
}
else
{
stop = true;
Logger.LogWarn("read null splitIndex, socket is closed by JVM");
}
}
}
}
catch (Exception e)
{
stop = true;
Logger.LogError("TaskRunner [{0}] exeption, will dispose this TaskRunner", TaskId);
Logger.LogException(e);
}
finally
{
try
{
socket.Close();
}
catch (Exception ex)
{
Logger.LogWarn("close socket exception: {0}", ex);
}
Logger.LogInfo("TaskRunner [{0}] finished", TaskId);
}
}