functionaltests/internal/kbclient/upgrade_assist.go (145 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package kbclient
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"slices"
"strings"
"time"
)
func (c *Client) ResolveMigrationDeprecations(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()
if err := c.migrateSystemIndicesAndWait(ctx); err != nil {
return err
}
deprecations, err := c.QueryCriticalESDeprecations(ctx)
if err != nil {
return fmt.Errorf("failed to query critical deprecations: %w", err)
}
var errs []error
for _, deprecation := range deprecations {
switch deprecation.Type {
case "index_settings":
errs = append(errs, c.markIndexAsReadOnly(ctx, deprecation.Name))
case "data_streams":
errs = append(errs, c.markDataStreamAsReadOnly(
ctx,
deprecation.Name,
deprecation.CorrectiveAction.Metadata.IndicesRequiringUpgrade,
))
default:
errs = append(errs, fmt.Errorf("unknown deprecation type: %s", deprecation.Type))
}
}
return errors.Join(errs...)
}
type MigrationDeprecation struct {
Name string `json:"index"`
Type string `json:"type"`
IsCritical bool `json:"isCritical"`
Level string `json:"level"`
CorrectiveAction struct {
Type string `json:"type"`
Metadata struct {
IndicesRequiringUpgrade []string `json:"indicesRequiringUpgrade,omitempty"`
}
} `json:"correctiveAction"`
}
func (d MigrationDeprecation) isLevelCritical() bool {
// Older Kibana uses "isCritical" while newer Kibana uses "level".
return d.IsCritical || strings.EqualFold(d.Level, "critical")
}
type esDeprecationsResponse struct {
MigrationDeprecations []MigrationDeprecation `json:"migrationsDeprecations"`
}
// QueryCriticalESDeprecations retrieves the critical deprecation warnings for Elasticsearch.
// It is essentially equivalent to `GET _migration/deprecations`, but through Kibana Upgrade
// Assistant API.
func (c *Client) QueryCriticalESDeprecations(ctx context.Context) ([]MigrationDeprecation, error) {
path := "/api/upgrade_assistant/es_deprecations"
b, err := c.sendRequest(ctx, http.MethodGet, path, nil, nil)
if err != nil {
return nil, err
}
var esDeprecationsResp esDeprecationsResponse
if err = json.Unmarshal(b, &esDeprecationsResp); err != nil {
return nil, fmt.Errorf("cannot unmarshal response body: %w", err)
}
// Remove all non-critical deprecation info.
return slices.DeleteFunc(
esDeprecationsResp.MigrationDeprecations,
func(dep MigrationDeprecation) bool {
return !dep.isLevelCritical()
},
), nil
}
type upgradeAssistUpdateIndexRequest struct {
Operations []string `json:"operations"`
}
// markIndexAsReadOnly updates the index to read-only through the Upgrade Assistant API:
// https://www.elastic.co/guide/en/kibana/current/upgrade-assistant.html.
func (c *Client) markIndexAsReadOnly(ctx context.Context, index string) error {
path := fmt.Sprintf("/api/upgrade_assistant/update_index/%s", index)
req := upgradeAssistUpdateIndexRequest{
Operations: []string{"blockWrite", "unfreeze"},
}
_, err := c.sendRequest(ctx, http.MethodPost, path, req, nil)
return err
}
type upgradeAssistMigrateDSRequest struct {
Indices []string `json:"indices"`
}
// markDataStreamAsReadOnly marks the backing indices of the data stream as read-only
// through the Upgrade Assistant API:
// https://www.elastic.co/guide/en/kibana/current/upgrade-assistant.html.
func (c *Client) markDataStreamAsReadOnly(ctx context.Context, dataStream string, indices []string) error {
// Data stream
path := fmt.Sprintf("/api/upgrade_assistant/migrate_data_stream/%s/readonly", dataStream)
req := upgradeAssistMigrateDSRequest{
Indices: indices,
}
_, err := c.sendRequest(ctx, http.MethodPost, path, req, nil)
return err
}
type SystemIndicesMigrationStatus string
const (
MigrationNeeded SystemIndicesMigrationStatus = "MIGRATION_NEEDED"
NoMigrationNeeded SystemIndicesMigrationStatus = "NO_MIGRATION_NEEDED"
InProgress SystemIndicesMigrationStatus = "IN_PROGRESS"
)
type systemIndicesMigrationResponse struct {
MigrationStatus string `json:"migration_status"`
}
// QuerySystemIndicesMigrationStatus returns the system indices migration status retrieved
// through the Upgrade Assistant API:
// https://www.elastic.co/guide/en/kibana/current/upgrade-assistant.html.
func (c *Client) QuerySystemIndicesMigrationStatus(ctx context.Context) (SystemIndicesMigrationStatus, error) {
path := "/api/upgrade_assistant/system_indices_migration"
b, err := c.sendRequest(ctx, http.MethodGet, path, nil, nil)
var systemIndicesMigrationResp systemIndicesMigrationResponse
if err = json.Unmarshal(b, &systemIndicesMigrationResp); err != nil {
return "", fmt.Errorf("cannot unmarshal response body: %w", err)
}
status := SystemIndicesMigrationStatus(systemIndicesMigrationResp.MigrationStatus)
return status, nil
}
// migrateSystemIndices migrates the system indices through the Upgrade Assistant API:
// https://www.elastic.co/guide/en/kibana/current/upgrade-assistant.html.
func (c *Client) migrateSystemIndices(ctx context.Context) error {
path := "/api/upgrade_assistant/system_indices_migration"
_, err := c.sendRequest(ctx, http.MethodPost, path, nil, nil)
return err
}
// migrateSystemIndicesAndWait first checks that the system indices need to be migrated.
// If they do, it will call migrateSystemIndices and wait until the migration is done.
func (c *Client) migrateSystemIndicesAndWait(ctx context.Context) error {
status, err := c.QuerySystemIndicesMigrationStatus(ctx)
if err != nil {
return fmt.Errorf("failed to query system indices migration status: %w", err)
}
// Migration needed, call migrate API.
if status == MigrationNeeded {
if err = c.migrateSystemIndices(ctx); err != nil {
return fmt.Errorf("failed to migrate system indices: %w", err)
}
}
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
status, err = c.QuerySystemIndicesMigrationStatus(ctx)
if err != nil {
return fmt.Errorf("failed to query system indices migration status: %w", err)
}
// Migration done.
if status == NoMigrationNeeded {
return nil
}
// Migration in progress, wait for a while.
time.Sleep(5 * time.Second)
}
}
}