pkg/plan/track_change.go (111 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package plan import ( "fmt" "time" "github.com/elastic/cloud-sdk-go/pkg/api/apierror" "github.com/elastic/cloud-sdk-go/pkg/client/deployments" "github.com/elastic/cloud-sdk-go/pkg/util/ec" "github.com/elastic/cloud-sdk-go/pkg/util/slice" ) // TrackChange iterates over a deployment's resources pending plans, sending // updates to the returned channel in the form of TrackResponse every frequency // period configured in the parameter's TrackFrequencyConfig. // When all of deployment's resources pending plans have finished, the channel // is automatically closed by the goroutine that this function launches. It is // possible to iterate with a for loop and assume that the loop will exit after // all of the updates have been sent and the channel has been closed. // If a ResourceID and Kind are set instead of the DeploymentID, a reverse // lookup will be performed in order to find the DeploymentID and be able to // track the pending plan. func TrackChange(params TrackChangeParams) (<-chan TrackResponse, error) { params.Config.fillDefaults() if err := params.Validate(); err != nil { return nil, err } deploymentID, err := getDeploymentID(params) if err != nil { return nil, err } params.DeploymentID = deploymentID var out = make(chan TrackResponse) go trackChange(params, out, time.NewTicker(params.Config.PollFrequency)) return out, nil } func trackChange(params TrackChangeParams, c chan<- TrackResponse, ticker *time.Ticker) { // Close the channel before the function returns. This particularly // important so that clients consuming this channel can use it in // a for loop and assume that when the foor loop ends, the change is // complete. defer close(c) // retries is used as a simple counter which is incremented every time an // error occurs, or when the returned pending plan slice is 0. var retries int // changedResources is a list of resource IDs which have been seen to have // a pending plan. It's used to filter out any resources which weren't // part of the last plan change. var changedResources []string for range ticker.C { // After the retries number is higher or equal to MaxRetries, the plan // changed is considered complete. In which case, the current plan or // the last plan in the plan history is checked to obtain the last plan // step log, and decode any errors which might've happened or mark the // plan as succeeded. if retries >= params.Config.MaxRetries { var checkRetries int checkCurrentStatus(params, c, changedResources, checkRetries) return } res, err := params.V1API.Deployments.GetDeployment( deployments.NewGetDeploymentParams(). WithDeploymentID(params.DeploymentID). WithShowPlanLogs(ec.Bool(true)). WithShowPlans(ec.Bool(true)), params.AuthWriter, ) if err != nil { retries++ continue } var plans = buildTrackResponse(res.Payload.Resources, false) if len(plans) == 0 { retries++ } for _, p := range plans { changedResources = append(changedResources, p.ID) p.DeploymentID = *res.Payload.ID ignoreChange := params.ResourceID != p.ID && params.IgnoreDownstream if ignoreChange { continue } c <- p } } } // getDeploymentID ensures that a DeploymentID is found, if the DeploymentID // has already been set in the parameters, it simply returns that ID, otherwise // performs a deployment search to obtain the Deployment ID from a resource ID // and Kind. func getDeploymentID(params TrackChangeParams) (string, error) { if params.DeploymentID != "" { return params.DeploymentID, nil } res, err := params.V1API.Deployments.SearchDeployments( deployments.NewSearchDeploymentsParams(). WithBody(LookupByResourceIdQuery(params.ResourceID)). WithContext(params.Context), params.AuthWriter, ) if err != nil { return "", apierror.Wrap(err) } if len(res.Payload.Deployments) > 0 { return *res.Payload.Deployments[0].ID, nil } return "", fmt.Errorf( "plan track change: couldn't find a deployment containing resource with ID %s", params.ResourceID, ) } // checkCurrentStatus is run after the deployment's resources pending plans // have finished. It's necessary for a couple of reasons: // 1. Catching any errors which might've happen in the pending plan but // weren't caught because the plan finished in between polling periods. // 2. Posting the end result of the resource back to the channel. // // Additionally, changedResources is sent as a parameter to filter out any of // the deployment's resources which weren't involved in the plan change. func checkCurrentStatus(params TrackChangeParams, c chan<- TrackResponse, changedResources []string, retries int) { res, err := params.V1API.Deployments.GetDeployment( deployments.NewGetDeploymentParams(). WithDeploymentID(params.DeploymentID). WithShowPlanLogs(ec.Bool(true)). WithShowPlans(ec.Bool(true)). // Necessary for deployments which failed on creation WithShowPlanHistory(ec.Bool(true)), params.AuthWriter, ) if err != nil { // retry the API call again until params.Config.MaxRetries is reached. if retries < params.Config.MaxRetries { retries++ checkCurrentStatus(params, c, changedResources, retries) } return } for _, trackResponse := range buildTrackResponse(res.Payload.Resources, true) { trackResponse.DeploymentID = *res.Payload.ID ignoreChange := params.ResourceID != trackResponse.ID && params.IgnoreDownstream // This conditional catches plans that failed but finished before the // plan tracker had the chance to call the API, changedResources will be // 0 length. // Since we'd still like to report on any failures that that plan might // have had, we're effectively sending a message to the plan tracker // when the current plan ended with an error. if len(changedResources) == 0 && trackResponse.Err != nil { if !ignoreChange { c <- trackResponse } continue } if slice.HasString(changedResources, trackResponse.ID) { if !ignoreChange { c <- trackResponse } } } }