pkg/cloudmap/instances_reconcile_task.go (162 lines of code) (raw):
package cloudmap
import (
"context"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/aws/services"
"github.com/aws/aws-sdk-go/service/servicediscovery"
"github.com/go-logr/logr"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/sets"
)
// attrAWSInitHealthStatus is the CloudMap instance attribute key holding an instance's initial
// custom health status; this file sets it to servicediscovery.CustomHealthStatusHealthy or
// servicediscovery.CustomHealthStatusUnhealthy when registering instances.
const attrAWSInitHealthStatus = "AWS_INIT_HEALTH_STATUS"
// newInstancesReconcileTask creates an instancesReconcileTask dedicated to one subset of a cloudMap service.
//
// The task owns two freshly created unbuffered channels: one for incoming reconcile requests and one for
// completion results of its asynchronous instance operations. done is stored as-is; asynchronous
// operations stop waiting to report their result once it is closed.
func newInstancesReconcileTask(cloudMapSDK services.CloudMap, instancesCache instancesCache, log logr.Logger, done chan struct{}) *instancesReconcileTask {
	task := new(instancesReconcileTask)
	task.cloudMapSDK = cloudMapSDK
	task.instancesCache = instancesCache
	task.log = log
	task.done = done
	task.instancesReconcileRequestChan = make(chan instancesReconcileRequest)
	task.instanceOperationCompletionChan = make(chan instanceOperationResult)
	// Start with no instance considered busy.
	task.instancesWithOngoingOperation = sets.NewString()
	return task
}
// instancesReconcileTask represents the work to reconcile instances for a specific subset of a cloudMap service.
// Each instancesReconcileTask should be limited to work for a single service & subset.
type instancesReconcileTask struct {
	// cloudMapSDK is the CloudMap client. NOTE(review): not referenced by any method in this file.
	cloudMapSDK services.CloudMap
	// instancesCache is used to list, register and deregister the service's instances.
	instancesCache instancesCache
	// done, once closed, lets operation goroutines that have not yet handed their result to
	// instanceOperationCompletionChan give up instead of blocking forever.
	// NOTE(review): closed by whoever owns the task — confirm with the caller.
	done chan struct{}
	// instancesReconcileRequestChan carries desired-state requests consumed by Run; a request received
	// while operations are in flight supersedes the current one.
	instancesReconcileRequestChan chan instancesReconcileRequest
	// instances that have an on-going operation; reconcile skips these instances.
	// Within this file it is only touched by Run and the reconcile it calls (never by the operation
	// goroutines), which is why it is not guarded by a lock.
	instancesWithOngoingOperation sets.String
	// chan of results from instances that completed their asynchronous operation.
	instanceOperationCompletionChan chan instanceOperationResult
	log                             logr.Logger
}
// instanceOperationResult is the outcome of one asynchronous register/deregister operation,
// sent on instancesReconcileTask.instanceOperationCompletionChan.
type instanceOperationResult struct {
	// instanceID identifies the instance the operation acted on.
	instanceID string
	// err is the error returned by the instances cache operation; nil on success.
	err error
}
// Run waits for the first reconcile request and drives the subset towards its desired state.
//
// Each iteration reconciles the current request. While instance operations remain in flight it then
// waits for one of:
//   - ctx cancellation: ctx.Err() is delivered to the current request;
//   - a new request: the current request receives a "cancelled by new desired state" error and the
//     new request becomes current;
//   - an operation completion: the instance is no longer considered busy, and a failed operation's
//     error is delivered to the current request.
//
// Run returns right after closing the current request's resultChan. When reconciliation finishes with
// no operation in flight, the channel is closed without any value being sent. On error or cancellation
// Run may return while operations are still in flight.
func (t *instancesReconcileTask) Run(ctx context.Context) {
	current := <-t.instancesReconcileRequestChan

	// reply hands an optional error to the current requester and then closes its result channel.
	reply := func(err error) {
		if err != nil {
			current.resultChan <- err
		}
		close(current.resultChan)
	}

	for {
		if err := t.reconcile(ctx, current.service, current.subset, current.readyInstanceInfoByID, current.unreadyInstanceInfoByID); err != nil {
			reply(err)
			return
		}
		if t.instancesWithOngoingOperation.Len() == 0 {
			reply(nil)
			return
		}

		select {
		case <-ctx.Done():
			reply(ctx.Err())
			return
		case next := <-t.instancesReconcileRequestChan:
			reply(errors.New("cancelled by new desired state"))
			current = next
		case result := <-t.instanceOperationCompletionChan:
			t.instancesWithOngoingOperation.Delete(result.instanceID)
			if result.err != nil {
				reply(result.err)
				return
			}
		}
	}
}
// reconcile moves a subset of a cloudMap service one step towards the desired state.
//
// It lists the subset's currently registered instances and computes which instances must be
// registered/updated and which must be deregistered. It then starts one asynchronous operation for
// every such instance that has no operation in flight yet. Results are reported on
// t.instanceOperationCompletionChan (see startInstanceOperation).
//
// It returns an error only when listing the existing instances fails. It does not wait for the
// operations it started.
func (t *instancesReconcileTask) reconcile(ctx context.Context, service serviceSummary, subset serviceSubset,
	desiredReadyInstanceInfoByID map[string]instanceInfo, desiredNotReadyInstanceInfoByID map[string]instanceInfo) error {
	existingInstanceAttrsByID, err := t.listServiceSubsetInstances(ctx, service, subset)
	if err != nil {
		return err
	}
	instancesToCreateOrUpdate, instancesToDelete := t.matchDesiredInstancesAgainstExistingInstances(desiredReadyInstanceInfoByID, desiredNotReadyInstanceInfoByID, existingInstanceAttrsByID)

	t.log.V(1).Info("CloudMap: Register Instances", "InstanceToCreateOrUpdate", instancesToCreateOrUpdate)
	for instanceID, info := range instancesToCreateOrUpdate {
		// Per-iteration copies so each closure sees its own values (pre-Go 1.22 loop semantics).
		instanceID, attrs := instanceID, info.attrs
		t.startInstanceOperation(instanceID, func() error {
			return t.instancesCache.RegisterInstance(ctx, service.serviceID, instanceID, attrs)
		})
	}

	t.log.V(1).Info("CloudMap: Deregister Instances", "instancesToDelete", instancesToDelete)
	for _, instanceID := range instancesToDelete {
		instanceID := instanceID
		t.startInstanceOperation(instanceID, func() error {
			return t.instancesCache.DeregisterInstance(ctx, service.serviceID, instanceID)
		})
	}
	return nil
}

