internal/alloydb/instance.go (263 lines of code) (raw):
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package alloydb
import (
"context"
"crypto/rsa"
"fmt"
"regexp"
"sync"
"time"
alloydbadmin "cloud.google.com/go/alloydb/apiv1alpha"
"cloud.google.com/go/alloydbconn/debug"
"cloud.google.com/go/alloydbconn/errtype"
telv2 "cloud.google.com/go/alloydbconn/internal/tel/v2"
"golang.org/x/time/rate"
)
const (
// the refresh buffer is the amount of time before a refresh cycle's result
// expires that a new refresh operation begins.
refreshBuffer = 4 * time.Minute
// refreshInterval is the amount of time between refresh attempts as
// enforced by the rate limiter.
refreshInterval = 30 * time.Second
// RefreshTimeout is the maximum amount of time to wait for a refresh
// cycle to complete. This value should be greater than the
// refreshInterval.
RefreshTimeout = 60 * time.Second
// refreshBurst is the initial burst allowed by the rate limiter.
refreshBurst = 2
)
var (
// Instance URI is in the format:
// 'projects/<PROJECT>/locations/<REGION>/clusters/<CLUSTER>/instances/<INSTANCE>'
// Additionally, we have to support legacy "domain-scoped" projects
// (e.g. "google.com:PROJECT")
instURIRegex = regexp.MustCompile("projects/([^:]+(:[^:]+)?)/locations/([^:]+)/clusters/([^:]+)/instances/([^:]+)")
)
// InstanceURI represents an AlloyDB instance.
type InstanceURI struct {
project string
region string
cluster string
name string
}
// Project returns the project ID of the cluster.
func (i InstanceURI) Project() string {
return i.project
}
// Region returns the region (aka location) of the cluster.
func (i InstanceURI) Region() string {
return i.region
}
// Cluster returns the name of the cluster.
func (i InstanceURI) Cluster() string {
return i.cluster
}
// Name returns the name of the instance.
func (i InstanceURI) Name() string {
return i.name
}
// URI returns the full URI specifying an instance.
func (i *InstanceURI) URI() string {
return fmt.Sprintf(
"projects/%s/locations/%s/clusters/%s/instances/%s",
i.project, i.region, i.cluster, i.name,
)
}
// String returns a short-hand representation of an instance URI.
func (i *InstanceURI) String() string {
return fmt.Sprintf("%s/%s/%s/%s", i.project, i.region, i.cluster, i.name)
}
// ParseInstURI initializes a new InstanceURI struct.
func ParseInstURI(cn string) (InstanceURI, error) {
b := []byte(cn)
m := instURIRegex.FindSubmatch(b)
if m == nil {
err := errtype.NewConfigError(
"invalid instance URI, expected projects/<PROJECT>/locations/<REGION>/clusters/<CLUSTER>/instances/<INSTANCE>",
cn,
)
return InstanceURI{}, err
}
c := InstanceURI{
project: string(m[1]),
region: string(m[3]),
cluster: string(m[4]),
name: string(m[5]),
}
return c, nil
}
// refreshOperation is a pending result of a refresh operation of data used to
// connect securely. It should only be initialized by the Instance struct as
// part of a refresh cycle.
type refreshOperation struct {
result ConnectionInfo
err error
// timer that triggers refresh, can be used to cancel.
timer *time.Timer
// indicates the struct is ready to read from
ready chan struct{}
}
// Cancel prevents the instanceInfo from starting, if it hasn't already
// started. Returns true if timer was stopped successfully, or false if it has
// already started.
func (r *refreshOperation) cancel() bool {
return r.timer.Stop()
}
// IsValid returns true if this result is complete, successful, and is still
// valid.
func (r *refreshOperation) isValid() bool {
// verify the result has finished running
select {
default:
return false
case <-r.ready:
if r.err != nil || time.Now().After(r.result.Expiration) {
return false
}
return true
}
}
// RefreshAheadCache manages the information used to connect to the AlloyDB instance by
// periodically calling the AlloyDB Admin API. It automatically refreshes the
// required information approximately 4 minutes before the previous certificate
// expires (every ~56 minutes).
type RefreshAheadCache struct {
instanceURI InstanceURI
logger debug.ContextLogger
// refreshTimeout sets the maximum duration a refresh cycle can run
// for.
refreshTimeout time.Duration
// l controls the rate at which refresh cycles are run.
l *rate.Limiter
r adminAPIClient
resultGuard sync.RWMutex
// cur represents the current refreshOperation that will be used to
// create connections. If a valid complete refreshOperation isn't
// available it's possible for cur to be equal to next.
cur *refreshOperation
// next represents a future or ongoing refreshOperation. Once complete,
// it will replace cur and schedule a replacement to occur.
next *refreshOperation
// ctx is the default ctx for refresh operations. Canceling it prevents
// new refresh operations from being triggered.
ctx context.Context
cancel context.CancelFunc
userAgent string
metricRecorder telv2.MetricRecorder
}
// NewRefreshAheadCache initializes a new cache that proactively refreshes the
// caches connection info.
func NewRefreshAheadCache(
instance InstanceURI,
l debug.ContextLogger,
client *alloydbadmin.AlloyDBAdminClient,
key *rsa.PrivateKey,
refreshTimeout time.Duration,
dialerID string,
disableMetadataExchange bool,
userAgent string,
mr telv2.MetricRecorder,
) *RefreshAheadCache {
ctx, cancel := context.WithCancel(context.Background())
i := &RefreshAheadCache{
instanceURI: instance,
logger: l,
l: rate.NewLimiter(rate.Every(refreshInterval), refreshBurst),
r: newAdminAPIClient(client, key, dialerID, disableMetadataExchange),
refreshTimeout: refreshTimeout,
ctx: ctx,
cancel: cancel,
userAgent: userAgent,
metricRecorder: mr,
}
// For the initial refresh operation, set cur = next so that connection
// requests block until the first refresh is complete.
i.resultGuard.Lock()
i.cur = i.scheduleRefresh(0)
i.next = i.cur
i.resultGuard.Unlock()
return i
}
// Close closes the instance; it stops the refresh cycle and prevents it from
// making additional calls to the AlloyDB Admin API.
func (i *RefreshAheadCache) Close() error {
i.resultGuard.Lock()
defer i.resultGuard.Unlock()
i.cancel()
i.cur.cancel()
i.next.cancel()
return nil
}
// ConnectionInfo returns an IP address specified by ipType (i.e., public or
// private) of the AlloyDB instance.
func (i *RefreshAheadCache) ConnectionInfo(ctx context.Context) (ConnectionInfo, error) {
i.resultGuard.RLock()
refresh := i.cur
i.resultGuard.RUnlock()
var err error
select {
case <-refresh.ready:
err = refresh.err
case <-ctx.Done():
err = ctx.Err()
case <-i.ctx.Done():
err = i.ctx.Err()
}
if err != nil {
return ConnectionInfo{}, err
}
return refresh.result, nil
}
// ForceRefresh triggers an immediate refresh operation to be scheduled and
// used for future connection attempts if valid.
func (i *RefreshAheadCache) ForceRefresh() {
i.resultGuard.Lock()
defer i.resultGuard.Unlock()
// If the next refresh hasn't started yet, we can cancel it and start an immediate one
if i.next.cancel() {
i.next = i.scheduleRefresh(0)
}
// block all sequential connection attempts on the next refresh operation
// if current is invalid
if !i.cur.isValid() {
i.cur = i.next
}
}
// refreshDuration returns the duration to wait before starting the next
// refresh. Usually that duration will be half of the time until certificate
// expiration.
func refreshDuration(now, certExpiry time.Time) time.Duration {
d := certExpiry.Sub(now)
if d < time.Hour {
// Something is wrong with the certification, refresh now.
if d < refreshBuffer {
return 0
}
// Otherwise wait until 4 minutes before expiration for next refresh cycle.
return d - refreshBuffer
}
return d / 2
}
// scheduleRefresh schedules a refresh operation to be triggered after a given
// duration. The returned refreshOperation can be used to either Cancel or Wait
// for the operation's result.
func (i *RefreshAheadCache) scheduleRefresh(d time.Duration) *refreshOperation {
r := &refreshOperation{}
r.ready = make(chan struct{})
r.timer = time.AfterFunc(d, func() {
// instance has been closed, don't schedule anything
if err := i.ctx.Err(); err != nil {
i.logger.Debugf(
context.Background(),
"[%v] Instance is closed, stopping refresh operations",
i.instanceURI.String(),
)
r.err = err
close(r.ready)
return
}
i.logger.Debugf(
context.Background(),
"[%v] Connection info refresh operation started",
i.instanceURI.String(),
)
ctx, cancel := context.WithTimeout(i.ctx, i.refreshTimeout)
defer cancel()
err := i.l.Wait(ctx)
if err != nil {
r.err = errtype.NewDialError(
"context was canceled or expired before refresh completed",
i.instanceURI.String(),
nil,
)
i.logger.Debugf(
ctx,
"[%v] Connection info refresh operation failed, err = %v",
i.instanceURI.String(),
r.err,
)
} else {
r.result, r.err = i.r.connectionInfo(i.ctx, i.instanceURI)
i.logger.Debugf(
ctx,
"[%v] Connection info refresh operation complete",
i.instanceURI.String(),
)
i.logger.Debugf(
ctx,
"[%v] Current certificate expiration = %v",
i.instanceURI.String(),
r.result.Expiration.UTC().Format(time.RFC3339),
)
}
close(r.ready)
// Once the refresh is complete, update "current" with working
// result and schedule a new refresh
i.resultGuard.Lock()
defer i.resultGuard.Unlock()
// if failed, scheduled the next refresh immediately
if r.err != nil {
i.logger.Debugf(
ctx,
"[%v] Connection info refresh operation scheduled immediately",
i.instanceURI.String(),
)
i.next = i.scheduleRefresh(0)
// If the latest result is bad, avoid replacing the
// used result while it's still valid and potentially
// able to provide successful connections. TODO: This
// means that errors while the current result is still
// valid are suppressed. We should try to surface
// errors in a more meaningful way.
if !i.cur.isValid() {
i.cur = r
}
go i.metricRecorder.RecordRefreshCount(context.Background(), telv2.Attributes{
UserAgent: i.userAgent,
RefreshType: telv2.RefreshAheadType,
RefreshStatus: telv2.RefreshFailure,
})
return
}
// Update the current results, and schedule the next refresh in
// the future
i.cur = r
t := refreshDuration(time.Now(), i.cur.result.Expiration)
i.logger.Debugf(
ctx,
"[%v] Connection info refresh operation scheduled at %v (now + %v)",
i.instanceURI.String(),
time.Now().Add(t).UTC().Format(time.RFC3339),
t.Round(time.Minute),
)
i.next = i.scheduleRefresh(t)
go i.metricRecorder.RecordRefreshCount(context.Background(), telv2.Attributes{
UserAgent: i.userAgent,
RefreshType: telv2.RefreshAheadType,
RefreshStatus: telv2.RefreshSuccess,
})
})
return r
}