in internal/ingress/controller/controller.go [863:1009]
// createUpstreams builds the map of upstream name to Backend for the given
// Ingresses. The supplied default backend du is always registered under
// defUpstreamName.
//
// For every Ingress an upstream is created for its spec.defaultBackend Service
// (when set) and for each Service referenced by an HTTP rule path. A
// default-backend upstream always replaces an existing entry with the same
// name, whereas a rule-path upstream is only created when no entry with that
// name exists yet.
//
// Each created upstream receives:
//   - the upstream-hash-by and load-balancing settings from the Ingress
//     annotations, falling back to the load-balancing value of the backend
//     configuration when the annotation is empty;
//   - the canary traffic-shaping policy (and NoServer) when the canary
//     annotation is enabled;
//   - its endpoints: the Service ClusterIP endpoint when the service-upstream
//     annotation is set and can be resolved, otherwise the endpoints returned
//     by serviceEndpoints;
//   - the Service object obtained from the store.
//
// Lookup failures are logged and never abort the whole build; the affected
// upstream stays in the returned map with whatever was populated so far.
func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.Backend) map[string]*ingress.Backend {
	upstreams := make(map[string]*ingress.Backend)
	upstreams[defUpstreamName] = du

	for _, ing := range data {
		ingKey := k8s.MetaNamespaceKey(ing)
		anns := ing.ParsedAnnotations

		if !n.store.GetBackendConfiguration().AllowSnippetAnnotations {
			dropSnippetDirectives(anns, ingKey)
		}

		if ing.Spec.DefaultBackend != nil && ing.Spec.DefaultBackend.Service != nil {
			defBackend := upstreamName(ing.Namespace, ing.Spec.DefaultBackend.Service)

			klog.V(3).Infof("Creating upstream %q", defBackend)
			upstream := newUpstream(defBackend)
			upstreams[defBackend] = upstream

			upstream.UpstreamHashBy.UpstreamHashBy = anns.UpstreamHashBy.UpstreamHashBy
			upstream.UpstreamHashBy.UpstreamHashBySubset = anns.UpstreamHashBy.UpstreamHashBySubset
			upstream.UpstreamHashBy.UpstreamHashBySubsetSize = anns.UpstreamHashBy.UpstreamHashBySubsetSize

			upstream.LoadBalancing = anns.LoadBalancing
			if upstream.LoadBalancing == "" {
				upstream.LoadBalancing = n.store.GetBackendConfiguration().LoadBalancing
			}

			svcKey := fmt.Sprintf("%v/%v", ing.Namespace, ing.Spec.DefaultBackend.Service.Name)

			// add the service ClusterIP as a single Endpoint instead of individual Endpoints
			if anns.ServiceUpstream {
				endpoint, err := n.getServiceClusterEndpoint(svcKey, ing.Spec.DefaultBackend)
				if err != nil {
					klog.Errorf("Failed to determine a suitable ClusterIP Endpoint for Service %q: %v", svcKey, err)
				} else {
					upstream.Endpoints = []ingress.Endpoint{endpoint}
				}
			}

			// configure traffic shaping for canary
			if anns.Canary.Enabled {
				upstream.NoServer = true
				upstream.TrafficShapingPolicy = ingress.TrafficShapingPolicy{
					Weight:        anns.Canary.Weight,
					WeightTotal:   anns.Canary.WeightTotal,
					Header:        anns.Canary.Header,
					HeaderValue:   anns.Canary.HeaderValue,
					HeaderPattern: anns.Canary.HeaderPattern,
					Cookie:        anns.Canary.Cookie,
				}
			}

			// Fall back to the individual Service endpoints when the ClusterIP
			// endpoint was not requested or could not be determined.
			if len(upstream.Endpoints) == 0 {
				_, port := upstreamServiceNameAndPort(ing.Spec.DefaultBackend.Service)
				endps, err := n.serviceEndpoints(svcKey, port.String())
				// Whatever was returned is kept even when err is non-nil.
				upstream.Endpoints = append(upstream.Endpoints, endps...)
				if err != nil {
					klog.Warningf("Error creating upstream %q: %v", defBackend, err)
				}
			}

			s, err := n.store.GetService(svcKey)
			if err != nil {
				klog.Warningf("Error obtaining Service %q: %v", svcKey, err)
			}
			// The returned value is stored even when the lookup failed.
			upstream.Service = s
		}

		for _, rule := range ing.Spec.Rules {
			if rule.HTTP == nil {
				continue
			}

			for _, path := range rule.HTTP.Paths {
				if path.Backend.Service == nil {
					// skip non-service backends
					klog.V(3).Infof("Ingress %q and path %q does not contain a service backend, using default backend", ingKey, path.Path)
					continue
				}

				name := upstreamName(ing.Namespace, path.Backend.Service)
				if _, ok := upstreams[name]; ok {
					// An upstream with this name already exists; keep it as is.
					continue
				}

				svcName, svcPort := upstreamServiceNameAndPort(path.Backend.Service)

				klog.V(3).Infof("Creating upstream %q", name)
				upstream := newUpstream(name)
				// Register immediately so the upstream stays in the map even if
				// a later lookup fails and the iteration is abandoned.
				upstreams[name] = upstream

				upstream.Port = svcPort

				upstream.UpstreamHashBy.UpstreamHashBy = anns.UpstreamHashBy.UpstreamHashBy
				upstream.UpstreamHashBy.UpstreamHashBySubset = anns.UpstreamHashBy.UpstreamHashBySubset
				upstream.UpstreamHashBy.UpstreamHashBySubsetSize = anns.UpstreamHashBy.UpstreamHashBySubsetSize

				upstream.LoadBalancing = anns.LoadBalancing
				if upstream.LoadBalancing == "" {
					upstream.LoadBalancing = n.store.GetBackendConfiguration().LoadBalancing
				}

				svcKey := fmt.Sprintf("%v/%v", ing.Namespace, svcName)

				// add the service ClusterIP as a single Endpoint instead of individual Endpoints
				if anns.ServiceUpstream {
					endpoint, err := n.getServiceClusterEndpoint(svcKey, &path.Backend)
					if err != nil {
						klog.Errorf("Failed to determine a suitable ClusterIP Endpoint for Service %q: %v", svcKey, err)
					} else {
						upstream.Endpoints = []ingress.Endpoint{endpoint}
					}
				}

				// configure traffic shaping for canary
				if anns.Canary.Enabled {
					upstream.NoServer = true
					// Must carry the same fields as the default-backend policy
					// above; WeightTotal in particular, otherwise the configured
					// weight total is dropped for rule-path upstreams.
					upstream.TrafficShapingPolicy = ingress.TrafficShapingPolicy{
						Weight:        anns.Canary.Weight,
						WeightTotal:   anns.Canary.WeightTotal,
						Header:        anns.Canary.Header,
						HeaderValue:   anns.Canary.HeaderValue,
						HeaderPattern: anns.Canary.HeaderPattern,
						Cookie:        anns.Canary.Cookie,
					}
				}

				// Fall back to the individual Service endpoints when the
				// ClusterIP endpoint was not requested or could not be determined.
				if len(upstream.Endpoints) == 0 {
					endp, err := n.serviceEndpoints(svcKey, svcPort.String())
					if err != nil {
						klog.Warningf("Error obtaining Endpoints for Service %q: %v", svcKey, err)
						continue
					}
					upstream.Endpoints = endp
				}

				s, err := n.store.GetService(svcKey)
				if err != nil {
					klog.Warningf("Error obtaining Service %q: %v", svcKey, err)
					continue
				}
				upstream.Service = s
			}
		}
	}

	return upstreams
}