datanode/bootstrap_manager.go (123 lines of code) (raw):
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package datanode
import (
"github.com/uber/aresdb/cluster/topology"
"github.com/uber/aresdb/datanode/bootstrap"
"github.com/uber/aresdb/datanode/client"
"github.com/uber/aresdb/utils"
"sync"
"time"
xerrors "github.com/m3db/m3/src/x/errors"
)
// bootstrapManagerImpl is the implementation of the BootstrapManager interface
// returned by NewBootstrapManager. It serializes bootstrap requests: while one
// bootstrap is running, further Bootstrap calls are only recorded as pending and
// are served by the call that is already in progress.
type bootstrapManagerImpl struct {
// RWMutex guards state and hasPending (Bootstrap and IsBootstrapped lock it).
sync.RWMutex
// opts is forwarded unchanged to bootstrapable.Bootstrap.
opts bootstrap.Options
// origin identifies this datanode; it is passed to bootstrapable.Bootstrap and
// logged under the "datanode" key.
origin string
// peerSource is created from topo in NewBootstrapManager and passed to
// bootstrapable.Bootstrap.
peerSource client.PeerSource
// bootstrapable performs the actual bootstrap work.
bootstrapable bootstrap.Bootstrapable
// state is the current bootstrap state; Bootstrap moves it to Bootstrapping
// and then to Bootstrapped.
state bootstrap.BootstrapState
// hasPending records that a Bootstrap call arrived while a bootstrap was
// already running, so the running call must perform another pass.
hasPending bool
// NOTE(review): bootstrapTableShards is not referenced in the code shown
// here; presumably used elsewhere in the package — confirm.
bootstrapTableShards map[string]bootstrap.BootstrapDetails
// lastBootstrapCompletionTime is set when Bootstrap finishes; it stays zero
// until the first bootstrap completes.
lastBootstrapCompletionTime time.Time
// topo is the cluster topology used to build the peer source and the
// per-pass topology snapshots.
topo topology.Topology
}
// NewBootstrapManager returns a BootstrapManager for the datanode identified by
// origin. Bootstraps are delegated to bootstrappable, configured with
// bootstrapOpts, and read cluster layout from topo.
//
// A peer source is derived from topo. If that fails, the error is logged at
// Fatal level, which is presumably process-terminating.
func NewBootstrapManager(origin string,
	bootstrappable bootstrap.Bootstrapable,
	bootstrapOpts bootstrap.Options,
	topo topology.Topology,
) BootstrapManager {
	source, sourceErr := NewPeerSource(topo, nil)
	if sourceErr != nil {
		utils.GetLogger().With("error", sourceErr.Error()).Fatal("failed to initialize peer source")
	}

	mgr := &bootstrapManagerImpl{}
	mgr.origin = origin
	mgr.bootstrapable = bootstrappable
	mgr.opts = bootstrapOpts
	mgr.peerSource = source
	mgr.topo = topo
	return mgr
}
// IsBootstrapped reports whether the manager's state is currently
// bootstrap.Bootstrapped. The state is read under the read lock.
func (m *bootstrapManagerImpl) IsBootstrapped() bool {
	m.RLock()
	defer m.RUnlock()
	return m.state == bootstrap.Bootstrapped
}
// LastBootstrapCompletionTime returns the time at which the most recent
// bootstrap finished. It also reports whether any bootstrap has finished yet,
// which is false while the recorded time is still the zero value.
//
// The field is read exactly once, under the read lock. This avoids racing with
// a concurrent Bootstrap. It also ensures the returned time and flag describe
// the same value.
func (m *bootstrapManagerImpl) LastBootstrapCompletionTime() (time.Time, bool) {
	m.RLock()
	completedAt := m.lastBootstrapCompletionTime
	m.RUnlock()
	return completedAt, !completedAt.IsZero()
}
// Bootstrap bootstraps this datanode through the configured Bootstrapable.
//
// If a bootstrap is already in progress, the call only marks another pass as
// pending and returns nil immediately. The in-flight call runs that extra pass
// before it finishes.
//
// Otherwise the call keeps running bootstrap passes until no pending request
// remains. It then moves the state to Bootstrapped and records the completion
// time. Errors from every pass are combined into the returned error. The state
// still becomes Bootstrapped when some passes failed.
func (m *bootstrapManagerImpl) Bootstrap() error {
	m.Lock()
	if m.state == bootstrap.Bootstrapping {
		// Already bootstrapping, and a subsequent request came in: queue it so
		// the current bootstrap runs again once it completes. This can happen
		// when a reshard occurs during an initial or resharding bootstrap and
		// more shards need to be bootstrapped.
		m.hasPending = true
		m.Unlock()
		utils.GetLogger().Info("bootstrap enqueued, datanode is in bootstrapping state")
		return nil
	}
	m.state = bootstrap.Bootstrapping
	m.Unlock()

	// Keep performing bootstraps until none are pending.
	multiErr := xerrors.NewMultiError()
	for {
		if err := m.bootstrap(); err != nil {
			multiErr = multiErr.Add(err)
		}

		m.Lock()
		pending := m.hasPending
		if pending {
			// New Bootstrap calls should now enqueue another pending pass.
			m.hasPending = false
		} else {
			m.state = bootstrap.Bootstrapped
			// Record the completion time while holding the lock, together with
			// the state transition. Previously it was written after Unlock,
			// which races with concurrent readers and concurrent Bootstrap calls.
			m.lastBootstrapCompletionTime = utils.Now()
		}
		m.Unlock()

		if !pending {
			break
		}
	}
	return multiErr.FinalError()
}
// bootstrap runs a single bootstrap pass and returns its error unchanged.
//
// It first takes a fresh snapshot of the topology's shard states. It then calls
// bootstrapable.Bootstrap with that snapshot and the manager's peer source,
// origin, topology and options. The outcome is logged with the datanode origin
// and the elapsed duration.
func (m *bootstrapManagerImpl) bootstrap() error {
	began := utils.Now()
	snapshot := newInitialTopologyState(m.topo)
	bootstrapErr := m.bootstrapable.Bootstrap(m.peerSource, m.origin, m.topo, snapshot, m.opts)
	elapsed := utils.Now().Sub(began)

	logger := utils.GetLogger().With("datanode", m.origin).With("duration", elapsed)
	if bootstrapErr == nil {
		logger.Info("bootstrap finished")
		return nil
	}

	logger.With("error", bootstrapErr.Error()).Info("bootstrap finished with err")
	return bootstrapErr
}
// newInitialTopologyState builds a StateSnapshot from the topology's current
// map. For every shard it records, per owning host, that host and the shard
// state reported by the host's shard set.
func newInitialTopologyState(topo topology.Topology) *topology.StateSnapshot {
	snapshot := &topology.StateSnapshot{
		ShardStates: topology.ShardStates{},
	}

	for _, hss := range topo.Get().HostShardSets() {
		host := hss.Host()
		hostID := topology.HostID(host.ID())

		for _, shard := range hss.ShardSet().All() {
			id := topology.ShardID(shard.ID())

			// Lazily create the per-host map the first time a shard is seen.
			perHost, found := snapshot.ShardStates[id]
			if !found {
				perHost = map[topology.HostID]topology.HostShardState{}
				snapshot.ShardStates[id] = perHost
			}

			perHost[hostID] = topology.HostShardState{
				Host:       host,
				ShardState: shard.State(),
			}
		}
	}
	return snapshot
}