in Services/DataX.Flow/DataX.Flow.InteractiveQuery/HDInsight/HDInsightKernel.cs [230:307]
public string ExecuteCode(string code, out string error, int timeout = _ExecuteCodeTimeout, bool waitForOutput = true)
{
var content = new Dictionary<string, object>() {
{"code", code},
{"silent", false },
{"store_history", true },
{"user_expressions", new Dictionary<string, object>()},
{"allow_stdin", false}
};
List<string> output = new List<string>();
string errorMsg = null;
using (AutoResetEvent are = new AutoResetEvent(false))
{
bool receivedReply = false;
bool receivedResult = false;
SendCommand(
"execute_request",
content,
response => {
string msgType = (response["header"] as JObject)["msg_type"].ToString();
Debug.WriteLine(msgType);
Dictionary<string, object> responseContent = (response["content"] as JObject).ToObject<Dictionary<string, object>>();
switch (msgType)
{
case "status":
break;
case "display_data":
Console.WriteLine("Display data");
SaveContentOutput(response, output);
break;
case "error":
errorMsg = (string)responseContent["evalue"];
break;
case "execute_result":
SaveContentOutput(response, output);
receivedResult = true;
break;
case "execute_reply":
if ((string)responseContent["status"] == "error")
{
errorMsg = (string)responseContent["evalue"];
}
receivedReply = true;
break;
case "stream":
output.Add((response["content"] as JObject).ToObject<Dictionary<string, object>>()["text"].ToString());
break;
}
if (receivedReply)
{
if (output.Count > 0 ||
!waitForOutput ||
receivedResult ||
!string.IsNullOrEmpty(errorMsg))
{
try
{
are.Set();
}
catch (ObjectDisposedException)
{
}
}
}
}
);
if (!are.WaitOne(timeout))
{
throw new InvalidOperationException($"No response to execution within {timeout} ms.");
}
error = errorMsg;
return string.Join("", output);
}
}