private async Task AddPodTriggersAsync()

in src/routingmanager/RoutingManagerApp.cs [609:682]


        /// <summary>
        /// Registers every routing-trigger pod found in <paramref name="pods"/> as a pod trigger
        /// on the user service that the pod asks to route from.
        /// </summary>
        /// <remarks>
        /// A pod trigger is skipped (and the reason is logged) when:
        /// <list type="bullet">
        /// <item><description>no service in <paramref name="userServices"/> matches the pod's route-from service name (case-insensitive);</description></item>
        /// <item><description>the pod has no IP, even after re-fetching the pod for up to 5 seconds;</description></item>
        /// <item><description><c>ReplaceServiceNamedPortsAsync</c> returns <c>false</c> for the trigger service.</description></item>
        /// </list>
        /// Failing to read the correlation id is not fatal: the failure is logged as a warning and an empty id is used.
        /// </remarks>
        /// <param name="routingStateEstablisherInputMap">Map that receives the pod trigger, keyed by the trigger service.</param>
        /// <param name="pods">Pods to inspect; also passed to <c>ReplaceServiceNamedPortsAsync</c>.</param>
        /// <param name="userServices">Services searched for each pod's trigger service.</param>
        /// <param name="cancellationToken">Checked on entry and flowed to the pod lookup, the retry helper and the named-port resolution.</param>
        private async Task AddPodTriggersAsync(
            ConcurrentDictionary<V1Service, RoutingStateEstablisherInput> routingStateEstablisherInputMap,
            IEnumerable<V1Pod> pods,
            IEnumerable<V1Service> userServices,
            CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            foreach (var pod in pods)
            {
                if (!pod.Metadata.IsRoutingTrigger())
                {
                    continue;
                }

                _log.Info("Pod '{0}' is a pod trigger", new PII(pod.Metadata.Name));
                var routeOnHeader = pod.Metadata.GetRouteOnHeader(_log);
                var routeFromServiceName = pod.Metadata.GetRouteFromServiceName(_log);
                var triggerService = userServices.FirstOrDefault(svc => StringComparer.OrdinalIgnoreCase.Equals(svc.Metadata.Name, routeFromServiceName));
                if (triggerService == null)
                {
                    _log.Error("Service '{0}' not found which was expected to be trigger service. Skipping this pod trigger named '{1}'", new PII(routeFromServiceName), new PII(pod.Metadata.Name));
                    continue;
                }

                // Track the resolved IP in a single variable so that every outcome (IP already present,
                // IP obtained by a retry, lookup returned no pod/status, retries exhausted) is validated
                // by the same check below. Status is accessed null-conditionally because it may be absent.
                string triggerPodIP = pod.Status?.PodIP;
                if (string.IsNullOrWhiteSpace(triggerPodIP))
                {
                    await WebUtilities.RetryUntilTimeAsync(async _ =>
                    {
                        var latestPod = await _kubernetesClient.GetV1PodAsync(pod.Metadata.NamespaceProperty, pod.Metadata.Name, cancellationToken: cancellationToken);
                        triggerPodIP = latestPod?.Status?.PodIP;
                        return !string.IsNullOrWhiteSpace(triggerPodIP);
                    },
                    maxWaitTime: TimeSpan.FromSeconds(5),
                    cancellationToken: cancellationToken);
                }

                // Never register a trigger without a usable IP.
                if (string.IsNullOrWhiteSpace(triggerPodIP))
                {
                    _log.Error("Could not retrive IP for pod trigger : '{0}', skipping this trigger", new PII(pod.Metadata.Name));
                    continue;
                }

                _log.Info("Retrieved pod ip for pod trigger '{0}'", new PII(pod.Metadata.Name));

                if (!await ReplaceServiceNamedPortsAsync(triggerService, pods, cancellationToken))
                {
                    _log.Warning("'{0}' Trigger service's underlying pods or pod's corresponding port was not found in order to resolve named target port. Ignoring corresponding pod trigger name '{1}' as trigger", new PII(triggerService.Metadata.Name), new PII(pod.Metadata.Name));
                    continue;
                }

                var correlationId = string.Empty;
                try
                {
                    correlationId = pod.Metadata.GetCorrelationId();
                }
                catch (Exception ex)
                {
                    // This is not critical and, if we fail to get the CorrelationId, we should keep working
                    _log.Warning(ex.Message);
                }

                var podTriggerToAdd =
                    new PodTriggerConfig(
                        namespaceName: triggerService.Metadata.NamespaceProperty,
                        triggerService: triggerService,
                        lpkPodName: pod.Metadata.Name,
                        routeOnHeaderKey: routeOnHeader.headerName,
                        routeOnHeaderValue: routeOnHeader.headerValue,
                        triggerPodIP: triggerPodIP,
                        correlationId: correlationId);
                routingStateEstablisherInputMap.AddOrUpdateWithTrigger(triggerService, podTriggerToAdd);
            }
        }