// startInstanceOperation runs op in a new goroutine on behalf of instanceID, unless an operation for that
// instance is already in flight, in which case it does nothing.
//
// The instance is marked busy before the goroutine starts. The goroutine sends op's result on
// t.instanceOperationCompletionChan, or gives up if t.done is closed first. Run un-marks the instance
// when it receives the result.
func (t *instancesReconcileTask) startInstanceOperation(instanceID string, op func() error) {
	if t.instancesWithOngoingOperation.Has(instanceID) {
		return
	}
	t.instancesWithOngoingOperation.Insert(instanceID)
	go func() {
		err := op()
		select {
		case t.instanceOperationCompletionChan <- instanceOperationResult{instanceID: instanceID, err: err}:
		case <-t.done:
		}
	}()
}
// matchDesiredInstancesAgainstExistingInstances diffs the desired instances of a subset against the
// instances currently registered in CloudMap.
//
// It returns:
//   - instancesToCreateOrUpdate: ready instances that are not registered yet, plus desired instances
//     (ready or not) whose attributes differ from the registered ones once AWS_INIT_HEALTH_STATUS is
//     ignored. Not-ready instances that are not registered yet are never created. If an ID appears in
//     both desired maps, the not-ready entry wins.
//   - instancesToDelete: sorted IDs of registered instances that appear in neither desired map.
//
// Every returned entry carries attributes with AWS_INIT_HEALTH_STATUS set. The value is kept from the
// existing registration when present; otherwise it defaults to HEALTHY for ready instances and UNHEALTHY
// for not-ready ones.
//
// The attrs maps held by the desired inputs are never modified. Returned entries use copies instead.
// This matters because Run calls reconcile repeatedly with the same request, while RegisterInstance
// goroutines from an earlier pass may still be reading the attrs they were given.
func (t *instancesReconcileTask) matchDesiredInstancesAgainstExistingInstances(
	desiredReadyInstanceInfoByID map[string]instanceInfo,
	desiredNotReadyInstanceInfoByID map[string]instanceInfo,
	existingInstanceAttrsByID map[string]instanceAttributes) (map[string]instanceInfo, []string) {
	ignoreInitHealthStatus := ignoreAttrAWSInitHealthStatus()
	instancesToCreateOrUpdate := make(map[string]instanceInfo)

	for instanceID, desiredInfo := range desiredReadyInstanceInfoByID {
		existingAttrs, exists := existingInstanceAttrsByID[instanceID]
		if exists && cmp.Equal(desiredInfo.attrs, existingAttrs, ignoreInitHealthStatus) {
			continue
		}
		// existingAttrs is nil for unregistered instances, so they get the HEALTHY default.
		desiredInfo.attrs = attrsWithInitHealthStatus(desiredInfo.attrs, existingAttrs, servicediscovery.CustomHealthStatusHealthy)
		instancesToCreateOrUpdate[instanceID] = desiredInfo
	}

	for instanceID, desiredInfo := range desiredNotReadyInstanceInfoByID {
		existingAttrs, exists := existingInstanceAttrsByID[instanceID]
		// Not-ready instances are only ever updated, never newly registered.
		if !exists || cmp.Equal(desiredInfo.attrs, existingAttrs, ignoreInitHealthStatus) {
			continue
		}
		desiredInfo.attrs = attrsWithInitHealthStatus(desiredInfo.attrs, existingAttrs, servicediscovery.CustomHealthStatusUnhealthy)
		instancesToCreateOrUpdate[instanceID] = desiredInfo
	}

	desiredInstanceIDs := sets.StringKeySet(desiredReadyInstanceInfoByID).Union(sets.StringKeySet(desiredNotReadyInstanceInfoByID))
	existingInstanceIDs := sets.StringKeySet(existingInstanceAttrsByID)
	instancesToDelete := existingInstanceIDs.Difference(desiredInstanceIDs).List()
	return instancesToCreateOrUpdate, instancesToDelete
}

