in dialer.go [295:453]
// Dial returns a net.Conn for the AlloyDB instance identified by instance.
//
// The instance string is parsed with alloydb.ParseInstURI; a parse failure is
// returned as-is. Any opts are applied on top of a copy of the dialer's
// default dial configuration. Connection info is obtained from the dialer's
// connection info cache and, when the client certificate is reported invalid,
// a refresh is forced before dialing. The TCP connection is then wrapped in a
// TLS 1.3 client connection and, unless disabled on the dialer, a metadata
// exchange is performed over it.
//
// Dial returns ErrDialerClosed if the dialer has already been closed. On every
// failure path after the TCP connection has been established, the connection
// is closed (best effort) before the error is returned.
//
// The named results are required: deferred functions read err to end spans
// and record dial errors.
func (d *Dialer) Dial(ctx context.Context, instance string, opts ...DialOption) (conn net.Conn, err error) {
	// Fail fast, without blocking, when the dialer has been closed.
	select {
	case <-d.closed:
		return nil, ErrDialerClosed
	default:
	}
	inst, err := alloydb.ParseInstURI(instance)
	if err != nil {
		return nil, err
	}
	mr := d.metricRecorder(ctx, inst)
	var (
		startTime = time.Now()
		endDial   tel.EndSpanFunc
		attrs     = telv2.Attributes{
			IAMAuthN:    d.useIAMAuthN,
			UserAgent:   d.userAgent,
			RefreshType: telv2.RefreshAheadType,
		}
	)
	if d.lazyRefresh {
		attrs.RefreshType = telv2.RefreshLazyType
	}
	ctx, endDial = tel.StartSpan(ctx, "cloud.google.com/go/alloydbconn.Dial",
		tel.AddInstanceName(instance),
		tel.AddDialerID(d.dialerID),
	)
	// The arguments of the go statements are evaluated when this deferred
	// function runs, so they observe the final err and attrs of this call.
	defer func() {
		go tel.RecordDialError(context.Background(), instance, d.dialerID, err)
		go mr.RecordDialCount(ctx, attrs)
		endDial(err)
	}()
	// cfg is a copy of the defaults, so per-call options do not leak into
	// the dialer.
	cfg := d.defaultDialCfg
	for _, opt := range opts {
		opt(&cfg)
	}
	var endInfo tel.EndSpanFunc
	ctx, endInfo = tel.StartSpan(ctx, "cloud.google.com/go/alloydbconn/internal.InstanceInfo")
	cache, cacheHit, err := d.connectionInfoCache(ctx, inst, mr)
	attrs.CacheHit = cacheHit
	if err != nil {
		attrs.DialStatus = telv2.DialCacheError
		endInfo(err)
		return nil, err
	}
	ci, err := cache.ConnectionInfo(ctx)
	if err != nil {
		attrs.DialStatus = telv2.DialCacheError
		d.removeCached(ctx, inst, cache, err)
		endInfo(err)
		return nil, err
	}
	endInfo(err)
	// If the client certificate has expired (as when the computer goes to
	// sleep, and the refresh cycle cannot run), force a refresh immediately.
	// The TLS handshake will not fail on an expired client certificate. It's
	// not until the first read where the client cert error will be surfaced.
	// So check that the certificate is valid before proceeding.
	if invalidClientCert(ctx, inst, d.logger, ci.Expiration) {
		d.logger.Debugf(ctx, "[%v] Refreshing certificate now", inst.String())
		cache.ForceRefresh()
		// Block on refreshed connection info
		ci, err = cache.ConnectionInfo(ctx)
		if err != nil {
			d.removeCached(ctx, inst, cache, err)
			attrs.DialStatus = telv2.DialCacheError
			return nil, err
		}
	}
	addr, ok := ci.IPAddrs[cfg.ipType]
	if !ok {
		// Build the error before evicting the cache entry so that
		// removeCached receives the actual reason rather than the nil
		// error left over from the successful ConnectionInfo call.
		cfgErr := errtype.NewConfigError(
			fmt.Sprintf("instance does not have IP of type %q", cfg.ipType),
			inst.String(),
		)
		d.removeCached(ctx, inst, cache, cfgErr)
		attrs.DialStatus = telv2.DialUserError
		return nil, cfgErr
	}
	var connectEnd tel.EndSpanFunc
	ctx, connectEnd = tel.StartSpan(ctx, "cloud.google.com/go/alloydbconn/internal.Connect")
	defer func() { connectEnd(err) }()
	hostPort := net.JoinHostPort(addr, serverProxyPort)
	// A per-call dial function takes precedence over the dialer's own.
	f := d.dialFunc
	if cfg.dialFunc != nil {
		f = cfg.dialFunc
	}
	d.logger.Debugf(ctx, "[%v] Dialing %v", inst.String(), hostPort)
	conn, err = f(ctx, "tcp", hostPort)
	if err != nil {
		d.logger.Debugf(ctx, "[%v] Dialing %v failed: %v", inst.String(), hostPort, err)
		// refresh the instance info in case it caused the connection failure
		cache.ForceRefresh()
		attrs.DialStatus = telv2.DialTCPError
		return nil, errtype.NewDialError("failed to dial", inst.String(), err)
	}
	if c, ok := conn.(*net.TCPConn); ok {
		if err := c.SetKeepAlive(true); err != nil {
			// The socket is already open; close it so it is not leaked.
			_ = conn.Close() // best effort close attempt
			attrs.DialStatus = telv2.DialTCPError
			return nil, errtype.NewDialError("failed to set keep-alive", inst.String(), err)
		}
		if err := c.SetKeepAlivePeriod(cfg.tcpKeepAlive); err != nil {
			// The socket is already open; close it so it is not leaked.
			_ = conn.Close() // best effort close attempt
			attrs.DialStatus = telv2.DialTCPError
			return nil, errtype.NewDialError("failed to set keep-alive period", inst.String(), err)
		}
	}
	c := &tls.Config{
		Certificates: []tls.Certificate{ci.ClientCert},
		RootCAs:      ci.RootCAs,
		// The PSC, private, and public IP all appear in the certificate as
		// SAN. Use the server name that corresponds to the requested
		// connection path.
		ServerName: addr,
		MinVersion: tls.VersionTLS13,
	}
	tlsConn := tls.Client(conn, c)
	if err := tlsConn.HandshakeContext(ctx); err != nil {
		d.logger.Debugf(ctx, "[%v] TLS handshake failed: %v", inst.String(), err)
		// refresh the instance info in case it caused the handshake failure
		cache.ForceRefresh()
		_ = tlsConn.Close() // best effort close attempt
		attrs.DialStatus = telv2.DialTLSError
		return nil, errtype.NewDialError("handshake failed", inst.String(), err)
	}
	if !d.disableMetadataExchange {
		// The metadata exchange must occur after the TLS connection is established
		// to avoid leaking sensitive information.
		err = d.metadataExchange(tlsConn)
		if err != nil {
			_ = tlsConn.Close() // best effort close attempt
			attrs.DialStatus = telv2.DialMDXError
			return nil, err
		}
	}
	attrs.DialStatus = telv2.DialSuccess
	latency := time.Since(startTime).Milliseconds()
	// Record success metrics in a separate goroutine so Dial does not wait
	// on them before returning.
	go func() {
		n := atomic.AddUint64(cache.openConns, 1)
		tel.RecordOpenConnections(ctx, int64(n), d.dialerID, inst.String())
		tel.RecordDialLatency(ctx, instance, d.dialerID, latency)
		mr.RecordOpenConnection(ctx, attrs)
		mr.RecordDialLatency(ctx, latency, attrs)
	}()
	return newInstrumentedConn(tlsConn, mr, attrs, func() {
		// Adding ^uint64(0) is the atomic idiom for decrementing by one.
		n := atomic.AddUint64(cache.openConns, ^uint64(0))
		tel.RecordOpenConnections(context.Background(), int64(n), d.dialerID, inst.String())
		mr.RecordClosedConnection(context.Background(), attrs)
	}, d.dialerID, inst.String()), nil
}