pkg/controller/elasticsearch/client/v6.go (196 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package client import ( "context" "net/http" "net/url" "github.com/pkg/errors" "github.com/elastic/cloud-on-k8s/v3/pkg/apis/common/v1alpha1" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/stringsutil" ) var errNotSupportedInEs6x = errors.New("not supported in Elasticsearch 6.x") type clientV6 struct { baseClient } func (c *clientV6) InvalidateCrossClusterAPIKey(context.Context, string) error { return errNotSupportedInEs6x } func (c *clientV6) CreateCrossClusterAPIKey(_ context.Context, _ CrossClusterAPIKeyCreateRequest) (CrossClusterAPIKeyCreateResponse, error) { return CrossClusterAPIKeyCreateResponse{}, errNotSupportedInEs6x } func (c *clientV6) UpdateCrossClusterAPIKey(_ context.Context, _ string, _ CrossClusterAPIKeyUpdateRequest) (CrossClusterAPIKeyUpdateResponse, error) { return CrossClusterAPIKeyUpdateResponse{}, errNotSupportedInEs6x } func (c *clientV6) GetCrossClusterAPIKeys(_ context.Context, _ string) (CrossClusterAPIKeyList, error) { return CrossClusterAPIKeyList{}, errNotSupportedInEs6x } func (c *clientV6) Version() version.Version { return c.version } func (c *clientV6) GetClusterInfo(ctx context.Context) (Info, error) { var info Info err := c.get(ctx, "/", &info) return info, err } func (c *clientV6) GetClusterRoutingAllocation(ctx context.Context) (ClusterRoutingAllocation, error) { var settings ClusterRoutingAllocation err := c.get(ctx, "/_cluster/settings", &settings) return settings, err } func (c *clientV6) updateAllocationEnable(ctx context.Context, value string) error { allocationSettings := ClusterRoutingAllocation{ Transient: AllocationSettings{ Cluster: ClusterRoutingSettings{ Routing: RoutingSettings{ Allocation: RoutingAllocationSettings{ Enable: value, }, }, }, }, } return c.put(ctx, "/_cluster/settings", allocationSettings, nil) } func (c *clientV6) EnableShardAllocation(ctx context.Context) error { return c.updateAllocationEnable(ctx, "all") } func (c *clientV6) DisableReplicaShardsAllocation(ctx context.Context) error { return c.updateAllocationEnable(ctx, "primaries") } func (c *clientV6) RemoveTransientAllocationSettings(ctx context.Context) error { allocationSettings := struct { Transient struct { Exclude *string `json:"cluster.routing.allocation.exclude._name"` Enable *string `json:"cluster.routing.allocation.enable"` } `json:"transient"` }{} return c.put(ctx, "/_cluster/settings", allocationSettings, nil) } func (c *clientV6) SyncedFlush(ctx context.Context) error { return c.post(ctx, "/_flush/synced", nil, nil) } func (c *clientV6) Flush(ctx context.Context) error { return c.post(ctx, "/_flush", nil, nil) } func (c *clientV6) GetClusterHealth(ctx context.Context) (Health, error) { var result Health err := c.get(ctx, "/_cluster/health", &result) return result, err } func (c *clientV6) GetClusterHealthWaitForAllEvents(ctx context.Context) (Health, error) { var result Health // wait for all events means wait for all events down to `languid` events which is the lowest event priority pathWithQuery := "/_cluster/health?wait_for_events=languid&timeout=0s" // ignore timeout errors as they are communicated in the returned payload and a timeout is to be expected // given the query parameters. 408 for other reasons than the clients timeout parameter should not happen // as they are expected only on idle connections https://go-review.googlesource.com/c/go/+/179457/4/src/net/http/transport.go#1931 err := c.request(ctx, http.MethodGet, pathWithQuery, nil, &result, IsTimeout) return result, err } func (c *clientV6) SetMinimumMasterNodes(ctx context.Context, n int) error { zenSettings := DiscoveryZenSettings{ Transient: DiscoveryZen{MinimumMasterNodes: n}, Persistent: DiscoveryZen{MinimumMasterNodes: n}, } return c.put(ctx, "/_cluster/settings", &zenSettings, nil) } func (c *clientV6) ReloadSecureSettings(ctx context.Context) error { return c.post(ctx, "/_nodes/reload_secure_settings", nil, nil) } func (c *clientV6) GetNodes(ctx context.Context) (Nodes, error) { var nodes Nodes // restrict call to minimal node information with a non-existent metric filter err := c.get(ctx, "/_nodes/_all/no-metrics", &nodes) return nodes, err } func (c *clientV6) GetNodesStats(ctx context.Context) (NodesStats, error) { var nodesStats NodesStats // restrict call to basic node info only err := c.get(ctx, "/_nodes/_all/stats/os", &nodesStats) return nodesStats, err } func (c *clientV6) UpdateRemoteClusterSettings(ctx context.Context, settings RemoteClustersSettings) error { return c.put(ctx, "/_cluster/settings", &settings, nil) } func (c *clientV6) GetRemoteClusterSettings(ctx context.Context) (RemoteClustersSettings, error) { remoteClustersSettings := RemoteClustersSettings{} err := c.get(ctx, "/_cluster/settings", &remoteClustersSettings) return remoteClustersSettings, err } func (c *clientV6) GetLicense(ctx context.Context) (License, error) { var license LicenseResponse err := c.get(ctx, "/_xpack/license", &license) return license.License, err } func (c *clientV6) UpdateLicense(ctx context.Context, licenses LicenseUpdateRequest) (LicenseUpdateResponse, error) { var response LicenseUpdateResponse err := c.post(ctx, "/_xpack/license?acknowledge=true", licenses, &response) return response, err } func (c *clientV6) StartTrial(ctx context.Context) (StartTrialResponse, error) { var response StartTrialResponse err := c.post(ctx, "/_xpack/license/start_trial?acknowledge=true", nil, &response) return response, err } func (c *clientV6) StartBasic(ctx context.Context) (StartBasicResponse, error) { var response StartBasicResponse err := c.post(ctx, "/_xpack/license/start_basic?acknowledge=true", nil, &response) return response, err } func (c *clientV6) AddVotingConfigExclusions(_ context.Context, _ []string) error { return errNotSupportedInEs6x } func (c *clientV6) DeleteVotingConfigExclusions(_ context.Context, _ bool) error { return errNotSupportedInEs6x } func (c *clientV6) DeleteAutoscalingPolicies(_ context.Context) error { return errNotSupportedInEs6x } func (c *clientV6) CreateAutoscalingPolicy(_ context.Context, _ string, _ v1alpha1.AutoscalingPolicy) error { return errNotSupportedInEs6x } func (c *clientV6) GetAutoscalingCapacity(_ context.Context) (AutoscalingCapacityResult, error) { return AutoscalingCapacityResult{}, errNotSupportedInEs6x } func (c *clientV6) UpdateMLNodesSettings(_ context.Context, _ int32, _ string) error { return errNotSupportedInEs6x } func (c *clientV6) GetShutdown(context.Context, *string) (ShutdownResponse, error) { return ShutdownResponse{}, errNotSupportedInEs6x } func (c *clientV6) PutShutdown(context.Context, string, ShutdownType, string) error { return errNotSupportedInEs6x } func (c *clientV6) DeleteShutdown(context.Context, string) error { return errNotSupportedInEs6x } func (c *clientV6) ClusterBootstrappedForZen2(ctx context.Context) (bool, error) { // Look at the current master node of the cluster: if it's running version 7.x.x or above, // the cluster has been bootstrapped. // Even though c is a clientV6, it may be targeting a mixed v6/v7 having a v7 master. var response Nodes if err := c.get(ctx, "/_nodes/_master", &response); err != nil { return false, err } if len(response.Nodes) == 0 { // no known master node (yet), consider the cluster is not bootstrapped return false, nil } for _, master := range response.Nodes { return master.isV7OrAbove() } // should never happen since we ensured a single entry in the above map return false, errors.New("no master found in ClusterBootstrappedForZen2") } func (c *clientV6) GetClusterState(_ context.Context) (ClusterState, error) { return ClusterState{}, errors.New("cluster state is not supported in Elasticsearch 6.x") } func (c *clientV6) Request(ctx context.Context, r *http.Request) (*http.Response, error) { baseURL, err := c.URLProvider.URL() if err != nil { return nil, err } newURL, err := url.Parse(stringsutil.Concat(baseURL, r.URL.String())) if err != nil { return nil, err } r.URL = newURL return c.doRequest(ctx, r) } // Equal returns true if c2 can be considered the same as c func (c *clientV6) Equal(c2 Client) bool { other, ok := c2.(*clientV6) if !ok { return false } return c.baseClient.equal(&other.baseClient) } var _ Client = &clientV6{}