in src/common/Kubernetes/KubectlImpl.cs [68:219]
public int RunShortRunningCommand(
KubernetesCommandName commandName,
string command,
Action<string> onStdOut,
Action<string> onStdErr,
CancellationToken cancellationToken,
Dictionary<string, string> envVariables = null,
bool log137ExitCodeErrorAsWarning = false,
int timeoutMs = 30000)
{
_log.Info("Invoking kubectl {0} command: {1}", commandName.ToString(), new PII(command));
Stopwatch w = new Stopwatch();
var stdOutBuilder = new FixedSizeStringBuilder(MaxBufferSize);
var stdErrBuilder = new FixedSizeStringBuilder(MaxBufferSize);
var kubectlProcess = _platform.CreateProcess(
new ProcessStartInfo()
{
FileName = GetKubectlFilepath(),
Arguments = command,
UseShellExecute = false,
RedirectStandardOutput = true,
RedirectStandardError = true,
CreateNoWindow = true
});
if (envVariables != null)
{
foreach (KeyValuePair<string, string> env in envVariables)
{
kubectlProcess.StartInfo.EnvironmentVariables[env.Key] = env.Value;
}
}
try
{
Action killKubectlProcess =
() =>
{
try
{
if (!kubectlProcess.HasExited)
{
kubectlProcess.Kill();
kubectlProcess.Dispose();
}
}
catch (Exception e)
{
_log.Exception(e);
}
};
// Creates a waiter that ensures all process output has drained before returning.
using (cancellationToken.Register(killKubectlProcess))
using (var outputWaitHandle = new AutoResetEvent(false))
{
DataReceivedEventHandler outputDataReceivedHandler = (sender, e) =>
{
// The last call will have e.Data as null, so we wait for that condition
// to be true to close the handle.
if (e.Data == null)
{
try
{
outputWaitHandle.Set();
}
catch (ObjectDisposedException)
{
// this could be invoked outside of this 'using' statement during timeout
}
}
else
{
stdOutBuilder.AppendLine(e.Data);
onStdOut?.Invoke(e.Data);
}
};
DataReceivedEventHandler errorDataReceivedHandler = (sender, e) =>
{
if (!string.IsNullOrEmpty(e.Data))
{
stdErrBuilder.AppendLine(e.Data);
onStdErr?.Invoke(e.Data);
}
};
try
{
kubectlProcess.OutputDataReceived += outputDataReceivedHandler;
kubectlProcess.ErrorDataReceived += errorDataReceivedHandler;
w.Start();
kubectlProcess.Start();
kubectlProcess.BeginOutputReadLine();
kubectlProcess.BeginErrorReadLine();
if (kubectlProcess.WaitForExit(timeoutMs) && outputWaitHandle.WaitOne(timeoutMs))
{
w.Stop();
_log.Info("Invoked kubectl {0} command: '{1}' exited with {2} in {3}ms", commandName.ToString(), new PII(command), kubectlProcess.ExitCode, w.ElapsedMilliseconds);
if (kubectlProcess.ExitCode != 0)
{
LogFailure(commandName, command, kubectlProcess.ExitCode, stdOutBuilder, stdErrBuilder, log137ExitCodeErrorAsWarning);
}
return kubectlProcess.ExitCode;
}
else
{
var messageFormat = "'kubectl' timeout after {0}ms when running {1}";
_log.Info(messageFormat, timeoutMs, commandName.ToString());
throw new TimeoutException(string.Format(messageFormat, timeoutMs, command));
}
}
finally
{
kubectlProcess.OutputDataReceived -= outputDataReceivedHandler;
kubectlProcess.ErrorDataReceived -= errorDataReceivedHandler;
}
}
}
catch (Exception e)
{
if (e is KubectlException)
{
_log.ExceptionAsWarning(e);
}
else
{
_log.Exception(e);
}
_log.Error("kubectl {0} command: '{1}' failed with exception {2}", commandName.ToString(), new PII(command), e.Message);
// Killing and disposing failed process
if (!kubectlProcess.HasExited)
{
// Try and catch exceptions in case of race conditions or other unexpected errors killing the process
try
{
kubectlProcess.Kill();
}
catch (Exception kubeEx)
{
_log.Warning("Failed to kill kubectl {0}", kubeEx.Message);
}
}
throw;
}
finally
{
kubectlProcess.Dispose();
}
}