pkg/controller/elasticsearch/services/services.go (191 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package services
import (
"fmt"
"math/rand"
"strconv"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/defaults"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/client"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/network"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/stringsutil"
)
const (
globalServiceSuffix = ".svc"
RemoteClusterServicePortName = "rcs"
)
// TransportServiceName returns the name for the transport service associated to this cluster
func TransportServiceName(esName string) string {
return esv1.TransportService(esName)
}
// NewTransportService returns the transport service associated with the given cluster.
// It is used by Elasticsearch nodes to talk to remote cluster nodes.
func NewTransportService(es esv1.Elasticsearch) *corev1.Service {
nsn := k8s.ExtractNamespacedName(&es)
svc := corev1.Service{
ObjectMeta: es.Spec.Transport.Service.ObjectMeta,
Spec: es.Spec.Transport.Service.Spec,
}
svc.ObjectMeta.Namespace = es.Namespace
svc.ObjectMeta.Name = TransportServiceName(es.Name)
// Nodes need to discover themselves before the pod is considered ready,
// otherwise minimum master nodes would never be reached
svc.Spec.PublishNotReadyAddresses = true
if svc.Spec.Type == "" {
svc.Spec.Type = corev1.ServiceTypeClusterIP
// We set ClusterIP to None in order to let the ES nodes discover all other node IPs at once.
svc.Spec.ClusterIP = "None"
}
labels := label.NewLabels(nsn)
ports := []corev1.ServicePort{
{
Name: "tls-transport", // prefix with protocol for Istio compatibility
Protocol: corev1.ProtocolTCP,
Port: network.TransportPort,
},
}
return defaults.SetServiceDefaults(&svc, labels, labels, ports)
}
// ExternalServiceName returns the name for the external service
// associated to this cluster
func ExternalServiceName(esName string) string {
return esv1.HTTPService(esName)
}
// InternalServiceName returns the name for the internal service
// associated to this cluster, managed by the operator exclusively.
func InternalServiceName(esName string) string {
return esv1.InternalHTTPService(esName)
}
// RemoteClusterServiceName returns the name for the remote cluster service used when the cluster is expected to be accessed
// using the remote cluster server. Managed by the operator exclusively.
func RemoteClusterServiceName(esName string) string {
return esv1.RemoteClusterService(esName)
}
// ExternalTransportServiceHost returns the hostname and the port used to reach Elasticsearch's transport endpoint.
func ExternalTransportServiceHost(es types.NamespacedName) string {
return stringsutil.Concat(TransportServiceName(es.Name), ".", es.Namespace, globalServiceSuffix, ":", strconv.Itoa(network.TransportPort))
}
// RemoteClusterServerServiceHost returns the hostname and the port used to reach Elasticsearch's remote cluster server endpoint.
func RemoteClusterServerServiceHost(es types.NamespacedName) string {
return stringsutil.Concat(RemoteClusterServiceName(es.Name), ".", es.Namespace, globalServiceSuffix, ":", strconv.Itoa(network.RemoteClusterPort))
}
// ExternalServiceURL returns the URL used to reach Elasticsearch's external endpoint.
func ExternalServiceURL(es esv1.Elasticsearch) string {
return stringsutil.Concat(es.Spec.HTTP.Protocol(), "://", ExternalServiceName(es.Name), ".", es.Namespace, globalServiceSuffix, ":", strconv.Itoa(network.HTTPPort))
}
// InternalServiceURL returns the URL used to reach Elasticsearch's internally managed service
func InternalServiceURL(es esv1.Elasticsearch) string {
return stringsutil.Concat(es.Spec.HTTP.Protocol(), "://", InternalServiceName(es.Name), ".", es.Namespace, globalServiceSuffix, ":", strconv.Itoa(network.HTTPPort))
}
// NewExternalService returns the external service associated to the given cluster.
// It is used by users to perform requests against one of the cluster nodes.
func NewExternalService(es esv1.Elasticsearch) *corev1.Service {
nsn := k8s.ExtractNamespacedName(&es)
svc := corev1.Service{
ObjectMeta: es.Spec.HTTP.Service.ObjectMeta,
Spec: es.Spec.HTTP.Service.Spec,
}
svc.ObjectMeta.Namespace = es.Namespace
svc.ObjectMeta.Name = ExternalServiceName(es.Name)
// defaults to ClusterIP if not set
if svc.Spec.Type == "" {
svc.Spec.Type = corev1.ServiceTypeClusterIP
}
labels := label.NewLabels(nsn)
ports := []corev1.ServicePort{
{
Name: es.Spec.HTTP.Protocol(),
Protocol: corev1.ProtocolTCP,
Port: network.HTTPPort,
},
}
return defaults.SetServiceDefaults(&svc, labels, labels, ports)
}
// NewInternalService returns the internal service associated to the given cluster.
// It is used by the operator to perform requests against the Elasticsearch cluster nodes,
// and does not inherit the spec defined within the Elasticsearch custom resource,
// to remove the possibility of the user misconfiguring access to the ES cluster.
func NewInternalService(es esv1.Elasticsearch) *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: InternalServiceName(es.Name),
Namespace: es.Namespace,
Labels: label.NewLabels(k8s.ExtractNamespacedName(&es)),
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
Ports: []corev1.ServicePort{
{
Name: es.Spec.HTTP.Protocol(),
Protocol: corev1.ProtocolTCP,
Port: network.HTTPPort,
},
},
Selector: label.NewLabels(k8s.ExtractNamespacedName(&es)),
PublishNotReadyAddresses: false,
},
}
}
// NewRemoteClusterService returns the service associated to the remote cluster service for the given cluster.
func NewRemoteClusterService(es esv1.Elasticsearch) *corev1.Service {
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: RemoteClusterServiceName(es.Name),
Namespace: es.Namespace,
Labels: label.NewLabels(k8s.ExtractNamespacedName(&es)),
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
Selector: label.NewLabels(k8s.ExtractNamespacedName(&es)),
PublishNotReadyAddresses: true,
ClusterIP: "None",
},
}
labels := label.NewLabels(k8s.ExtractNamespacedName(&es))
ports := []corev1.ServicePort{
{
Name: RemoteClusterServicePortName,
Protocol: corev1.ProtocolTCP,
Port: network.RemoteClusterPort,
},
}
return defaults.SetServiceDefaults(svc, labels, labels, ports)
}
type urlProvider struct {
pods func() ([]corev1.Pod, error)
svcURL string
}
// URL implements client.URLProvider.
func (u *urlProvider) URL() (string, error) {
var ready, running []corev1.Pod
pods, err := u.pods()
if err != nil {
return "", err
}
for _, p := range pods {
if k8s.IsPodReady(p) {
ready = append(ready, p)
}
if k8s.IsPodRunning(p) {
running = append(running, p)
}
}
switch {
case len(ready) > 0:
return randomESPodURL(ready), nil
case len(running) > 0:
return randomESPodURL(running), nil
default:
return u.svcURL, nil
}
}
// Equals implements client.URLProvider.
func (u *urlProvider) Equals(other client.URLProvider) bool {
otherImpl, ok := other.(*urlProvider)
if !ok {
return false
}
return u.svcURL == otherImpl.svcURL
}
// HasEndpoints implements client.URLProvider.
func (u *urlProvider) HasEndpoints() bool {
pods, err := u.pods()
return err == nil && len(k8s.RunningPods(pods)) > 0
}
// NewElasticsearchURLProvider returns a client.URLProvider that dynamically tries to find Pod URLs among the
// currently running Pods. Preferring ready Pods over running ones.
func NewElasticsearchURLProvider(es esv1.Elasticsearch, client k8s.Client) client.URLProvider {
return &urlProvider{
pods: func() ([]corev1.Pod, error) {
return k8s.PodsMatchingLabels(client, es.Namespace, label.NewLabelSelectorForElasticsearch(es))
},
svcURL: InternalServiceURL(es),
}
}
func randomESPodURL(pods []corev1.Pod) string {
randomPod := pods[rand.Intn(len(pods))] //nolint:gosec
return ElasticsearchPodURL(randomPod)
}
// ElasticsearchPodURL calculates the URL for the given Pod based on the Pods metadata.
func ElasticsearchPodURL(pod corev1.Pod) string {
scheme, hasSchemeLabel := pod.Labels[label.HTTPSchemeLabelName]
sset, hasSsetLabel := pod.Labels[label.StatefulSetNameLabelName]
if hasSsetLabel && hasSchemeLabel {
return fmt.Sprintf("%s://%s.%s.%s:%d", scheme, pod.Name, sset, pod.Namespace, network.HTTPPort)
}
return ""
}