in internal/alloydb/instance.go [288:390]
// scheduleRefresh arms a timer that runs one connection-info refresh after
// the delay d and returns the refreshOperation handle for it immediately.
//
// When the timer fires, the operation:
//   - aborts without rescheduling if i.ctx is already done, recording the
//     context error on the operation;
//   - otherwise waits on i.l (presumably a rate limiter; confirm against its
//     declaration) and then fetches connection info, with both steps bounded
//     by i.refreshTimeout;
//   - closes r.ready so waiters can observe r.result / r.err;
//   - under i.resultGuard, updates i.cur and schedules the follow-up refresh
//     in i.next: immediately on failure, or after refreshDuration(...) on
//     success.
//
// The function schedules further refreshes from inside the timer callback, so
// the refresh cycle keeps running until i.ctx is done.
func (i *RefreshAheadCache) scheduleRefresh(d time.Duration) *refreshOperation {
	r := &refreshOperation{}
	r.ready = make(chan struct{})
	r.timer = time.AfterFunc(d, func() {
		// instance has been closed, don't schedule anything
		if err := i.ctx.Err(); err != nil {
			i.logger.Debugf(
				context.Background(),
				"[%v] Instance is closed, stopping refresh operations",
				i.instanceURI.String(),
			)
			r.err = err
			close(r.ready)
			return
		}
		i.logger.Debugf(
			context.Background(),
			"[%v] Connection info refresh operation started",
			i.instanceURI.String(),
		)

		// A single deadline covers both the wait on i.l and the refresh
		// request itself, so a stalled request cannot outlive refreshTimeout.
		ctx, cancel := context.WithTimeout(i.ctx, i.refreshTimeout)
		defer cancel()

		if err := i.l.Wait(ctx); err != nil {
			r.err = errtype.NewDialError(
				"context was canceled or expired before refresh completed",
				i.instanceURI.String(),
				nil,
			)
		} else {
			// Use the timeout-bound ctx (not i.ctx) so refreshTimeout
			// actually applies to the connection info request.
			r.result, r.err = i.r.connectionInfo(ctx, i.instanceURI)
		}

		// Report the outcome only after both failure sources have been
		// evaluated; r.result is only read when the refresh succeeded.
		if r.err != nil {
			i.logger.Debugf(
				ctx,
				"[%v] Connection info refresh operation failed, err = %v",
				i.instanceURI.String(),
				r.err,
			)
		} else {
			i.logger.Debugf(
				ctx,
				"[%v] Connection info refresh operation complete",
				i.instanceURI.String(),
			)
			i.logger.Debugf(
				ctx,
				"[%v] Current certificate expiration = %v",
				i.instanceURI.String(),
				r.result.Expiration.UTC().Format(time.RFC3339),
			)
		}

		// Signal waiters before taking the lock; r.result and r.err are not
		// written again by this callback after this point.
		close(r.ready)

		// Once the refresh is complete, update "current" with working
		// result and schedule a new refresh
		i.resultGuard.Lock()
		defer i.resultGuard.Unlock()

		// if failed, schedule the next refresh immediately
		if r.err != nil {
			i.logger.Debugf(
				ctx,
				"[%v] Connection info refresh operation scheduled immediately",
				i.instanceURI.String(),
			)
			i.next = i.scheduleRefresh(0)
			// If the latest result is bad, avoid replacing the
			// used result while it's still valid and potentially
			// able to provide successful connections. TODO: This
			// means that errors while the current result is still
			// valid are suppressed. We should try to surface
			// errors in a more meaningful way.
			if !i.cur.isValid() {
				i.cur = r
			}
			// Metrics are recorded on a separate goroutine with a
			// background context so they do not delay the callback.
			go i.metricRecorder.RecordRefreshCount(context.Background(), telv2.Attributes{
				UserAgent:     i.userAgent,
				RefreshType:   telv2.RefreshAheadType,
				RefreshStatus: telv2.RefreshFailure,
			})
			return
		}

		// Update the current results, and schedule the next refresh in
		// the future
		i.cur = r
		t := refreshDuration(time.Now(), i.cur.result.Expiration)
		i.logger.Debugf(
			ctx,
			"[%v] Connection info refresh operation scheduled at %v (now + %v)",
			i.instanceURI.String(),
			time.Now().Add(t).UTC().Format(time.RFC3339),
			t.Round(time.Minute),
		)
		i.next = i.scheduleRefresh(t)
		go i.metricRecorder.RecordRefreshCount(context.Background(), telv2.Attributes{
			UserAgent:     i.userAgent,
			RefreshType:   telv2.RefreshAheadType,
			RefreshStatus: telv2.RefreshSuccess,
		})
	})
	return r
}