in src/routingmanager/RoutingStateEstablisher.cs [690:860]
private Task UpdateClusterStateAsync(
IEnumerable<IMetadata<V1ObjectMeta>> currentObjectsList,
IEnumerable<IMetadata<V1ObjectMeta>> expectedObjectsList,
KubernetesResourceType k8sResourceType,
CancellationToken cancellationToken)
{
// Incrementally create, update and delete services
var currentObjects = currentObjectsList.ToList().Ordered();
var expectedObjects = expectedObjectsList.ToList().Ordered();
int currentIndex = 0, expectedIndex = 0;
IList<Task> tasks = new List<Task>();
try
{
while (currentIndex < currentObjects.Count() || expectedIndex < expectedObjects.Count())
{
cancellationToken.ThrowIfCancellationRequested();
var currentMeta = currentIndex < currentObjects.Count() ? currentObjects[currentIndex].Metadata : null;
var expectedMeta = expectedIndex < expectedObjects.Count() ? expectedObjects[expectedIndex].Metadata : null;
var currentKey = currentMeta != null ? $"{currentMeta.Name}" : null;
var pendingKey = expectedMeta != null ? $"{expectedMeta.Name}" : null;
var comparison = String.Compare(currentKey, pendingKey);
if (currentKey == null || pendingKey == null)
{
// If there are no more pending ingresses, then the string
// comparison will always return >0, but we want it to
// return <0 so that it deletes remaining current ingresses.
// Same reasoning when there are no more current ingresses.
comparison = -comparison;
}
if (comparison < 0)
{
switch (k8sResourceType)
{
case KubernetesResourceType.Service:
_log.Info("Deleting service '{0}.{1}'", new PII(currentMeta.Name), new PII(currentMeta.NamespaceProperty));
tasks.Add(_kubernetesClient.DeleteV1ServiceAsync(currentMeta.NamespaceProperty, currentMeta.Name, cancellationToken: cancellationToken));
break;
case KubernetesResourceType.Deployment:
_log.Info("Deleting deployment '{0}.{1}'", new PII(currentMeta.Name), new PII(currentMeta.NamespaceProperty));
tasks.Add(_kubernetesClient.DeleteDeploymentsInNamespaceAsync(currentMeta.NamespaceProperty, currentMeta.Name, cancellationToken: cancellationToken));
break;
case KubernetesResourceType.ConfigMap:
_log.Info("Deleting config map '{0}.{1}'", new PII(currentMeta.Name), new PII(currentMeta.NamespaceProperty));
tasks.Add(_kubernetesClient.DeleteNamespacedConfigMapAsync(currentMeta.NamespaceProperty, currentMeta.Name, cancellationToken: cancellationToken));
break;
case KubernetesResourceType.Ingress:
_log.Info("Deleting ingress '{0}.{1}'", new PII(currentMeta.Name), new PII(currentMeta.NamespaceProperty));
tasks.Add(_kubernetesClient.DeleteNamespacedIngressAsync(currentMeta.NamespaceProperty, currentMeta.Name, cancellationToken: cancellationToken));
break;
case KubernetesResourceType.IngressRoute:
_log.Info("Deleting ingressRoute '{0}.{1}'", new PII(currentMeta.Name), new PII(currentMeta.NamespaceProperty));
tasks.Add(_kubernetesClient.DeleteNamespacedIngressRouteAsync(currentMeta.NamespaceProperty, currentMeta.Name, cancellationToken: cancellationToken));
break;
default:
_log.Error("Unknown kubernetes object of type '{0}'. ", currentObjects[currentIndex].GetType().Name);
throw new RoutingException(Resources.UnknownKubernetesObjectFormat, currentObjects[currentIndex].GetType().Name);
}
currentIndex++;
}
else if (comparison == 0)
{
switch (k8sResourceType)
{
case KubernetesResourceType.Service:
_log.Info("Replacing service '{0}.{1}'", new PII(currentMeta.Name), new PII(currentMeta.NamespaceProperty));
var expectedService = expectedObjects[expectedIndex] as V1Service;
var currentService = currentObjects[currentIndex] as V1Service;
expectedService.Metadata.ResourceVersion = currentService.Metadata.ResourceVersion;
expectedService.Spec.ClusterIP = currentService.Spec.ClusterIP;
expectedService.Spec.LoadBalancerIP = currentService.Spec.LoadBalancerIP;
tasks.Add(_kubernetesClient.ReplaceV1ServiceAsync(expectedMeta.NamespaceProperty, (V1Service)expectedObjects[expectedIndex], expectedMeta.Name, cancellationToken: cancellationToken));
break;
case KubernetesResourceType.Deployment:
_log.Info("Replacing deployment '{0}.{1}'", new PII(currentMeta.Name), new PII(currentMeta.NamespaceProperty));
var expectedDeployment = (V1Deployment)expectedObjects[expectedIndex];
if (expectedMeta.Labels.IsEqual(currentMeta.Labels))
{
tasks.Add(_kubernetesClient.ReplaceNamespacedDeploymentAsync(expectedMeta.NamespaceProperty, expectedDeployment, expectedMeta.Name, cancellationToken: cancellationToken));
}
else
{
// Deleting and recreating because in k8s API version apps/v1 (what we use for envoy deployment), label selectors are immutable after creation
tasks.Add(
Task.Run(async () =>
{
await _kubernetesClient.DeleteDeploymentsInNamespaceAsync(expectedMeta.NamespaceProperty, expectedMeta.Name, cancellationToken: cancellationToken);
await _kubernetesClient.CreateNamespacedDeploymentAsync(expectedMeta.NamespaceProperty, expectedDeployment, cancellationToken: cancellationToken);
}));
}
break;
case KubernetesResourceType.ConfigMap:
_log.Info("Replacing config map '{0}.{1}'", new PII(currentMeta.Name), new PII(currentMeta.NamespaceProperty));
(expectedObjects[expectedIndex] as V1ConfigMap).Metadata.ResourceVersion = (currentObjects[currentIndex] as V1ConfigMap).Metadata.ResourceVersion;
tasks.Add(_kubernetesClient.ReplaceNamespacedConfigMapAsync(expectedMeta.NamespaceProperty, (V1ConfigMap)expectedObjects[expectedIndex], expectedMeta.Name, cancellationToken: cancellationToken));
break;
case KubernetesResourceType.Ingress:
_log.Info("Replacing ingress '{0}.{1}'", new PII(currentMeta.Name), new PII(currentMeta.NamespaceProperty));
(expectedObjects[expectedIndex] as V1Ingress).Metadata.ResourceVersion = (currentObjects[currentIndex] as V1Ingress).Metadata.ResourceVersion;
tasks.Add(_kubernetesClient.ReplaceNamespacedIngress1Async(expectedMeta.NamespaceProperty, (V1Ingress)expectedObjects[expectedIndex], expectedMeta.Name, cancellationToken: cancellationToken));
break;
case KubernetesResourceType.IngressRoute:
_log.Info("Replacing ingressRoute '{0}.{1}'", new PII(currentMeta.Name), new PII(currentMeta.NamespaceProperty));
(expectedObjects[expectedIndex] as IngressRoute).Metadata.ResourceVersion = (currentObjects[currentIndex] as IngressRoute).Metadata.ResourceVersion;
tasks.Add(_kubernetesClient.ApplyNamespacedIngressRouteAsync(expectedMeta.NamespaceProperty, (IngressRoute)expectedObjects[expectedIndex], cancellationToken));
break;
default:
_log.Error("Unknown kubernetes object of type '{0}'. ", currentObjects[currentIndex].GetType().Name);
throw new RoutingException(Resources.UnknownKubernetesObjectFormat, currentObjects[currentIndex].GetType().Name);
}
currentIndex++;
expectedIndex++;
}
else
{
switch (k8sResourceType)
{
case KubernetesResourceType.Service:
_log.Info("Creating service '{0}.{1}'", new PII(expectedMeta.Name), new PII(expectedMeta.NamespaceProperty));
tasks.Add(_kubernetesClient.CreateNamespacedServiceAsync(expectedMeta.NamespaceProperty, (V1Service)expectedObjects[expectedIndex], cancellationToken: cancellationToken));
break;
case KubernetesResourceType.Deployment:
_log.Info("Creating deployment '{0}.{1}'", new PII(expectedMeta.Name), new PII(expectedMeta.NamespaceProperty));
tasks.Add(_kubernetesClient.CreateNamespacedDeploymentAsync(expectedMeta.NamespaceProperty, (V1Deployment)expectedObjects[expectedIndex], cancellationToken: cancellationToken));
break;
case KubernetesResourceType.ConfigMap:
_log.Info("Creating config map '{0}.{1}'", new PII(expectedMeta.Name), new PII(expectedMeta.NamespaceProperty));
tasks.Add(_kubernetesClient.CreateNamespacedConfigMapAsync(expectedMeta.NamespaceProperty, (V1ConfigMap)expectedObjects[expectedIndex], cancellationToken: cancellationToken));
break;
case KubernetesResourceType.Ingress:
_log.Info("Creating ingress '{0}.{1}'", new PII(expectedMeta.Name), new PII(expectedMeta.NamespaceProperty));
tasks.Add(_kubernetesClient.CreateNamespacedIngressAsync(expectedMeta.NamespaceProperty, (V1Ingress)expectedObjects[expectedIndex], cancellationToken: cancellationToken));
break;
case KubernetesResourceType.IngressRoute:
_log.Info("Creating ingressRoute '{0}.{1}'", new PII(expectedMeta.Name), new PII(expectedMeta.NamespaceProperty));
tasks.Add(_kubernetesClient.ApplyNamespacedIngressRouteAsync(expectedMeta.NamespaceProperty, (IngressRoute)expectedObjects[expectedIndex], cancellationToken: cancellationToken));
break;
}
expectedIndex++;
}
}
}
catch (HttpOperationException e) when (StringComparer.OrdinalIgnoreCase.Equals(e.Response.ReasonPhrase, Constants.KubernetesError.Conflict))
{
_log.Warning("Logged exception from KubernetesClient with reason phrase '{0}' : Response Content : '{1}'", e.Response.ReasonPhrase, e.Response.Content);
throw;
}
catch (HttpOperationException e) when (StringComparer.OrdinalIgnoreCase.Equals(e.Response.ReasonPhrase, Constants.KubernetesError.UnprocessableEntity))
{
// bug#1220078 : For processing unprocessableEntity errors arising from here.
_log.Error("Logged exception from KubernetesClient with reason phrase '{0}' : Response Content : '{1}'", e.Response.ReasonPhrase, e.Response.Content);
throw;
}
// bug 1006600
return Task.WhenAll(tasks);
}