src/common/Kubernetes/KubectlImpl.cs (390 lines of code) (raw):
// --------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
// --------------------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using Microsoft.BridgeToKubernetes.Common.Exceptions;
using Microsoft.BridgeToKubernetes.Common.IO;
using Microsoft.BridgeToKubernetes.Common.Logging;
using Microsoft.BridgeToKubernetes.Common.Utilities;
namespace Microsoft.BridgeToKubernetes.Common.Kubernetes
{
internal class KubectlImpl : IKubectlImpl
{
private const int MaxBufferSize = 1000;
protected readonly ILog _log;
protected readonly IOperationContext _operationContext;
protected readonly IFileSystem _fileSystem;
private readonly IPlatform _platform;
protected readonly IEnvironmentVariables _environmentVariables;
private readonly string _kubectlFilePath;
private static class ExecutableLocation
{
public static string ParentDirectory = "kubectl";
public static class Windows
{
public static string Name = "kubectl.exe";
public static string Directory = "win";
}
public static class OSX
{
public static string Name = "kubectl";
public static string Directory = "osx";
}
public static class Linux
{
public static string Name = "kubectl";
public static string Directory = "linux";
}
}
public KubectlImpl(
IFileSystem fileSystem,
IPlatform platform,
IEnvironmentVariables environmentVariables,
ILog log,
string kubectlFilePath = null)
{
_log = log ?? throw new ArgumentNullException(nameof(log));
this._fileSystem = fileSystem ?? throw new ArgumentNullException(nameof(fileSystem));
this._platform = platform ?? throw new ArgumentNullException(nameof(platform));
this._environmentVariables = environmentVariables ?? throw new ArgumentNullException(nameof(environmentVariables));
this._kubectlFilePath = kubectlFilePath;
}
/// <summary>
/// <see cref="IKubectlImpl.RunShortRunningCommand"/>
/// </summary>
public int RunShortRunningCommand(
KubernetesCommandName commandName,
string command,
Action<string> onStdOut,
Action<string> onStdErr,
CancellationToken cancellationToken,
Dictionary<string, string> envVariables = null,
bool log137ExitCodeErrorAsWarning = false,
int timeoutMs = 30000)
{
_log.Info("Invoking kubectl {0} command: {1}", commandName.ToString(), new PII(command));
Stopwatch w = new Stopwatch();
var stdOutBuilder = new FixedSizeStringBuilder(MaxBufferSize);
var stdErrBuilder = new FixedSizeStringBuilder(MaxBufferSize);
var kubectlProcess = _platform.CreateProcess(
new ProcessStartInfo()
{
FileName = GetKubectlFilepath(),
Arguments = command,
UseShellExecute = false,
RedirectStandardOutput = true,
RedirectStandardError = true,
CreateNoWindow = true
});
if (envVariables != null)
{
foreach (KeyValuePair<string, string> env in envVariables)
{
kubectlProcess.StartInfo.EnvironmentVariables[env.Key] = env.Value;
}
}
try
{
Action killKubectlProcess =
() =>
{
try
{
if (!kubectlProcess.HasExited)
{
kubectlProcess.Kill();
kubectlProcess.Dispose();
}
}
catch (Exception e)
{
_log.Exception(e);
}
};
// Creates a waiter that ensures all process output has drained before returning.
using (cancellationToken.Register(killKubectlProcess))
using (var outputWaitHandle = new AutoResetEvent(false))
{
DataReceivedEventHandler outputDataReceivedHandler = (sender, e) =>
{
// The last call will have e.Data as null, so we wait for that condition
// to be true to close the handle.
if (e.Data == null)
{
try
{
outputWaitHandle.Set();
}
catch (ObjectDisposedException)
{
// this could be invoked outside of this 'using' statement during timeout
}
}
else
{
stdOutBuilder.AppendLine(e.Data);
onStdOut?.Invoke(e.Data);
}
};
DataReceivedEventHandler errorDataReceivedHandler = (sender, e) =>
{
if (!string.IsNullOrEmpty(e.Data))
{
stdErrBuilder.AppendLine(e.Data);
onStdErr?.Invoke(e.Data);
}
};
try
{
kubectlProcess.OutputDataReceived += outputDataReceivedHandler;
kubectlProcess.ErrorDataReceived += errorDataReceivedHandler;
w.Start();
kubectlProcess.Start();
kubectlProcess.BeginOutputReadLine();
kubectlProcess.BeginErrorReadLine();
if (kubectlProcess.WaitForExit(timeoutMs) && outputWaitHandle.WaitOne(timeoutMs))
{
w.Stop();
_log.Info("Invoked kubectl {0} command: '{1}' exited with {2} in {3}ms", commandName.ToString(), new PII(command), kubectlProcess.ExitCode, w.ElapsedMilliseconds);
if (kubectlProcess.ExitCode != 0)
{
LogFailure(commandName, command, kubectlProcess.ExitCode, stdOutBuilder, stdErrBuilder, log137ExitCodeErrorAsWarning);
}
return kubectlProcess.ExitCode;
}
else
{
var messageFormat = "'kubectl' timeout after {0}ms when running {1}";
_log.Info(messageFormat, timeoutMs, commandName.ToString());
throw new TimeoutException(string.Format(messageFormat, timeoutMs, command));
}
}
finally
{
kubectlProcess.OutputDataReceived -= outputDataReceivedHandler;
kubectlProcess.ErrorDataReceived -= errorDataReceivedHandler;
}
}
}
catch (Exception e)
{
if (e is KubectlException)
{
_log.ExceptionAsWarning(e);
}
else
{
_log.Exception(e);
}
_log.Error("kubectl {0} command: '{1}' failed with exception {2}", commandName.ToString(), new PII(command), e.Message);
// Killing and disposing failed process
if (!kubectlProcess.HasExited)
{
// Try and catch exceptions in case of race conditions or other unexpected errors killing the process
try
{
kubectlProcess.Kill();
}
catch (Exception kubeEx)
{
_log.Warning("Failed to kill kubectl {0}", kubeEx.Message);
}
}
throw;
}
finally
{
kubectlProcess.Dispose();
}
}
/// <summary>
/// <see cref="IKubectlImpl.RunLongRunningCommand"/>
/// </summary>
public int RunLongRunningCommand(
KubernetesCommandName commandName,
string command,
Action<string> onStdOut,
Action<string> onStdErr,
CancellationToken cancellationToken,
Dictionary<string, string> envVariables = null,
bool log137ExitCodeErrorAsWarning = false)
{
Debug.Assert(cancellationToken != default(CancellationToken), "CancellationToken cannot be passed as default for long running operations");
_log.Info("Invoking kubectl {0} command: {1}", commandName.ToString(), new PII(command));
Stopwatch w = new Stopwatch();
var stdOutBuilder = new FixedSizeStringBuilder(MaxBufferSize);
var stdErrBuilder = new FixedSizeStringBuilder(MaxBufferSize);
var kubectlProcess = _platform.CreateProcess(
new ProcessStartInfo()
{
FileName = GetKubectlFilepath(),
Arguments = command,
UseShellExecute = false,
RedirectStandardOutput = true,
RedirectStandardError = true,
CreateNoWindow = true
});
if (envVariables != null)
{
foreach (KeyValuePair<string, string> env in envVariables)
{
kubectlProcess.StartInfo.EnvironmentVariables[env.Key] = env.Value;
}
}
try
{
Action killKubectlProcess =
() =>
{
try
{
if (!kubectlProcess.HasExited)
{
kubectlProcess.Kill();
kubectlProcess.Dispose();
}
}
catch (Exception e)
{
_log.Exception(e);
}
};
kubectlProcess.OutputDataReceived += (sender, e) =>
{
if (!string.IsNullOrEmpty(e.Data))
{
stdOutBuilder.AppendLine(e.Data);
onStdOut?.Invoke(e.Data);
}
};
kubectlProcess.ErrorDataReceived += (sender, e) =>
{
if (!string.IsNullOrEmpty(e.Data))
{
stdErrBuilder.AppendLine(e.Data);
onStdErr?.Invoke(e.Data);
}
};
w.Start();
kubectlProcess.Start();
using (cancellationToken.Register(killKubectlProcess))
{
kubectlProcess.BeginOutputReadLine();
kubectlProcess.BeginErrorReadLine();
_log.Info("Invoked long running kubectl {0} command: '{1}'", commandName.ToString(), new PII(command));
if (stdErrBuilder.Length > 0)
{
LogFailure(commandName, command, -1, stdOutBuilder, stdErrBuilder, log137ExitCodeErrorAsWarning);
}
kubectlProcess.WaitForExit();
}
return 0;
}
catch (Exception e)
{
if (e is KubectlException)
{
_log.ExceptionAsWarning(e);
}
else
{
_log.Exception(e);
}
_log.Error("kubectl {0} command: '{1}' failed with exception {2}", commandName.ToString(), new PII(command), e.Message);
// Killing and disposing failed process
if (!kubectlProcess.HasExited)
{
// Try and catch exceptions in case of race conditions or other unexpected errors killing the process
try
{
kubectlProcess.Kill();
}
catch (Exception kubeEx)
{
_log.Warning("Failed to kill kubectl {0}", kubeEx.Message);
}
}
throw;
}
finally
{
kubectlProcess.Dispose();
}
}
private void LogFailure(KubernetesCommandName commandName, string command, int exitCode, FixedSizeStringBuilder stdOutBuilder, FixedSizeStringBuilder stdErrBuilder, bool logErrorAsWarning = false)
{
string errorMessageFormat = "kubectl {0} command: '{1}' terminated with exit code '{2}'";
var errorMessageParameters = new List<object>
{
commandName.ToString(),
new PII(command),
exitCode
};
LogIfMaxBufferLengthReached(stdOutBuilder, nameof(stdOutBuilder), commandName);
LogIfMaxBufferLengthReached(stdErrBuilder, nameof(stdErrBuilder), commandName);
var stdOut = stdOutBuilder.ToString();
var stdErr = stdErrBuilder.ToString();
if (!string.IsNullOrWhiteSpace(stdOut))
{
if (!string.IsNullOrWhiteSpace(stdErr))
{
errorMessageFormat += " and stdout '{3}' and stderr '{4}'.";
errorMessageParameters.Add(new PII(stdOut));
// We don't log stdErr as PII because we need it for diagnostic purposes
errorMessageParameters.Add(stdErr);
}
else
{
errorMessageFormat += " and stdout '{3}'.";
errorMessageParameters.Add(new PII(stdOut));
}
}
else
{
if (!string.IsNullOrWhiteSpace(stdErr))
{
errorMessageFormat += " and stderr '{3}'.";
// We don't log stdErr as PII because we need it for diagnostic purposes
errorMessageParameters.Add(stdErr);
}
else
{
errorMessageFormat += ".";
}
}
if (logErrorAsWarning)
{
_log.Warning(errorMessageFormat, errorMessageParameters.ToArray());
}
else
{
_log.Error(errorMessageFormat, errorMessageParameters.ToArray());
}
}
public void LogIfMaxBufferLengthReached(FixedSizeStringBuilder builder, string builderName, KubernetesCommandName commandName)
{
if (builder.MaxLengthReached)
{
_log.Info($"Max buffer size reached in '{builderName}' ({builder.MaxLength} characters) while running a '{commandName.ToString()}' command");
}
}
/// <summary>
/// Builds a path to the kubectl executable, assuming kubectl was downloaded as part of the build pipeline / as part of dsc.csproj for local build.
/// The relative path kubectl is installed to: "./kubectl/[win|osx|linux]"
/// </summary>
/// <returns>Absolute path to a kubectl executable for the current OS</returns>
/// <exception cref="KubectlException">If the kubectl path can't be determined because the current OS isn't recognized</exception>
private string GetKubectlFilepath()
{
if (!string.IsNullOrEmpty(_kubectlFilePath))
{
return _kubectlFilePath;
}
string directoryName;
string executableName;
if (this._platform.IsWindows)
{
directoryName = ExecutableLocation.Windows.Directory;
executableName = ExecutableLocation.Windows.Name;
}
else if (this._platform.IsOSX)
{
directoryName = ExecutableLocation.OSX.Directory;
executableName = ExecutableLocation.OSX.Name;
}
else if (this._platform.IsLinux)
{
directoryName = ExecutableLocation.Linux.Directory;
executableName = ExecutableLocation.Linux.Name;
}
else
{
_log.Error("Failed to determine runtime OS for kubectl.");
throw new KubectlException(CommonResources.KubectlNotSupportedMessage);
}
var kubectlPath = _fileSystem.Path.Combine(_fileSystem.Path.GetExecutingAssemblyDirectoryPath(), ExecutableLocation.ParentDirectory, directoryName, executableName);
AssertHelper.True(_fileSystem.FileExists(kubectlPath), $"Private copy of kubectl not found at expected location: '{kubectlPath}'");
_log.Verbose($"Using kubectl found at: '{kubectlPath}'");
return kubectlPath;
}
}
}