// pkg/cloudmap/instances_health_prober.go
package cloudmap
import (
"context"
"time"

"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/aws/services"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/k8s"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/servicediscovery"
"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
)
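// Defaults for the probe loop: how often to re-probe pending instances, and
// how long a probed status must hold before the pod condition is updated.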
const (
defaultHealthProbePeriod = 4 * time.Second
defaultHealthTransitionDuration = 20 * time.Second
)
// instancesHealthProber probes the health status of Cloud Map instances.
type instancesHealthProber interface {
// Submit submits a probe task for the given service subset and instances.
Submit(ctx context.Context, service serviceSummary, subset serviceSubset, instanceInfoByID map[string]instanceInfo, timeout time.Duration) error
}
// newDefaultInstancesHealthProber constructs a new defaultInstancesHealthProber and starts its probe loop.
func newDefaultInstancesHealthProber(ctx context.Context, k8sClient client.Client, cloudMapSDK services.CloudMap, log logr.Logger) *defaultInstancesHealthProber {
prober := &defaultInstancesHealthProber{
k8sClient: k8sClient,
cloudMapSDK: cloudMapSDK,
probeRequestChan: make(chan probeRequest),
probePeriod: defaultHealthProbePeriod,
transitionDuration: defaultHealthTransitionDuration,
log: log,
}
go prober.probeLoop(ctx)
return prober
}
var _ instancesHealthProber = &defaultInstancesHealthProber{}
type defaultInstancesHealthProber struct {
k8sClient client.Client
cloudMapSDK services.CloudMap
probeRequestChan chan probeRequest
// how frequently to probe instance health
probePeriod time.Duration
// how long an instance must stay in a given healthyStatus before we update the pod's condition.
transitionDuration time.Duration
log logr.Logger
}
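// instanceHealthyStatusProbeFunc resolves the healthy status (keyed by instanceID) of the given instances within a service.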
type instanceHealthyStatusProbeFunc func(ctx context.Context, serviceID string, instanceIDs []string) (map[string]bool, error)
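// probeRequest is a single probe submission for one service subset.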
type probeRequest struct {
serviceSubsetID serviceSubsetID
probeFunc instanceHealthyStatusProbeFunc
instanceInfoByID map[string]instanceInfo
timeout time.Duration
}
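// probeConfig is the probe loop's pending state for one service subset.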
type probeConfig struct {
probeFunc instanceHealthyStatusProbeFunc
probeEntries []instanceProbeEntry
timeoutTime time.Time
}
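// instanceProbeEntry tracks the probe history of a single Cloud Map instance.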
type instanceProbeEntry struct {
instanceID string
instanceInfo instanceInfo
// last time this instance's healthyStatus changed.
lastTransitionTime time.Time
// last healthyStatus probed.
lastHealthyStatus bool
}
func (p *defaultInstancesHealthProber) Submit(ctx context.Context, service serviceSummary, subset serviceSubset, instanceInfoByID map[string]instanceInfo, timeout time.Duration) error {
serviceSubsetID := serviceSubsetID{serviceID: service.serviceID, subsetID: subset.SubsetID()}
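// services configured with a custom health check report status via GetInstancesHealthStatus;
// for all other services, being registered in Cloud Map counts as healthy.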
probeFunc := p.probeInstanceHealthyStatusWithoutCustomHC
if service.healthCheckCustomConfig != nil {
probeFunc = p.probeInstanceHealthyStatusWithCustomHC
}
instanceInfoByID, err := p.filterInstancesBlockedByCMHealthyReadinessGate(ctx, instanceInfoByID)
if err != nil {
return err
}
probeRequest := probeRequest{
serviceSubsetID: serviceSubsetID,
probeFunc: probeFunc,
instanceInfoByID: instanceInfoByID,
timeout: timeout,
}
select {
case <-ctx.Done():
return ctx.Err()
case p.probeRequestChan <- probeRequest:
return nil
}
}
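// probeLoop owns all probe state on a single goroutine: it accepts probe
// requests, re-probes every pending service subset once per probePeriod, and
// drops a subset once all its instances settle or its timeout elapses.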
func (p *defaultInstancesHealthProber) probeLoop(ctx context.Context) {
probeConfigByServiceSubset := make(map[serviceSubsetID]probeConfig)
for {
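// leave timer nil (a nil channel blocks forever) so that with nothing to
// probe the select only wakes for new requests or cancellation.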
var timer <-chan time.Time
if len(probeConfigByServiceSubset) > 0 {
timer = time.After(p.probePeriod)
}
select {
case <-ctx.Done():
return
case probeRequest := <-p.probeRequestChan:
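// a new request replaces any pending probe state for this service subset;
// an empty instance set cancels probing for it.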
if len(probeRequest.instanceInfoByID) == 0 {
delete(probeConfigByServiceSubset, probeRequest.serviceSubsetID)
} else {
probeEntries := make([]instanceProbeEntry, 0, len(probeRequest.instanceInfoByID))
for instanceID, instanceInfo := range probeRequest.instanceInfoByID {
probeEntries = append(probeEntries, instanceProbeEntry{
instanceID: instanceID,
instanceInfo: instanceInfo,
lastTransitionTime: time.Time{},
lastHealthyStatus: false,
})
}
probeConfigByServiceSubset[probeRequest.serviceSubsetID] = probeConfig{
probeFunc: probeRequest.probeFunc,
probeEntries: probeEntries,
timeoutTime: time.Now().Add(probeRequest.timeout),
}
}
case <-timer:
for serviceSubsetID, instancesProbeConfig := range probeConfigByServiceSubset {
if time.Now().After(instancesProbeConfig.timeoutTime) {
p.log.Error(errors.New("timeout probe instances"),
"serviceID", serviceSubsetID.serviceID,
"subsetID", serviceSubsetID.subsetID)
delete(probeConfigByServiceSubset, serviceSubsetID)
continue
}
probeEntriesToContinue, err := p.probeInstances(ctx, serviceSubsetID.serviceID, instancesProbeConfig.probeFunc, instancesProbeConfig.probeEntries)
if err != nil {
p.log.Error(err, "failed to probe instances",
"serviceID", serviceSubsetID.serviceID,
"subsetID", serviceSubsetID.subsetID)
} else if len(probeEntriesToContinue) == 0 {
delete(probeConfigByServiceSubset, serviceSubsetID)
} else {
probeConfigByServiceSubset[serviceSubsetID] = probeConfig{
probeFunc: instancesProbeConfig.probeFunc,
probeEntries: probeEntriesToContinue,
timeoutTime: instancesProbeConfig.timeoutTime,
}
}
}
}
}
}
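// probeInstances performs one probe round for the given entries and returns
// the entries that still need probing in the next round.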
func (p *defaultInstancesHealthProber) probeInstances(ctx context.Context, serviceID string, probeFunc instanceHealthyStatusProbeFunc, probeEntries []instanceProbeEntry) ([]instanceProbeEntry, error) {
var instanceIDs []string
for _, probeEntry := range probeEntries {
instanceIDs = append(instanceIDs, probeEntry.instanceID)
}
healthyStatusByInstanceID, err := probeFunc(ctx, serviceID, instanceIDs)
if err != nil {
return nil, err
}
p.log.V(1).Info("probed instance healthStatus",
"healthyStatusByInstanceID", healthyStatusByInstanceID)
var probeEntriesToContinue []instanceProbeEntry
for _, probeEntry := range probeEntries {
healthyStatus := healthyStatusByInstanceID[probeEntry.instanceID]
shouldContinue, err := p.updateInstanceProbeEntry(ctx, &probeEntry, healthyStatus)
if err != nil {
p.log.Error(err, "failed to update pod HealthyCondition",
"pod", k8s.NamespacedName(probeEntry.instanceInfo.pod))
probeEntriesToContinue = append(probeEntriesToContinue, probeEntry)
} else if shouldContinue {
probeEntriesToContinue = append(probeEntriesToContinue, probeEntry)
}
}
return probeEntriesToContinue, nil
}
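// updateInstanceProbeEntry records the probed healthyStatus and, once that
// status has held for transitionDuration, patches the pod's
// ConditionAWSCloudMapHealthy accordingly. It returns true while probing
// should continue, i.e. until the pod condition becomes True.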
func (p *defaultInstancesHealthProber) updateInstanceProbeEntry(ctx context.Context, probeEntry *instanceProbeEntry, healthyStatus bool) (bool, error) {
if probeEntry.lastTransitionTime.IsZero() || probeEntry.lastHealthyStatus != healthyStatus {
probeEntry.lastTransitionTime = time.Now()
probeEntry.lastHealthyStatus = healthyStatus
}
if time.Since(probeEntry.lastTransitionTime) < p.transitionDuration {
return true, nil
}
podHealthyConditionStatus := corev1.ConditionFalse
if healthyStatus {
podHealthyConditionStatus = corev1.ConditionTrue
}
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
pod := probeEntry.instanceInfo.pod.DeepCopy()
if err := p.k8sClient.Get(ctx, k8s.NamespacedName(pod), pod); err != nil {
return err
}
oldPod := pod.DeepCopy()
if updated := k8s.UpdatePodCondition(pod, k8s.ConditionAWSCloudMapHealthy, podHealthyConditionStatus, nil, nil); updated {
if err := p.k8sClient.Status().Patch(ctx, pod, client.MergeFrom(oldPod)); err != nil {
return err
}
}
probeEntry.instanceInfo.pod = pod
return nil
}); err != nil {
return false, err
}
return podHealthyConditionStatus != corev1.ConditionTrue, nil
}
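// probeInstanceHealthyStatusWithoutCustomHC probes services without a custom
// health check: an instance is considered healthy once GetInstance finds it
// registered, and unhealthy while it is still not found.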
func (p *defaultInstancesHealthProber) probeInstanceHealthyStatusWithoutCustomHC(ctx context.Context, serviceID string, instanceIDs []string) (map[string]bool, error) {
healthyStatusByInstanceID := make(map[string]bool, len(instanceIDs))
for _, instanceID := range instanceIDs {
_, err := p.cloudMapSDK.GetInstanceWithContext(ctx, &servicediscovery.GetInstanceInput{
ServiceId: aws.String(serviceID),
InstanceId: aws.String(instanceID),
})
if err != nil {
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == servicediscovery.ErrCodeInstanceNotFound {
healthyStatusByInstanceID[instanceID] = false
continue
}
return nil, err
}
healthyStatusByInstanceID[instanceID] = true
}
return healthyStatusByInstanceID, nil
}
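// probeInstanceHealthyStatusWithCustomHC probes services with a custom health
// check by paging through GetInstancesHealthStatus; an instance is healthy
// iff Cloud Map reports HealthStatusHealthy for it.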
func (p *defaultInstancesHealthProber) probeInstanceHealthyStatusWithCustomHC(ctx context.Context, serviceID string, instanceIDs []string) (map[string]bool, error) {
healthyStatusByInstanceID := make(map[string]bool, len(instanceIDs))
if err := p.cloudMapSDK.GetInstancesHealthStatusPagesWithContext(ctx, &servicediscovery.GetInstancesHealthStatusInput{
ServiceId: aws.String(serviceID),
Instances: aws.StringSlice(instanceIDs),
}, func(output *servicediscovery.GetInstancesHealthStatusOutput, lastPage bool) bool {
for instanceID, status := range output.Status {
healthyStatusByInstanceID[instanceID] = aws.StringValue(status) == servicediscovery.HealthStatusHealthy
}
return true
}); err != nil {
return nil, err
}
return healthyStatusByInstanceID, nil
}
// filterInstancesBlockedByCMHealthyReadinessGate returns the instances whose pods are still blocked by the ConditionAWSCloudMapHealthy readinessGate, i.e. whose pod condition is missing or not yet True.
func (p *defaultInstancesHealthProber) filterInstancesBlockedByCMHealthyReadinessGate(ctx context.Context, instanceInfoByID map[string]instanceInfo) (map[string]instanceInfo, error) {
blockedInstanceByID := make(map[string]instanceInfo, len(instanceInfoByID))
for instanceID, instanceInfo := range instanceInfoByID {
if err := p.k8sClient.Get(ctx, k8s.NamespacedName(instanceInfo.pod), instanceInfo.pod); err != nil {
return nil, err
}
podHealthyCondition := k8s.GetPodCondition(instanceInfo.pod, k8s.ConditionAWSCloudMapHealthy)
if podHealthyCondition == nil || podHealthyCondition.Status != corev1.ConditionTrue {
blockedInstanceByID[instanceID] = instanceInfo
}
}
return blockedInstanceByID, nil
}
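// A minimal usage sketch, assuming the caller has already resolved the Cloud
// Map serviceSummary, serviceSubset and instanceInfo values (the 2-minute
// timeout is an arbitrary example, not a value taken from this file):
//
//	prober := newDefaultInstancesHealthProber(ctx, k8sClient, cloudMapSDK, log)
//	if err := prober.Submit(ctx, service, subset, instanceInfoByID, 2*time.Minute); err != nil {
//		return err // Submit fails only if ctx is done or the readiness-gate lookup fails
//	}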