in cmd/scale.go [256:522]
func (sc *scaleCmd) run(cmd *cobra.Command, args []string) error {
if sc.validateCmd {
if err := sc.validate(cmd); err != nil {
return errors.Wrap(err, "failed to validate scale command")
}
}
if sc.loadAPIModel {
if err := sc.load(); err != nil {
return errors.Wrap(err, "failed to load existing container service")
}
}
if sc.containerService.Properties.IsAzureStackCloud() {
if err := sc.validateOSBaseImage(); err != nil {
return errors.Wrapf(err, "validating OS base images required by %s", sc.apiModelPath)
}
}
ctx, cancel := context.WithTimeout(context.Background(), armhelpers.DefaultARMOperationTimeout)
defer cancel()
orchestratorInfo := sc.containerService.Properties.OrchestratorProfile
var currentNodeCount, highestUsedIndex, index int
winPoolIndex := -1
indexes := make([]int, 0)
indexToVM := make(map[int]string)
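// Bookkeeping for the VMAS pool scan below: winPoolIndex stays -1 unless a Windows
// availability-set pool is parsed, while indexes and indexToVM record each matching
// VM's numeric name suffix so holes left by previously deleted VMs are preserved.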
// Get the desired pool's node list from the k8s API before scaling
if sc.apiserverURL != "" {
nodes, err := operations.GetNodes(sc.client, sc.logger, sc.apiserverURL, sc.kubeconfig, 5*time.Minute, sc.agentPoolToScale, -1)
if err == nil && nodes != nil {
sc.nodes = nodes
}
}
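// Listing nodes is best effort: on failure sc.nodes stays nil and the node-count
// cross-checks and post-scale listings below are simply skipped.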
if sc.agentPool.IsAvailabilitySets() {
for i := 0; i < 10; i++ {
vmsList, err := sc.client.ListVirtualMachines(ctx, sc.resourceGroupName)
if err != nil {
return errors.Wrap(err, "failed to get VMs in the resource group")
}
if len(vmsList) == 0 {
return errors.New("the provided resource group does not contain any VMs")
}
for _, vm := range vmsList {
vmName := *vm.Name
if !sc.vmInVMASAgentPool(vmName, vm.Tags) {
continue
}
if sc.agentPool.OSType == api.Windows {
_, _, winPoolIndex, index, err = utils.WindowsVMNameParts(vmName)
} else {
_, _, index, err = utils.K8sLinuxVMNameParts(vmName)
}
if err != nil {
return err
}
indexToVM[index] = vmName
indexes = append(indexes, index)
}
// If zero VMs match the api model pool name, retry every 30 seconds for up to
// 5 minutes to ride out transient issues connecting to the VM API
if len(indexes) > 0 {
break
}
log.Warnf("Found no VMs in resource group %s that match pool name %s\n", sc.resourceGroupName, sc.agentPool.Name)
time.Sleep(30 * time.Second)
}
sort.Ints(indexes)
currentNodeCount = len(indexes)
if currentNodeCount == sc.newDesiredAgentCount {
sc.printScaleTargetEqualsExisting(currentNodeCount)
return nil
}
if currentNodeCount > 0 {
highestUsedIndex = indexes[currentNodeCount-1]
} else {
return errors.New("None of the VMs in the provided resource group contain any nodes")
}
// VMAS Scale down Scenario
if currentNodeCount > sc.newDesiredAgentCount {
if sc.apiserverURL == "" {
_ = cmd.Usage()
return errors.New("--apiserver is required to scale down a kubernetes cluster's agent pool")
}
if sc.nodes != nil {
if len(sc.nodes) == 1 {
sc.logger.Infof("There is %d node in pool %s before scaling down to %d:\n", len(sc.nodes), sc.agentPoolToScale, sc.newDesiredAgentCount)
} else {
sc.logger.Infof("There are %d nodes in pool %s before scaling down to %d:\n", len(sc.nodes), sc.agentPoolToScale, sc.newDesiredAgentCount)
}
operations.PrintNodes(sc.nodes)
numNodesFromK8sAPI := len(sc.nodes)
if currentNodeCount != numNodesFromK8sAPI {
sc.logger.Warnf("There are %d VMs named \"*%s*\" in the resource group %s, but there are %d nodes named \"*%s*\" in the Kubernetes cluster\n", currentNodeCount, sc.agentPoolToScale, sc.resourceGroupName, numNodesFromK8sAPI, sc.agentPoolToScale)
} else {
nodesToDelete := currentNodeCount - sc.newDesiredAgentCount
if nodesToDelete > 1 {
sc.logger.Infof("%d nodes will be deleted\n", nodesToDelete)
} else {
sc.logger.Infof("%d node will be deleted\n", nodesToDelete)
}
}
}
vmsToDelete := make([]string, 0)
for i := currentNodeCount - 1; i >= sc.newDesiredAgentCount; i-- {
index = indexes[i]
vmsToDelete = append(vmsToDelete, indexToVM[index])
}
for _, node := range vmsToDelete {
sc.logger.Infof("Node %s will be cordoned and drained\n", node)
}
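// Drain before delete so workloads are evicted and rescheduled onto surviving
// nodes rather than disappearing together with their VMs.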
err := sc.drainNodes(vmsToDelete)
if err != nil {
return errors.Wrap(err, "failed to drain the nodes marked for deletion")
}
for _, node := range vmsToDelete {
sc.logger.Infof("Node %s's VM will be deleted\n", node)
}
errList := operations.ScaleDownVMs(sc.client, sc.logger, sc.SubscriptionID.String(), sc.resourceGroupName, vmsToDelete...)
if errList != nil {
var err error
format := "Node '%s' failed to delete with error: '%s'"
for element := errList.Front(); element != nil; element = element.Next() {
vmError, ok := element.Value.(*operations.VMScalingErrorDetails)
if ok {
if err == nil {
err = errors.Errorf(format, vmError.Name, vmError.Error.Error())
} else {
err = errors.Wrapf(err, format, vmError.Name, vmError.Error.Error())
}
}
}
return err
}
if sc.nodes != nil {
nodes, err := operations.GetNodes(sc.client, sc.logger, sc.apiserverURL, sc.kubeconfig, 5*time.Minute, sc.agentPoolToScale, sc.newDesiredAgentCount)
if err == nil && nodes != nil {
sc.nodes = nodes
sc.logger.Infof("Nodes in pool %s after scaling:\n", sc.agentPoolToScale)
operations.PrintNodes(sc.nodes)
} else {
sc.logger.Warningf("Unable to get nodes in pool %s after scaling\n", sc.agentPoolToScale)
}
}
if sc.persistAPIModel {
return sc.saveAPIModel()
}
// Scale down is complete; return here so control never falls through to the
// scale-up deployment below.
return nil
}
}
translator := engine.Context{
Translator: &i18n.Translator{
Locale: sc.locale,
},
}
templateGenerator, err := engine.InitializeTemplateGenerator(translator)
if err != nil {
return errors.Wrap(err, "failed to initialize template generator")
}
// Our templates generate a range of nodes from a count and an offset, so a pool's index
// space can contain holes. Set the template count high enough to cover the whole range;
// when holes exist, that number is larger than the desired node count.
countForTemplate := sc.newDesiredAgentCount
if highestUsedIndex != 0 {
countForTemplate += highestUsedIndex + 1 - currentNodeCount
}
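// Worked example (illustrative, not from the source): with surviving sorted indexes
// [0, 1, 3, 7] and a desired count of 6, currentNodeCount is 4 and highestUsedIndex is 7,
// so countForTemplate = 6 + 7 + 1 - 4 = 10; with the offset of highestUsedIndex+1 = 8 set
// below, the template creates exactly the two new VMs (indexes 8 and 9) needed.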
sc.agentPool.Count = countForTemplate
sc.containerService.Properties.AgentPoolProfiles = []*api.AgentPoolProfile{sc.agentPool}
_, err = sc.containerService.SetPropertiesDefaults(api.PropertiesDefaultsParams{
IsScale: true,
IsUpgrade: false,
PkiKeySize: helpers.DefaultPkiKeySize,
})
if err != nil {
return errors.Wrapf(err, "error in SetPropertiesDefaults template %s", sc.apiModelPath)
}
template, parameters, err := templateGenerator.GenerateTemplateV2(sc.containerService, engine.DefaultGeneratorCode, BuildTag)
if err != nil {
return errors.Wrapf(err, "error generating template %s", sc.apiModelPath)
}
if template, err = transform.PrettyPrintArmTemplate(template); err != nil {
return errors.Wrap(err, "error pretty printing template")
}
templateJSON := make(map[string]interface{})
parametersJSON := make(map[string]interface{})
err = json.Unmarshal([]byte(template), &templateJSON)
if err != nil {
return errors.Wrap(err, "error unmarshaling template")
}
err = json.Unmarshal([]byte(parameters), &parametersJSON)
if err != nil {
return errors.Wrap(err, "error unmarshaling parameters")
}
transformer := transform.Transformer{Translator: translator.Translator}
addValue(parametersJSON, sc.agentPool.Name+"Count", countForTemplate)
// The agent pool is set to index 0 for the scale operation, so overwrite the template
// variables that rely on the pool's original index.
if winPoolIndex != -1 {
templateJSON["variables"].(map[string]interface{})[sc.agentPool.Name+"Index"] = winPoolIndex
templateJSON["variables"].(map[string]interface{})[sc.agentPool.Name+"VMNamePrefix"] = sc.containerService.Properties.GetAgentVMPrefix(sc.agentPool, winPoolIndex)
}
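// Broadly (an interpretation, not documented here): the transforms below prune the
// generated template into an incremental deployment, removing resources that already
// exist and properties ARM treats as immutable so that only the new agent VMs are created.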
if orchestratorInfo.KubernetesConfig.LoadBalancerSku == api.StandardLoadBalancerSku {
err = transformer.NormalizeForK8sSLBScalingOrUpgrade(sc.logger, templateJSON)
if err != nil {
return errors.Wrapf(err, "error transforming the template for scaling with SLB %s", sc.apiModelPath)
}
}
err = transformer.NormalizeForK8sVMASScalingUp(sc.logger, templateJSON)
if err != nil {
return errors.Wrapf(err, "error transforming the template for scaling template %s", sc.apiModelPath)
}
transformer.RemoveImmutableResourceProperties(sc.logger, templateJSON)
if sc.agentPool.IsAvailabilitySets() {
addValue(parametersJSON, fmt.Sprintf("%sOffset", sc.agentPool.Name), highestUsedIndex+1)
}
random := rand.New(rand.NewSource(time.Now().UnixNano()))
deploymentSuffix := random.Int31()
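// The random suffix gives each scale operation a distinct ARM deployment name, so
// repeated scales of the same resource group do not overwrite one another's
// deployment history.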
if sc.nodes != nil {
sc.logger.Infof("Nodes in pool '%s' before scaling:\n", sc.agentPoolToScale)
operations.PrintNodes(sc.nodes)
}
_, err = sc.client.DeployTemplate(
ctx,
sc.resourceGroupName,
fmt.Sprintf("%s-%d", sc.resourceGroupName, deploymentSuffix),
templateJSON,
parametersJSON)
if err != nil {
return err
}
if sc.nodes != nil {
nodes, err := operations.GetNodes(sc.client, sc.logger, sc.apiserverURL, sc.kubeconfig, 5*time.Minute, sc.agentPoolToScale, sc.newDesiredAgentCount)
if err == nil && nodes != nil {
sc.nodes = nodes
sc.logger.Infof("Nodes in pool '%s' after scaling:\n", sc.agentPoolToScale)
operations.PrintNodes(sc.nodes)
} else {
sc.logger.Warningf("Unable to get nodes in pool '%s' after scaling\n", sc.agentPoolToScale)
}
}
if sc.persistAPIModel {
return sc.saveAPIModel()
}
return nil
}
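
// vmasCountAndOffset is a minimal sketch, NOT part of the original file: it factors the
// VMAS count/offset arithmetic from run() into a pure function so the hole-handling
// behavior can be exercised in isolation, assuming (per the comment in run()) that the
// template creates a VM for every index in [offset, count).
// Example: vmasCountAndOffset([]int{0, 2, 3}, 5) returns (6, 4); three VMs survive, so
// the template ranges over indexes 4 and 5 to add the two nodes still needed.
func vmasCountAndOffset(sortedIndexes []int, desiredCount int) (count, offset int) {
	currentNodeCount := len(sortedIndexes)
	highestUsedIndex := sortedIndexes[currentNodeCount-1] // indexes are sorted ascending
	count = desiredCount
	if highestUsedIndex != 0 {
		count += highestUsedIndex + 1 - currentNodeCount
	}
	return count, highestUsedIndex + 1
}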