func()

in src/go/cmd/vmscaler/vmscaler/vmscaler.go [199:282]


// deleteNodes drains the delete queue and removes the requested VMSS instances.
//
// It works in three phases:
//  1. Dequeue messages until the queue returns fewer than QueueMessageCount
//     messages (or a dequeue error occurs), grouping the parsed instances by
//     scale set name. Messages that cannot be parsed are logged and deleted
//     from the queue immediately.
//  2. Lower v.TotalNodes by the number of instances dequeued (never below 0)
//     and persist it via setTotalNodes.
//  3. Issue one DeleteInstances call per scale set, register the returned
//     future with the operation manager, and only then delete the
//     corresponding queue messages. If the DeleteInstances call fails, the
//     messages for that scale set are not deleted from the queue.
//
// All failures are logged rather than propagated; the method always returns nil.
func (v *VMScaler) deleteNodes() error {
	log.Debug.Printf("[VMScaler.deleteNodes")
	defer log.Debug.Printf("VMScaler.deleteNodes]")

	// scale set name -> messages requesting deletion of an instance in that set
	vmssInstances := make(map[string][]*QueueMessage)
	instanceCount := 0

	// dequeue all items
	for {
		queueMessage, err := v.deleteQueueClient.Dequeue(QueueMessageCount, visibilityTimeout)
		if err != nil {
			// stop dequeueing, but still process whatever was collected so far
			log.Error.Printf("error dequeueing message %v", err)
			break
		}
		if queueMessage.NumMessages() != QueueMessageCount {
			// there are no more messages to process
			break
		}

		// NOTE(review): only Message(0) is read, so this assumes
		// QueueMessageCount is 1 - TODO confirm.
		msg := queueMessage.Message(0)
		vmss, instance, err := extractVMSSInstanceCsvMessage(msg.Text)
		if err != nil {
			// the message is malformed: report the parse failure and remove the
			// message from the queue
			log.Error.Printf("error parsing queue message '%s': %v", msg.ID, err)
			if _, delErr := v.deleteQueueClient.DeleteMessage(msg.ID, msg.PopReceipt); delErr != nil {
				log.Error.Printf("error deleting queue message '%s': %v", msg.ID, delErr)
			}
			continue
		}

		// append on a missing key starts from a nil slice, so no existence check is needed
		vmssInstances[vmss] = append(vmssInstances[vmss], &QueueMessage{
			Message:  queueMessage,
			Instance: instance,
		})
		instanceCount++
	}

	if instanceCount == 0 {
		// no work to do, return
		return nil
	}

	// update the capacity tag, but don't go lower than 0
	// NOTE(review): the count is lowered before the deletes are issued and is
	// not restored if a DeleteInstances call below fails - confirm this is intended.
	if v.TotalNodes > 0 {
		nodeCount := v.TotalNodes - int64(instanceCount)
		if nodeCount < 0 {
			nodeCount = 0
		}
		v.TotalNodes = nodeCount
		if e := v.setTotalNodes(); e != nil {
			log.Error.Printf("error setting nodes to %v: %v", v.TotalNodes, e)
		}
	}

	// delete the instances, one request per scale set
	for vmssName, messages := range vmssInstances {
		instances := make([]string, 0, len(messages))
		for _, m := range messages {
			instances = append(instances, m.Instance)
		}
		var ids compute.VirtualMachineScaleSetVMInstanceRequiredIDs
		ids.InstanceIds = &instances
		future, err := v.vmssClient.VmssClient.DeleteInstances(v.Context, v.ResourceGroup, vmssName, ids)
		if err != nil {
			// leave the queue messages in place; they are only deleted after a
			// successful DeleteInstances call
			log.Error.Printf("error deleting instances %v for '%s': %v", instances, vmssName, err)
			continue
		}
		v.vmssOpManager.AddWatchOperation(vmssName, future.FutureAPI)

		// the delete was accepted, so the queue messages can be removed
		for _, m := range messages {
			msg := m.Message.Message(0)
			if _, err := v.deleteQueueClient.DeleteMessage(msg.ID, msg.PopReceipt); err != nil {
				log.Error.Printf("error deleting queue message '%s': %v", msg.ID, err)
			}
		}
	}

	return nil
}