pkg/lease/lease.go

// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lease

import (
	"context"
	"errors"
	"os"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/uuid"
	coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

var (
	leaseHolder = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "prometheus_engine_lease_is_held",
		Help: "A boolean metric indicating whether the lease with the given key is currently held.",
	}, []string{"key"})
	leaseFailingOpen = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "prometheus_engine_lease_failing_open",
		Help: "A boolean metric indicating whether the lease is currently in fail-open state.",
	}, []string{"key"})
)

// Lease implements a lease on time ranges for different backends.
// If the lease backend has intermittent failures, the lease attempts
// to gracefully fail open by extending the lease of the most recent lease holder.
// This is done in a best-effort manner.
type Lease struct {
	logger log.Logger
	opts   Options

	lock    *wrappedLock
	elector *leaderelection.LeaderElector

	onLeaderChange func()
}

// Options configures the timing behavior of a Lease.
type Options struct {
	// LeaseDuration is the duration that non-leader candidates will
	// wait to force acquire leadership. This is measured against time of
	// last observed ack.
	//
	// A client needs to wait a full LeaseDuration without observing a change to
	// the record before it can attempt to take over. When all clients are
	// shutdown and a new set of clients are started with different names against
	// the same leader record, they must wait the full LeaseDuration before
	// attempting to acquire the lease. Thus LeaseDuration should be as short as
	// possible (within your tolerance for clock skew rate) to avoid possible
	// long waits in that scenario.
	//
	// Defaults to 15 seconds.
	LeaseDuration time.Duration
	// RenewDeadline is the duration that the acting master will retry
	// refreshing leadership before giving up.
	//
	// Defaults to 10 seconds.
	RenewDeadline time.Duration
	// RetryPeriod is the duration the LeaderElector clients should wait
	// between tries of actions.
	//
	// Defaults to 2 seconds.
	RetryPeriod time.Duration
}
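// A minimal usage sketch (hedged: logger, registry, cfg, and ctx are assumed
// to exist in the caller, and the namespace/name values are placeholders):
//
//	l, err := lease.NewKubernetes(logger, registry, cfg, "example-ns", "example-lease", nil)
//	if err != nil {
//		// handle error
//	}
//	l.OnLeaderChange(func() {
//		// react to gaining or losing leadership
//	})
//	go l.Run(ctx)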
// NewKubernetes returns a lease backed by a Kubernetes Lease resource with the
// given namespace and name.
func NewKubernetes(
	logger log.Logger,
	metrics prometheus.Registerer,
	config *rest.Config,
	namespace, name string,
	opts *Options,
) (*Lease, error) {
	if namespace == "" || name == "" {
		return nil, errors.New("namespace and name are required for lease")
	}
	// The leader ID needs to be unique per candidate.
	id, err := os.Hostname()
	if err != nil {
		return nil, err
	}
	id = id + "_" + string(uuid.NewUUID())

	// Construct clients for leader election.
	config = rest.CopyConfig(config)
	config.ContentType = runtime.ContentTypeProtobuf
	rest.AddUserAgent(config, "leader-election")

	corev1Client, err := corev1client.NewForConfig(config)
	if err != nil {
		return nil, err
	}
	coordinationClient, err := coordinationv1client.NewForConfig(config)
	if err != nil {
		return nil, err
	}
	lock, err := resourcelock.New(
		resourcelock.LeasesResourceLock,
		namespace, name,
		corev1Client,
		coordinationClient,
		resourcelock.ResourceLockConfig{Identity: id},
	)
	if err != nil {
		return nil, err
	}
	return New(logger, metrics, lock, opts)
}

// New returns a lease on top of the given resource lock.
func New(
	logger log.Logger,
	metrics prometheus.Registerer,
	lock resourcelock.Interface,
	opts *Options,
) (*Lease, error) {
	if logger == nil {
		logger = log.NewNopLogger()
	}
	if opts == nil {
		opts = &Options{}
	}
	if opts.LeaseDuration == 0 {
		opts.LeaseDuration = 15 * time.Second
	}
	if opts.RetryPeriod == 0 {
		opts.RetryPeriod = 2 * time.Second
	}
	if opts.RenewDeadline == 0 {
		opts.RenewDeadline = 10 * time.Second
	}
	if metrics != nil {
		if err := metrics.Register(leaseHolder); err != nil {
			return nil, err
		}
		if err := metrics.Register(leaseFailingOpen); err != nil {
			return nil, err
		}
	}
	leaseHolder.WithLabelValues(lock.Describe()).Set(0)
	leaseFailingOpen.WithLabelValues(lock.Describe()).Set(0)

	wlock := newWrappedLock(lock)

	lease := &Lease{
		logger:         logger,
		lock:           wlock,
		onLeaderChange: func() {},
		opts:           *opts,
	}
	var err error
	// We use the Kubernetes client-go leader implementation to drive the lease logic.
	// The lock itself however may be implemented against any consistent backend.
	lease.elector, err = leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
		Lock:          wlock,
		LeaseDuration: opts.LeaseDuration,
		RetryPeriod:   opts.RetryPeriod,
		RenewDeadline: opts.RenewDeadline,
		// The purpose of our lease is to determine time ranges for which a leader sends
		// sample data. We cannot be certain that we never sent data for a later in-range
		// timestamp already. Thus releasing the lease on cancel would produce possible
		// overlaps.
		ReleaseOnCancel: false,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(context.Context) {
				lease.onLeaderChange()
				leaseHolder.WithLabelValues(lock.Describe()).Set(1)
			},
			OnStoppedLeading: func() {
				lease.onLeaderChange()
				leaseHolder.WithLabelValues(lock.Describe()).Set(0)
			},
		},
	})
	if err != nil {
		return nil, err
	}
	return lease, nil
}
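// Range returns the time range for which the lease is currently held. If the
// lease is not held at all, ok is false. A hedged caller-side sketch
// (sendSamples is a hypothetical helper, not part of this package):
//
//	if start, end, ok := l.Range(); ok {
//		sendSamples(samples, start, end)
//	}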
func (l *Lease) Range() (start, end time.Time, ok bool) {
	// If we've previously been the leader but the end timestamp expired, it means we
	// couldn't successfully communicate with the backend to extend the lease or determine
	// that someone else got it.
	// We fail open by pretending that we did extend the lease until we
	// can either extend/reacquire the lease or observe that someone else acquired it.
	//
	// This ensures that transient backend downtimes are generally unnoticeable. It does however
	// not protect against correlated failures, e.g. if all leaders restart while the
	// backend is unavailable. This should rarely be an issue.
	//
	// Letting non-leader replicas also fail open would handle more cases gracefully.
	// However, it carries a remaining risk of leaving the replicas jointly in a bad state:
	// Suppose replica A acquires the lease and writes samples with start timestamp T.
	// Replica B starts but cannot reach the backend; it fails open despite never having
	// been the leader and writes with start timestamp T+1.
	// Now B reaches the lease backend, cannot get the lease, and stops sending data. Replica
	// A keeps sending data as the leader but has an older start timestamp, which causes
	// write conflicts. It would be indefinitely unable to write cumulative samples.
	//
	// We could possibly address this in the future by customizing the lease implementation
	// to consider each leader candidate's earliest possible start timestamp and force-acquire
	// the lease if it is more recent than that of the current leader.
	// For now, the approach we take prevents this, as we only rely on a previously
	// agreed-upon start timestamp during a failure scenario.

	// IsLeader checks whether the last observed record matches our own identity.
	// It does not check timestamps and thus keeps returning true if we were the leader
	// previously and currently cannot talk to the backend.
	if !l.elector.IsLeader() {
		return time.Time{}, time.Time{}, false
	}
	start, end = l.lock.lastRange()

	now := time.Now()
	if end.Before(now) {
		leaseFailingOpen.WithLabelValues(l.lock.Describe()).Set(1)
		end = now.Add(l.opts.LeaseDuration)
	} else {
		leaseFailingOpen.WithLabelValues(l.lock.Describe()).Set(0)
	}
	return start, end, true
}

// Run starts trying to acquire and hold the lease until the context is canceled.
func (l *Lease) Run(ctx context.Context) {
	// The elector blocks until it has acquired the lease once but exits
	// when it loses it. Thus we need to run it in a loop.
	for {
		select {
		case <-ctx.Done():
			return
		default:
			l.elector.Run(ctx)
		}
	}
}

// OnLeaderChange sets a callback that's invoked when the leader of the lease changes.
func (l *Lease) OnLeaderChange(f func()) {
	l.onLeaderChange = f
}

// wrappedLock wraps a LeaseLock implementation and caches the time
// range of the last successful update of the lease record.
type wrappedLock struct {
	resourcelock.Interface

	mtx        sync.Mutex
	start, end time.Time
}

// newWrappedLock returns a wrappedLock around the given lock.
func newWrappedLock(lock resourcelock.Interface) *wrappedLock {
	return &wrappedLock{Interface: lock}
}

// Create attempts to create a leader election record.
func (l *wrappedLock) Create(ctx context.Context, ler resourcelock.LeaderElectionRecord) error {
	err := l.Interface.Create(ctx, ler)
	l.update(ler, err)
	return err
}

// Update will update an existing leader election record.
func (l *wrappedLock) Update(ctx context.Context, ler resourcelock.LeaderElectionRecord) error {
	err := l.Interface.Update(ctx, ler)
	l.update(ler, err)
	return err
}

// update the cached state on the create/update result for the record.
func (l *wrappedLock) update(ler resourcelock.LeaderElectionRecord, err error) {
	// If the update was successful, the lease is owned by us and we can update the range.
	if err != nil {
		return
	}
	l.mtx.Lock()
	defer l.mtx.Unlock()

	l.start = ler.AcquireTime.Time
	l.end = ler.RenewTime.Add(time.Duration(ler.LeaseDurationSeconds) * time.Second)
}

// lastRange returns the cached time range of the last successful record update.
func (l *wrappedLock) lastRange() (start time.Time, end time.Time) {
	l.mtx.Lock()
	defer l.mtx.Unlock()

	return l.start, l.end
}
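// A hedged sketch of wiring a non-Kubernetes backend through New (fakeLock is
// a hypothetical resourcelock.Interface implementation, e.g. for tests; the
// timing values are arbitrary):
//
//	l, err := lease.New(log.NewNopLogger(), nil, fakeLock, &lease.Options{
//		LeaseDuration: 30 * time.Second,
//		RenewDeadline: 20 * time.Second,
//		RetryPeriod:   5 * time.Second,
//	})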