in pkg/cloudmap/instances_reconcile_task.go [91:133]
func (t *instancesReconcileTask) reconcile(ctx context.Context, service serviceSummary, subset serviceSubset,
desiredReadyInstanceInfoByID map[string]instanceInfo, desiredNotReadyInstanceInfoByID map[string]instanceInfo) error {
existingInstanceAttrsByID, err := t.listServiceSubsetInstances(ctx, service, subset)
if err != nil {
return err
}
instancesToCreateOrUpdate, instancesToDelete := t.matchDesiredInstancesAgainstExistingInstances(desiredReadyInstanceInfoByID, desiredNotReadyInstanceInfoByID, existingInstanceAttrsByID)
t.log.V(1).Info("CloudMap: Register Instances", "InstanceToCreateOrUpdate", instancesToCreateOrUpdate)
for instanceID, info := range instancesToCreateOrUpdate {
if t.instancesWithOngoingOperation.Has(instanceID) {
continue
}
t.instancesWithOngoingOperation.Insert(instanceID)
go func(instanceID string, info instanceInfo) {
err := t.instancesCache.RegisterInstance(ctx, service.serviceID, instanceID, info.attrs)
select {
case t.instanceOperationCompletionChan <- instanceOperationResult{instanceID: instanceID, err: err}:
case <-t.done:
}
}(instanceID, info)
}
t.log.V(1).Info("CloudMap: Deregister Instances", "instancesToDelete", instancesToDelete)
for _, instanceID := range instancesToDelete {
if t.instancesWithOngoingOperation.Has(instanceID) {
continue
}
t.instancesWithOngoingOperation.Insert(instanceID)
go func(instanceID string) {
err := t.instancesCache.DeregisterInstance(ctx, service.serviceID, instanceID)
select {
case t.instanceOperationCompletionChan <- instanceOperationResult{instanceID: instanceID, err: err}:
case <-t.done:
}
}(instanceID)
}
return nil
}