in memstore/bootstrap.go [107:226]
// Bootstrap brings the table shard into a servable state. It proceeds in
// stages, each recorded in shard.BootstrapDetails:
//
//   - PeerCopy (only while needPeerCopy is set): find peer nodes with
//     findBootstrapSource, shuffle them, and fetch metadata and raw data
//     through a connection borrowed from peerSource.BorrowConnection. The copy
//     is skipped when no peer is found. needPeerCopy is cleared once this
//     stage completes without error.
//   - Metadata is loaded from disk via LoadMetaData.
//   - Preload: fact tables preload every non-deleted column over its
//     PreloadingDays window ending today; dimension tables load their snapshot.
//   - Recovery: the redo log is replayed via PlayRedoLog.
//
// Parameters:
//   - peerSource: provides connections to peer data nodes for the copy stage.
//   - origin, topo, topoState: passed to findBootstrapSource to locate peers;
//     origin is also forwarded to fetchDataFromPeer.
//   - options: forwarded to fetchDataFromPeer.
//
// BootstrapState is Bootstrapping for the duration of the call. It becomes
// Bootstrapped on success, or reverts to BootstrapNotStarted on any error so
// that a later call may retry. If another Bootstrap is already running on
// this shard, Bootstrap returns bootstrap.ErrTableShardIsBootstrapping
// immediately. Otherwise it returns, unchanged, the first error from peer
// discovery, BorrowConnection, the peer data fetch, LoadMetaData or
// LoadSnapshot.
func (shard *TableShard) Bootstrap(
	peerSource client.PeerSource,
	origin string,
	topo topology.Topology,
	topoState *topology.StateSnapshot,
	options bootstrap.Options,
) error {
	shard.bootstrapLock.Lock()
	// check whether shard is already bootstrapping
	if shard.BootstrapState == bootstrap.Bootstrapping {
		shard.bootstrapLock.Unlock()
		return bootstrap.ErrTableShardIsBootstrapping
	}
	shard.BootstrapState = bootstrap.Bootstrapping
	shard.bootstrapLock.Unlock()

	// Snapshot everything we need from the schema under its read lock. The
	// rest of the function, including log fields, uses this snapshot. Re-reading
	// shard.Schema.Schema without the lock would race with schema updates.
	shard.Schema.RLock()
	numColumns := len(shard.Schema.GetValueTypeByColumn())
	schema := shard.Schema.Schema
	tableName := schema.Name
	shard.Schema.RUnlock()

	shard.BootstrapDetails.Clear()
	shard.BootstrapDetails.SetNumColumns(numColumns)

	// Publish the final state on every exit path. Failure resets to
	// BootstrapNotStarted so the guard above lets a later call retry.
	success := false
	defer func() {
		shard.bootstrapLock.Lock()
		if success {
			shard.BootstrapState = bootstrap.Bootstrapped
		} else {
			shard.BootstrapState = bootstrap.BootstrapNotStarted
		}
		shard.bootstrapLock.Unlock()
	}()

	if atomic.LoadUint32(&shard.needPeerCopy) == 1 {
		shard.BootstrapDetails.SetBootstrapStage(bootstrap.PeerCopy)
		// find peer node for copy metadata and raw data
		peerNodes, err := shard.findBootstrapSource(origin, topo, topoState)
		if err != nil {
			utils.GetLogger().
				With("table", tableName).
				With("shardID", shard.ShardID).
				With("origin", origin).
				With("error", err.Error()).
				Error("failed to find peers for shard bootstrap")
			return err
		}
		// Note: skip peer copy step when we found no peers. this is correct based on the assumption that
		// if a shard replica does not find any peer in available/leaving state
		// then the cluster should be in the initial phase of new placement created
		// when we do replace/remove/add, the total number of shard replica remains the same
		// so the number of initializing replica should match the number of leaving replica
		// when we increase the number of replicas, we should always find existing available replica
		if len(peerNodes) == 0 {
			utils.GetLogger().
				With("table", tableName).
				With("shardID", shard.ShardID).
				With("origin", origin).
				Info("no available/leaving source, new placement")
		} else {
			utils.GetLogger().
				With("table", tableName).
				With("shardID", shard.ShardID).
				With("peers", peerNodes).
				Info("found peer to bootstrap from")
			// Shuffle peer nodes randomly to spread copy load across peers.
			// Seed with nanoseconds: a seconds-resolution seed gives every shard
			// that bootstraps in the same second the same permutation, so they
			// would all pick the same peer first.
			rand.New(rand.NewSource(utils.Now().UnixNano())).Shuffle(len(peerNodes), func(i, j int) {
				peerNodes[i], peerNodes[j] = peerNodes[j], peerNodes[i]
			})
			// BorrowConnection reports connection errors itself. The callback's
			// own error is captured separately and checked afterwards.
			var dataStreamErr error
			borrowErr := peerSource.BorrowConnection(peerNodes, func(peerID string, nodeClient rpc.PeerDataNodeClient) {
				shard.BootstrapDetails.SetSource(peerID)
				dataStreamErr = shard.fetchDataFromPeer(peerID, nodeClient, origin, options)
			})
			if borrowErr != nil {
				return borrowErr
			}
			if dataStreamErr != nil {
				return dataStreamErr
			}
		}
		// Peer copy is done (or not needed for a new placement). Do not repeat
		// it on later bootstraps.
		atomic.StoreUint32(&shard.needPeerCopy, 0)
	}

	// load metadata from disk
	if err := shard.LoadMetaData(); err != nil {
		return err
	}

	shard.BootstrapDetails.SetBootstrapStage(bootstrap.Preload)
	// preload snapshot or archive batches into memory
	if schema.IsFactTable {
		// Preload all columns for a fact table. Day numbers are whole days
		// since the Unix epoch (UTC); each column's window ends today.
		endDay := int(utils.Now().Unix() / 86400)
		for columnID, column := range schema.Columns {
			if column.Deleted {
				continue
			}
			shard.PreloadColumn(columnID, endDay-column.Config.PreloadingDays, endDay)
		}
	} else {
		// preload snapshot for dimension table
		if err := shard.LoadSnapshot(); err != nil {
			return err
		}
	}

	shard.BootstrapDetails.SetBootstrapStage(bootstrap.Recovery)
	// start play redolog
	shard.PlayRedoLog()
	success = true
	shard.BootstrapDetails.SetBootstrapStage(bootstrap.Finished)
	return nil
}