go/connection-refresh/btrefresh/bigtable_rotator.go (64 lines of code) (raw):

package btrefresh import ( "context" "sync" "time" "cloud.google.com/go/bigtable" ) // Time to wait before killing the previous connection // during rotation. This may kill ongoing requests if they // take longer than this amount of time. const lameduckTime = 1 * time.Minute // Number of requests to issue on a new client // to warm up each underlying client in the pool. // The default pool size is 4, so we run 2n to have // good chances of hitting them all. const nWarmConnections = 2 * 4 // RotatingTable is a bigtable.Table that automatically // reconnects to Cloud Bigtable at a given interval. type RotatingTable struct { *bigtable.Table client *bigtable.Client swap *time.Ticker errors chan error } // BackgroundErrors is a channel that will propagate errors // that come from refreshing the connection. It can either be // checked with select before performing ops or watched // in a background thread. func (r RotatingTable) BackgroundErrors() <-chan error { return r.errors } // Close will close the connections and stop rotating. func (r RotatingTable) Close() { r.swap.Stop() close(r.errors) r.client.Close() } // BtDialer encapsulates making a connection to your bigtable. // This way you can use any means for dialing/ getting credentials. type BtDialer func() (*bigtable.Client, error) // NewRotatingTable makes a new Table reference that will refresh itself in the background at the given // interval. func NewRotatingTable(dialer BtDialer, table string, refresh time.Duration) (*RotatingTable, error) { client, err := dialer() errors := make(chan error, 1) if err != nil { return nil, err } tbl := client.Open(table) warmTable(tbl) ticker := time.NewTicker(refresh) rt := &RotatingTable{tbl, client, ticker, errors} go func() { for range ticker.C { // Close the old client after waiting a bit go func() { oldC := rt.client time.Sleep(lameduckTime) oldC.Close() }() client, err := dialer() if err != nil { errors <- err continue } tbl := client.Open(table) warmTable(tbl) rt.Table = tbl } }() return rt, nil } // A new table reference will need to have some requests // go across it to establish each connection in the connection // pool. func warmTable(tbl *bigtable.Table) { wg := sync.WaitGroup{} // Run the warming requests across threads to saturate the // connection pool. for i := 0; i < nWarmConnections; i++ { wg.Add(1) go func() { // Send a request that does not actually return data. tbl.ReadRow(context.Background(), "NOT_A_REAL_ROW") wg.Done() }() } wg.Wait() }