in src/routingmanager/RoutingManagerApp.cs [609:682]
private async Task AddPodTriggersAsync(
ConcurrentDictionary<V1Service, RoutingStateEstablisherInput> routingStateEstablisherInputMap,
IEnumerable<V1Pod> pods,
IEnumerable<V1Service> userServices,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
foreach (var pod in pods)
{
if (pod.Metadata.IsRoutingTrigger())
{
_log.Info("Pod '{0}' is a pod trigger", new PII(pod.Metadata.Name));
var routeOnHeader = pod.Metadata.GetRouteOnHeader(_log);
var routeFromServiceName = pod.Metadata.GetRouteFromServiceName(_log);
var triggerService = userServices.FirstOrDefault(svc => StringComparer.OrdinalIgnoreCase.Equals(svc.Metadata.Name, routeFromServiceName));
if (triggerService == default(V1Service))
{
_log.Error("Service '{0}' not found which was expected to be trigger service. Skipping this pod trigger named '{1}'", new PII(routeFromServiceName), new PII(pod.Metadata.Name));
continue;
}
V1Pod pod_latest = null;
if (string.IsNullOrWhiteSpace(pod.Status.PodIP))
{
await WebUtilities.RetryUntilTimeAsync(async _ =>
{
pod_latest = await _kubernetesClient.GetV1PodAsync(pod.Metadata.NamespaceProperty, pod.Metadata.Name, cancellationToken: cancellationToken);
if (pod_latest == null || pod_latest.Status == null || string.IsNullOrWhiteSpace(pod_latest.Status.PodIP))
{
return false;
}
return true;
},
maxWaitTime: TimeSpan.FromSeconds(5),
cancellationToken: cancellationToken);
}
if (pod_latest != null && string.IsNullOrEmpty(pod_latest.Status.PodIP))
{
_log.Error("Could not retrive IP for pod trigger : '{0}', skipping this trigger", new PII(pod_latest.Metadata.Name));
continue;
}
_log.Info("Retrieved pod ip for pod trigger '{0}'", new PII(pod.Metadata.Name));
if (!await ReplaceServiceNamedPortsAsync(triggerService, pods, cancellationToken))
{
_log.Warning("'{0}' Trigger service's underlying pods or pod's corresponding port was not found in order to resolve named target port. Ignoring corresponding pod trigger name '{1}' as trigger", new PII(triggerService.Metadata.Name), new PII(pod.Metadata.Name));
continue;
}
var correlationId = string.Empty;
try
{
correlationId = pod.Metadata.GetCorrelationId();
}
catch (Exception ex)
{
// This is not critical and, if we fail to get the CorrelationId, we should keep working
_log.Warning(ex.Message);
}
var podTriggerToAdd =
new PodTriggerConfig(
namespaceName: triggerService.Metadata.NamespaceProperty,
triggerService: triggerService,
lpkPodName: pod.Metadata.Name,
routeOnHeaderKey: routeOnHeader.headerName,
routeOnHeaderValue: routeOnHeader.headerValue,
triggerPodIP: pod_latest == null ? pod.Status.PodIP : pod_latest.Status.PodIP,
correlationId: correlationId);
routingStateEstablisherInputMap.AddOrUpdateWithTrigger(triggerService, podTriggerToAdd);
}
}
}