func()

in pkg/cloudmap/instances_reconcile_task.go [91:133]


func (t *instancesReconcileTask) reconcile(ctx context.Context, service serviceSummary, subset serviceSubset,
	desiredReadyInstanceInfoByID map[string]instanceInfo, desiredNotReadyInstanceInfoByID map[string]instanceInfo) error {

	existingInstanceAttrsByID, err := t.listServiceSubsetInstances(ctx, service, subset)
	if err != nil {
		return err
	}

	instancesToCreateOrUpdate, instancesToDelete := t.matchDesiredInstancesAgainstExistingInstances(desiredReadyInstanceInfoByID, desiredNotReadyInstanceInfoByID, existingInstanceAttrsByID)

	t.log.V(1).Info("CloudMap: Register Instances", "InstanceToCreateOrUpdate", instancesToCreateOrUpdate)

	for instanceID, info := range instancesToCreateOrUpdate {
		if t.instancesWithOngoingOperation.Has(instanceID) {
			continue
		}
		t.instancesWithOngoingOperation.Insert(instanceID)
		go func(instanceID string, info instanceInfo) {
			err := t.instancesCache.RegisterInstance(ctx, service.serviceID, instanceID, info.attrs)
			select {
			case t.instanceOperationCompletionChan <- instanceOperationResult{instanceID: instanceID, err: err}:
			case <-t.done:
			}
		}(instanceID, info)
	}

	t.log.V(1).Info("CloudMap: Deregister Instances", "instancesToDelete", instancesToDelete)

	for _, instanceID := range instancesToDelete {
		if t.instancesWithOngoingOperation.Has(instanceID) {
			continue
		}
		t.instancesWithOngoingOperation.Insert(instanceID)
		go func(instanceID string) {
			err := t.instancesCache.DeregisterInstance(ctx, service.serviceID, instanceID)
			select {
			case t.instanceOperationCompletionChan <- instanceOperationResult{instanceID: instanceID, err: err}:
			case <-t.done:
			}
		}(instanceID)
	}
	return nil
}