// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memstore
import (
"context"
"io"
"math"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"time"
m3Shard "github.com/m3db/m3/src/cluster/shard"
xerrors "github.com/m3db/m3/src/x/errors"
xretry "github.com/m3db/m3/src/x/retry"
xsync "github.com/m3db/m3/src/x/sync"
"github.com/uber/aresdb/cluster/topology"
"github.com/uber/aresdb/datanode/bootstrap"
"github.com/uber/aresdb/datanode/client"
"github.com/uber/aresdb/datanode/generated/proto/rpc"
"github.com/uber/aresdb/utils"
)
// IsBootstrapped returns whether this table shard is bootstrapped.
func (shard *TableShard) IsBootstrapped() bool {
shard.bootstrapLock.Lock()
defer shard.bootstrapLock.Unlock()
return shard.BootstrapState == bootstrap.Bootstrapped
}
// IsDiskDataAvailable returns whether the data for this table shard is available on disk
func (shard *TableShard) IsDiskDataAvailable() bool {
return atomic.LoadUint32(&shard.needPeerCopy) != 1
}
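// Bootstrap bootstraps all table shards that have not been bootstrapped yet. Shards whose data is already
// available on disk are started first, and shard bootstraps run concurrently, bounded by options.MaxConcurrentTableShards().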
func (m *memStoreImpl) Bootstrap(
peerSource client.PeerSource,
origin string,
topo topology.Topology,
topoState *topology.StateSnapshot,
options bootstrap.Options,
) error {
// snapshot the table shards that are not yet bootstrapped
m.RLock()
tableShards := make([]*TableShard, 0)
for _, shardMap := range m.TableShards {
for _, shard := range shardMap {
if !shard.IsBootstrapped() {
shard.Users.Add(1)
tableShards = append(tableShards, shard)
}
}
}
m.RUnlock()
// partition table shards based on whether they need to copy data from a peer,
// so that the shards that don't need a peer copy are processed first
nonInitializingEnd := 0
for i := 0; i < len(tableShards); i++ {
// if the shard doesn't need a peer copy, swap it into the first half
if atomic.LoadUint32(&tableShards[i].needPeerCopy) == 0 {
tableShards[i], tableShards[nonInitializingEnd] = tableShards[nonInitializingEnd], tableShards[i]
nonInitializingEnd++
}
}
workers := xsync.NewWorkerPool(options.MaxConcurrentTableShards())
workers.Init()
var (
multiErr = xerrors.NewMultiError()
mutex sync.Mutex
wg sync.WaitGroup
)
for _, shard := range tableShards {
shard := shard
wg.Add(1)
workers.Go(func() {
err := shard.Bootstrap(peerSource, origin, topo, topoState, options)
if err != nil {
mutex.Lock()
multiErr = multiErr.Add(err)
mutex.Unlock()
}
wg.Done()
shard.Users.Done()
})
}
wg.Wait()
return multiErr.FinalError()
}
// Bootstrap executes the bootstrap process for this table shard
func (shard *TableShard) Bootstrap(
peerSource client.PeerSource,
origin string,
topo topology.Topology,
topoState *topology.StateSnapshot,
options bootstrap.Options,
) error {
shard.bootstrapLock.Lock()
// check whether shard is already bootstrapping
if shard.BootstrapState == bootstrap.Bootstrapping {
shard.bootstrapLock.Unlock()
return bootstrap.ErrTableShardIsBootstrapping
}
shard.BootstrapState = bootstrap.Bootstrapping
shard.bootstrapLock.Unlock()
shard.Schema.RLock()
numColumns := len(shard.Schema.GetValueTypeByColumn())
schema := shard.Schema.Schema
shard.Schema.RUnlock()
shard.BootstrapDetails.Clear()
shard.BootstrapDetails.SetNumColumns(numColumns)
success := false
defer func() {
shard.bootstrapLock.Lock()
if success {
shard.BootstrapState = bootstrap.Bootstrapped
} else {
shard.BootstrapState = bootstrap.BootstrapNotStarted
}
shard.bootstrapLock.Unlock()
}()
if atomic.LoadUint32(&shard.needPeerCopy) == 1 {
shard.BootstrapDetails.SetBootstrapStage(bootstrap.PeerCopy)
// find peer nodes to copy metadata and raw data from
peerNodes, err := shard.findBootstrapSource(origin, topo, topoState)
if err != nil {
utils.GetLogger().
With("table", shard.Schema.Schema.Name).
With("shardID", shard.ShardID).
With("origin", origin).
With("error", err.Error()).
Error("failed to find peers for shard bootstrap")
return err
}
// Note: skip the peer copy step when no peers are found. This is correct under the assumption that
// if a shard replica cannot find any peer in the available/leaving state,
// the cluster must be in the initial phase of a newly created placement:
// during replace/remove/add the total number of shard replicas stays the same,
// so the number of initializing replicas matches the number of leaving replicas,
// and when increasing the number of replicas we should always find an existing available replica.
if len(peerNodes) == 0 {
utils.GetLogger().
With("table", shard.Schema.Schema.Name).
With("shardID", shard.ShardID).
With("origin", origin).
Info("no available/leaving source, new placement")
} else {
utils.GetLogger().
With("table", shard.Schema.Schema.Name).
With("shardID", shard.ShardID).
With("peers", peerNodes).
Info("found peer to bootstrap from")
// shuffle peer nodes randomly
rand.New(rand.NewSource(utils.Now().Unix())).Shuffle(len(peerNodes), func(i, j int) {
peerNodes[i], peerNodes[j] = peerNodes[j], peerNodes[i]
})
var dataStreamErr error
borrowErr := peerSource.BorrowConnection(peerNodes, func(peerID string, nodeClient rpc.PeerDataNodeClient) {
shard.BootstrapDetails.SetSource(peerID)
dataStreamErr = shard.fetchDataFromPeer(peerID, nodeClient, origin, options)
})
if borrowErr != nil {
return borrowErr
}
if dataStreamErr != nil {
return dataStreamErr
}
}
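// peer copy finished (or not needed); mark the shard's disk data as available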
atomic.StoreUint32(&shard.needPeerCopy, 0)
}
// load metadata from disk
err := shard.LoadMetaData()
if err != nil {
return err
}
shard.BootstrapDetails.SetBootstrapStage(bootstrap.Preload)
// preload snapshot or archive batches into memory
if schema.IsFactTable {
// preload all columns for fact table
endDay := int(utils.Now().Unix() / 86400)
for columnID, column := range schema.Columns {
if column.Deleted {
continue
}
shard.PreloadColumn(columnID, endDay-column.Config.PreloadingDays, endDay)
}
} else {
// preload snapshot for dimension table
err = shard.LoadSnapshot()
if err != nil {
return err
}
}
shard.BootstrapDetails.SetBootstrapStage(bootstrap.Recovery)
// start replaying the redo log
shard.PlayRedoLog()
success = true
shard.BootstrapDetails.SetBootstrapStage(bootstrap.Finished)
return nil
}
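// vpRawDataRequest groups the table shard, batch, and vector party metadata needed to request
// one vector party's raw data from a peer.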
type vpRawDataRequest struct {
tableShardMeta *rpc.TableShardMetaData
batchMeta *rpc.BatchMetaData
vpMeta *rpc.VectorPartyMetaData
}
// fetchDataFromPeer fetches metadata and raw vector party data from the given peer
func (shard *TableShard) fetchDataFromPeer(
peerID string,
client rpc.PeerDataNodeClient,
origin string,
options bootstrap.Options,
) error {
sessionID, doneFn, err := shard.startStreamSession(peerID, client, origin, options)
if err != nil {
return err
}
defer doneFn()
// 1. fetch metadata
tableShardMeta, err := shard.fetchBatchMetaDataFromPeer(origin, sessionID, client)
if err != nil {
return err
}
// 2. set metadata and trigger recovery
if err := shard.setTableShardMetadata(tableShardMeta); err != nil {
return err
}
// 3. fetch raw vps
workerPool := xsync.NewWorkerPool(options.MaxConcurrentStreamsPerTableShards())
workerPool.Init()
retrier := xretry.NewRetrier(xretry.NewOptions().SetMaxRetries(3))
var (
mutex sync.Mutex
errors xerrors.MultiError
wg sync.WaitGroup
)
for _, batchMeta := range tableShardMeta.Batches {
err := shard.setBatchMetadata(tableShardMeta, batchMeta)
if err != nil {
return utils.StackError(err, "failed to set batch level metadata")
}
for _, vpMeta := range batchMeta.Vps {
shard.BootstrapDetails.AddVPToCopy(batchMeta.GetBatchID(), vpMeta.GetColumnID())
}
}
fetchStart := utils.Now()
for _, batchMeta := range tableShardMeta.Batches {
for _, vpMeta := range batchMeta.Vps {
// capture batchMeta and vpMeta
batchMeta := batchMeta
vpMeta := vpMeta
wg.Add(1)
// TODO: add checksum to vp file and vpMeta to avoid copying existing data on disk
workerPool.Go(func() {
defer wg.Done()
attempts := 0
// use a goroutine-local err so concurrent workers do not race on the enclosing function's variable
err := retrier.Attempt(func() error {
attempts++
request, vpWriter, err := shard.createVectorPartyRawDataRequest(origin, sessionID, tableShardMeta, batchMeta, vpMeta)
if err != nil {
utils.GetLogger().
With("peer", peerID, "table", shard.Schema.Schema.Name, "shard", shard.ShardID, "batch", batchMeta.GetBatchID(), "column", vpMeta.GetColumnID(), "request", request, "error", err.Error()).
Errorf("failed to create vector party raw data request, attempt %d", attempts)
return err
}
defer vpWriter.Close()
fetchStart := utils.Now()
bytesFetched, err := shard.fetchVectorPartyRawDataFromPeer(client, vpWriter, request)
if err != nil {
utils.GetLogger().
With("peer", peerID, "table", shard.Schema.Schema.Name, "shard", shard.ShardID, "batch", batchMeta.GetBatchID(), "column", vpMeta.GetColumnID(), "request", request, "error", err.Error()).
Errorf("failed to fetch data from peer, attempt %d", attempts)
return err
}
duration := utils.Now().Sub(fetchStart)
utils.GetLogger().
With("peer", peerID, "table", shard.Schema.Schema.Name, "shard", shard.ShardID, "batch", batchMeta.GetBatchID(), "column", vpMeta.GetColumnID(), "request", request).
Infof("successfully fetched data (%d bytes) from peer, took %f seconds, attempt %d", bytesFetched, duration.Seconds(), attempts)
utils.GetReporter(tableShardMeta.Table, int(tableShardMeta.Shard)).
GetChildTimer(map[string]string{
"batch": strconv.Itoa(int(batchMeta.GetBatchID())),
"column": strconv.Itoa(int(vpMeta.GetColumnID())),
}, utils.RawVPFetchTime).Record(duration)
utils.GetReporter(tableShardMeta.Table, int(tableShardMeta.Shard)).GetChildCounter(
map[string]string{
"batch": strconv.Itoa(int(batchMeta.GetBatchID())),
"column": strconv.Itoa(int(vpMeta.GetColumnID())),
}, utils.RawVPBytesFetched).Inc(int64(bytesFetched))
utils.GetReporter(tableShardMeta.Table, int(tableShardMeta.Shard)).
GetChildGauge(map[string]string{
"batch": strconv.Itoa(int(batchMeta.GetBatchID())),
"column": strconv.Itoa(int(vpMeta.GetColumnID())),
}, utils.RawVPFetchBytesPerSec).Update(float64(bytesFetched) / duration.Seconds())
shard.BootstrapDetails.MarkVPFinished(batchMeta.GetBatchID(), vpMeta.GetColumnID())
return nil
})
if err != nil {
mutex.Lock()
errors = errors.Add(err)
mutex.Unlock()
utils.GetReporter(tableShardMeta.GetTable(), int(tableShardMeta.GetShard())).GetCounter(utils.RawVPFetchFailure).Inc(1)
} else {
utils.GetReporter(tableShardMeta.GetTable(), int(tableShardMeta.GetShard())).GetCounter(utils.RawVPFetchSuccess).Inc(1)
}
})
}
}
wg.Wait()
if !errors.Empty() {
return errors.FinalError()
}
utils.GetReporter(tableShardMeta.Table, int(tableShardMeta.Shard)).GetTimer(utils.TotalRawVPFetchTime).Record(utils.Now().Sub(fetchStart))
return nil
}
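// fetchBatchMetaDataFromPeer fetches the table shard metadata, including per-batch metadata, from the peer.
// For fact tables the batch ID range is capped at the current day and, when record retention is configured,
// bounded below by the retention window.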
func (shard *TableShard) fetchBatchMetaDataFromPeer(origin string, sessionID int64, client rpc.PeerDataNodeClient) (*rpc.TableShardMetaData, error) {
var (
endBatchID int32 = math.MaxInt32
startBatchID int32 = math.MinInt32
)
shard.Schema.RLock()
if shard.Schema.Schema.IsFactTable {
endBatchID = int32(utils.Now().Unix() / 86400)
}
if shard.Schema.Schema.IsFactTable && shard.Schema.Schema.Config.RecordRetentionInDays > 0 {
startBatchID = endBatchID - int32(shard.Schema.Schema.Config.RecordRetentionInDays) + 1
}
shard.Schema.RUnlock()
req := &rpc.TableShardMetaDataRequest{
Table: shard.Schema.Schema.Name,
Incarnation: int32(shard.Schema.Schema.Incarnation),
Shard: uint32(shard.ShardID),
StartBatchID: startBatchID,
SessionID: sessionID,
NodeID: origin,
EndBatchID: endBatchID,
}
return client.FetchTableShardMetaData(context.Background(), req)
}
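// createVectorPartyRawDataRequest builds the raw data request for a single vector party and opens the matching
// on-disk writer: an archive vector party file (with archive version and backfill sequence) for fact tables,
// or a snapshot vector party file for dimension tables.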
func (shard *TableShard) createVectorPartyRawDataRequest(
origin string,
sessionID int64,
tableMeta *rpc.TableShardMetaData,
batchMeta *rpc.BatchMetaData,
vpMeta *rpc.VectorPartyMetaData,
) (rawVPDataRequest *rpc.VectorPartyRawDataRequest, vpWriter utils.WriteSyncCloser, err error) {
if shard.Schema.Schema.IsFactTable {
// fact table archive vp writer
vpWriter, err = shard.diskStore.OpenVectorPartyFileForWrite(tableMeta.GetTable(),
int(vpMeta.GetColumnID()),
int(tableMeta.GetShard()), int(batchMeta.GetBatchID()),
batchMeta.GetArchiveVersion().GetArchiveVersion(),
batchMeta.GetArchiveVersion().GetBackfillSeq())
rawVPDataRequest = &rpc.VectorPartyRawDataRequest{
SessionID: sessionID,
NodeID: origin,
Table: tableMeta.GetTable(),
Shard: tableMeta.GetShard(),
Incarnation: tableMeta.GetIncarnation(),
BatchID: int32(batchMeta.GetBatchID()),
Version: &rpc.VectorPartyRawDataRequest_ArchiveVersion{
ArchiveVersion: &rpc.ArchiveVersion{
ArchiveVersion: batchMeta.GetArchiveVersion().GetArchiveVersion(),
BackfillSeq: batchMeta.GetArchiveVersion().GetBackfillSeq(),
},
},
ColumnID: vpMeta.GetColumnID(),
}
} else {
// dimension table snapshot vp writer
vpWriter, err = shard.diskStore.OpenSnapshotVectorPartyFileForWrite(
tableMeta.GetTable(),
int(tableMeta.GetShard()),
tableMeta.GetDimensionMeta().GetSnapshotVersion().GetRedoFileID(),
tableMeta.GetDimensionMeta().GetSnapshotVersion().GetRedoFileOffset(),
int(batchMeta.GetBatchID()),
int(vpMeta.GetColumnID()))
rawVPDataRequest = &rpc.VectorPartyRawDataRequest{
SessionID: sessionID,
NodeID: origin,
Table: tableMeta.GetTable(),
Shard: tableMeta.GetShard(),
Incarnation: tableMeta.GetIncarnation(),
BatchID: int32(batchMeta.GetBatchID()),
Version: &rpc.VectorPartyRawDataRequest_SnapshotVersion{
SnapshotVersion: tableMeta.GetDimensionMeta().GetSnapshotVersion(),
},
ColumnID: vpMeta.GetColumnID(),
}
}
return
}
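// fetchVectorPartyRawDataFromPeer streams raw vector party data from the peer, writes each chunk to vpWriter,
// and syncs the file before returning the total number of bytes written.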
func (shard *TableShard) fetchVectorPartyRawDataFromPeer(
client rpc.PeerDataNodeClient,
vpWriter utils.WriteSyncCloser,
request *rpc.VectorPartyRawDataRequest,
) (int, error) {
stream, err := client.FetchVectorPartyRawData(context.Background(), request)
if err != nil {
return 0, err
}
totalBytes := 0
for {
data, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return totalBytes, err
}
bytesWritten, err := vpWriter.Write(data.Chunk)
if err != nil {
return totalBytes, err
}
totalBytes += bytesWritten
}
if err = vpWriter.Sync(); err != nil {
return totalBytes, utils.StackError(err, "failed to sync to disk")
}
return totalBytes, nil
}
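// setBatchMetadata persists the batch level metadata received from the peer. For fact tables it overwrites the
// archive batch version in the metastore; for dimension tables it is a no-op.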
func (shard *TableShard) setBatchMetadata(tableShardMeta *rpc.TableShardMetaData, batchMeta *rpc.BatchMetaData) error {
if shard.Schema.Schema.IsFactTable {
err := shard.metaStore.OverwriteArchiveBatchVersion(shard.Schema.Schema.Name, shard.ShardID,
int(batchMeta.BatchID),
batchMeta.GetArchiveVersion().GetArchiveVersion(),
batchMeta.GetArchiveVersion().GetBackfillSeq(),
int(batchMeta.GetSize()))
if err != nil {
return err
}
}
return nil
}
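// setTableShardMetadata persists the shard level metadata received from the peer: redo log (kafka) commit and
// checkpoint offsets, plus archiving cutoff and backfill progress for fact tables, or snapshot progress for dimension tables.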
func (shard *TableShard) setTableShardMetadata(tableShardMeta *rpc.TableShardMetaData) error {
// update kafka offsets
err := shard.metaStore.UpdateRedoLogCommitOffset(shard.Schema.Schema.Name, shard.ShardID, tableShardMeta.GetKafkaOffset().GetCommitOffset())
if err != nil {
return utils.StackError(err, "failed to update kafka commit offset")
}
err = shard.metaStore.UpdateRedoLogCheckpointOffset(shard.Schema.Schema.Name, shard.ShardID, tableShardMeta.GetKafkaOffset().GetCheckPointOffset())
if err != nil {
return utils.StackError(err, "failed to update archiving cutoff")
}
if shard.Schema.Schema.IsFactTable {
// update archiving cutoff and backfill progress for fact table
err := shard.metaStore.UpdateArchivingCutoff(shard.Schema.Schema.Name, shard.ShardID, tableShardMeta.GetFactMeta().GetHighWatermark())
if err != nil {
return utils.StackError(err, "failed to update archiving cutoff")
}
err = shard.metaStore.UpdateBackfillProgress(shard.Schema.Schema.Name, shard.ShardID, tableShardMeta.GetFactMeta().GetBackfillCheckpoint().GetRedoFileID(), tableShardMeta.GetFactMeta().GetBackfillCheckpoint().GetRedoFileOffset())
if err != nil {
return utils.StackError(err, "failed to update backfill progress")
}
} else {
// update snapshot progress for dimension table
err := shard.metaStore.UpdateSnapshotProgress(
shard.Schema.Schema.Name,
shard.ShardID, tableShardMeta.GetDimensionMeta().GetSnapshotVersion().GetRedoFileID(),
tableShardMeta.GetDimensionMeta().GetSnapshotVersion().GetRedoFileOffset(),
tableShardMeta.GetDimensionMeta().GetLastBatchID(),
uint32(tableShardMeta.GetDimensionMeta().GetLastBatchSize()))
if err != nil {
return utils.StackError(err, "failed to update snapshot progress")
}
}
return nil
}
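// startStreamSession starts a bootstrap data streaming session with the peer and keeps it alive in the background.
// The returned doneFn stops the keep-alive loop once the caller is finished with the session.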
func (shard *TableShard) startStreamSession(peerID string, client rpc.PeerDataNodeClient, origin string, options bootstrap.Options) (sessionID int64, doneFn func(), err error) {
done := make(chan struct{})
ttl := int64(options.BootstrapSessionTTL())
startSessionRequest := &rpc.StartSessionRequest{
Table: shard.Schema.Schema.Name,
Shard: uint32(shard.ShardID),
NodeID: origin,
Ttl: ttl,
}
session, err := client.StartSession(context.Background(), startSessionRequest)
if err != nil {
return 0, nil, utils.StackError(err, "failed to start session")
}
sessionID = session.ID
stream, err := client.KeepAlive(context.Background())
if err != nil {
return 0, nil, utils.StackError(err, "failed to create keep alive stream")
}
// send first keep alive request
if err = xretry.NewRetrier(xretry.NewOptions()).Attempt(func() error {
return stream.Send(&rpc.Session{ID: sessionID, NodeID: origin})
}); err != nil {
return 0, nil, utils.StackError(err, "failed to send keep alive session")
}
// send loop
go func(stream rpc.PeerDataNode_KeepAliveClient) {
for {
// create a fresh one-shot timer each iteration so TTL updates take effect and no unstopped ticker is leaked
timer := time.NewTimer(time.Duration(atomic.LoadInt64(&ttl) / 2))
select {
case <-timer.C:
if err := xretry.NewRetrier(xretry.NewOptions()).Attempt(func() error {
return stream.Send(&rpc.Session{ID: sessionID, NodeID: origin})
}); err != nil {
utils.GetLogger().
With(
"table", shard.Schema.Schema.Name,
"shard", shard.ShardID,
"error", err.Error(),
"peer", peerID).
Error("failed to send keep alive session")
}
case <-done:
timer.Stop()
if err := stream.CloseSend(); err != nil {
utils.GetLogger().
With(
"table",
shard.Schema.Schema.Name,
"shard", shard.ShardID,
"error", err.Error(),
"peer", peerID).
Error("failed to close keep alive session")
}
return
}
}
}(stream)
// receive loop
go func(stream rpc.PeerDataNode_KeepAliveClient) {
for {
resp, err := stream.Recv()
if err == io.EOF {
// server closed the stream
utils.GetLogger().With("table", shard.Schema.Schema.Name, "shard", shard.ShardID).Error("server closed keep alive session")
return
} else if err != nil {
utils.GetLogger().With("table", shard.Schema.Schema.Name, "shard", shard.ShardID, "error", err.Error()).Error("received error from keep alive session")
return
}
if resp.Ttl > 0 {
atomic.StoreInt64(&ttl, resp.Ttl)
}
}
}(stream)
return sessionID, func() {
close(done)
}, nil
}
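// findBootstrapSource returns the IDs of peer hosts this shard can bootstrap from: hosts other than the origin
// that own the shard in the Available or Leaving state.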
func (shard *TableShard) findBootstrapSource(
origin string, topo topology.Topology, topoState *topology.StateSnapshot) ([]string, error) {
hostShardStates, ok := topoState.ShardStates[topology.ShardID(shard.ShardID)]
if !ok {
// This shard was not part of the topology when the bootstrapping
// process began.
return nil, utils.StackError(nil, "shard does not exist in topology")
}
peers := make([]string, 0, topo.Get().HostsLen())
for _, hostShardState := range hostShardStates {
if hostShardState.Host.ID() == origin {
// Don't take self into account
continue
}
shardState := hostShardState.ShardState
switch shardState {
// Don't want to peer bootstrap from a node that has not yet completely
// taken ownership of the shard.
case m3Shard.Initializing:
// Success cases - We can bootstrap from this host, which is enough to
// mark this shard as bootstrappable.
case m3Shard.Leaving:
fallthrough
case m3Shard.Available:
peers = append(peers, hostShardState.Host.ID())
case m3Shard.Unknown:
fallthrough
default:
}
}
return peers, nil
}