// attrsWithInitHealthStatus returns a new attribute map containing every entry of attrs. In that map,
// attrAWSInitHealthStatus is taken from existingAttrs when present, or set to defaultStatus otherwise.
// attrs itself is left untouched. A nil attrs or existingAttrs is treated as empty.
func attrsWithInitHealthStatus(attrs instanceAttributes, existingAttrs instanceAttributes, defaultStatus string) instanceAttributes {
	merged := make(instanceAttributes, len(attrs)+1)
	for key, value := range attrs {
		merged[key] = value
	}
	status, ok := existingAttrs[attrAWSInitHealthStatus]
	if !ok {
		status = defaultStatus
	}
	merged[attrAWSInitHealthStatus] = status
	return merged
}
// listServiceSubsetInstances fetches every instance of the cloudMap service from the instances cache.
// It keeps only those accepted by subset.Contains, keyed by instance ID. An error from the cache is
// returned as-is, together with a nil map.
func (t *instancesReconcileTask) listServiceSubsetInstances(ctx context.Context, service serviceSummary, subset serviceSubset) (map[string]instanceAttributes, error) {
	allAttrsByID, err := t.instancesCache.ListInstances(ctx, service.serviceID)
	if err != nil {
		return nil, err
	}

	matched := map[string]instanceAttributes{}
	for id, attrs := range allAttrsByID {
		if !subset.Contains(id, attrs) {
			continue
		}
		matched[id] = attrs
	}
	return matched, nil
}
// ignoreAttrAWSInitHealthStatus returns a cmp option that drops the attrAWSInitHealthStatus entry from
// string-keyed, string-valued maps during comparison. Two attribute maps that differ only in that entry
// therefore compare equal.
func ignoreAttrAWSInitHealthStatus() cmp.Option {
	isInitHealthStatusEntry := func(key, _ string) bool {
		return key == attrAWSInitHealthStatus
	}
	return cmpopts.IgnoreMapEntries(isInitHealthStatusEntry)
}