// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bootstrap

import (
	"bufio"
	"context"
	"errors"
	"io"
	"os"
	"sync"
	"sync/atomic"
	"time"

	pb "github.com/uber/aresdb/datanode/generated/proto/rpc"
	"github.com/uber/aresdb/diskstore"
	"github.com/uber/aresdb/metastore/common"
	"github.com/uber/aresdb/utils"
)
const (
chunkSize = 32 * 1024
bufferSize = 32 * 1024
recycleInterval = 5 * time.Second
)
var (
	errNoCallerID       = errors.New("caller node id not set in request")
	errNoSessionID      = errors.New("session id not set in request")
	errInvalidSessionID = errors.New("invalid session id")
	errInvalidRequest   = errors.New("invalid request: table/shard does not match session")
	errSessionExisting  = errors.New("the requested table/shard already has a session running from the same node")
)

// PeerDataNodeServerImpl implements the PeerDataNode gRPC service, which
// serves table shard data to peer data nodes during bootstrap.
type PeerDataNodeServerImpl struct {
sync.RWMutex
// session id generator
sequenceID int64
metaStore common.MetaStore
diskStore diskstore.DiskStore
// session id to sessionInfo map
sessions map[int64]*sessionInfo
// tracking of all sessions for each table/shard
tableShardSessions map[tableShardPair][]int64
}

// tableShardPair identifies a table/shard combination.
type tableShardPair struct {
table string
shardID uint32
}

// sessionInfo tracks the state of one bootstrap session.
type sessionInfo struct {
sessionID int64
table string
shardID uint32
nodeID string
addr string
lastLiveTime time.Time
ttl int64
}

// NewPeerDataNodeServer creates a PeerDataNodeServer backed by the given meta store and disk store.
func NewPeerDataNodeServer(metaStore common.MetaStore, diskStore diskstore.DiskStore) pb.PeerDataNodeServer {
return &PeerDataNodeServerImpl{
metaStore: metaStore,
diskStore: diskStore,
sessions: make(map[int64]*sessionInfo),
tableShardSessions: make(map[tableShardPair][]int64),
}
}
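
// A minimal usage sketch (not part of the original file): the returned
// implementation is typically registered on a gRPC server through the
// generated bindings; grpc.NewServer and pb.RegisterPeerDataNodeServer are
// assumed from the standard generated gRPC code.
//
//	grpcServer := grpc.NewServer()
//	pb.RegisterPeerDataNodeServer(grpcServer, NewPeerDataNodeServer(metaStore, diskStore))
//	grpcServer.Serve(listener)
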
// AcquireToken checks whether any bootstrap session is running on the given table/shard.
// If no bootstrap session is running, it grants the token and returns true.
// The caller must release the token by calling ReleaseToken when done.
func (p *PeerDataNodeServerImpl) AcquireToken(tableName string, shardID uint32) bool {
p.Lock()
defer p.Unlock()
	// lazily clean up obsolete orphan sessions
now := utils.Now()
for sid, session := range p.sessions {
if now.After(session.lastLiveTime.Add(time.Duration(session.ttl))) {
p.cleanSession(sid, false)
}
}
key := tableShardPair{table: tableName, shardID: shardID}
sessionIDs, ok := p.tableShardSessions[key]
if !ok || len(sessionIDs) == 0 {
return true
}
return false
}

// ReleaseToken releases the token; it must be called after every successful AcquireToken.
func (p *PeerDataNodeServerImpl) ReleaseToken(tableName string, shardID uint32) {
// nothing to do for now
}
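
// A caller-side sketch of the token pattern above, given a concrete
// *PeerDataNodeServerImpl p; the table/shard values are illustrative:
//
//	if p.AcquireToken("my_table", 0) {
//		defer p.ReleaseToken("my_table", 0)
//		// exclusive work on the table/shard, e.g. purging its data
//	}
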
// getNextSequence creates a new session id.
func (p *PeerDataNodeServerImpl) getNextSequence() int64 {
return atomic.AddInt64(&p.sequenceID, 1)
}

// Health returns the health status of the data node server.
func (p *PeerDataNodeServerImpl) Health(ctx context.Context, req *pb.HealthCheckRequest) (*pb.HealthCheckResponse, error) {
return &pb.HealthCheckResponse{
Status: pb.HealthCheckResponse_SERVING,
}, nil
}

// StartSession creates a new session for one table/shard/node; only one session
// can be established per table/shard from the same node.
func (p *PeerDataNodeServerImpl) StartSession(ctx context.Context, req *pb.StartSessionRequest) (*pb.Session, error) {
var err error
sessionInfo := &sessionInfo{
table: req.Table,
shardID: req.Shard,
nodeID: req.NodeID,
ttl: req.Ttl,
lastLiveTime: utils.Now(),
sessionID: p.getNextSequence(),
}
defer func() {
if err == nil {
logInfoMsg(sessionInfo, "started bootstrap session successfully")
} else {
logErrorMsg(sessionInfo, err, "start bootstrap session failed")
}
}()
if len(req.NodeID) == 0 {
err = errNoCallerID
return nil, err
}
if err = p.validateTable(req.Table, req.Shard); err != nil {
return nil, err
}
if err = p.checkReqExist(sessionInfo); err != nil {
return nil, err
}
p.addSession(sessionInfo)
return &pb.Session{
ID: sessionInfo.sessionID,
}, nil
}
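
// A peer-side sketch of opening a session, assuming the generated
// PeerDataNodeClient bindings; note that Ttl is interpreted as nanoseconds
// (see the time.Duration conversion in AcquireToken):
//
//	session, err := client.StartSession(ctx, &pb.StartSessionRequest{
//		Table:  "my_table",
//		Shard:  0,
//		NodeID: "node-1",
//		Ttl:    int64(5 * time.Minute),
//	})
//	// session.ID identifies this bootstrap session in subsequent requests
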
// checkReqExist returns an error when the same node already has a session running on the requested table/shard.
func (p *PeerDataNodeServerImpl) checkReqExist(s *sessionInfo) error {
p.RLock()
defer p.RUnlock()
pair := tableShardPair{
table: s.table,
shardID: s.shardID,
}
sessions, ok := p.tableShardSessions[pair]
if !ok {
return nil
}
for _, sid := range sessions {
if p.sessions[sid].nodeID == s.nodeID {
return errSessionExisting
}
}
return nil
}

// KeepAlive implements a client/server ping loop so that each side can monitor the other's liveness.
func (p *PeerDataNodeServerImpl) KeepAlive(stream pb.PeerDataNode_KeepAliveServer) error {
utils.GetLogger().With("action", "bootstrap").Info("keep alive called")
var sessionInfo *sessionInfo
var err error
defer func() {
if sessionInfo != nil {
if err == nil {
logInfoMsg(sessionInfo, "keep alive stoped")
} else {
logErrorMsg(sessionInfo, err, "keep alive failed")
}
} else {
utils.GetLogger().With("action", "bootstrap", "error", err).Error("keep alive stopped")
}
}()
for {
var session *pb.Session
session, err = stream.Recv()
if err == io.EOF {
err = nil
break
}
if err != nil {
return err
}
if sessionInfo == nil {
if err = p.validateSessionSource(session.ID, session.NodeID); err != nil {
return err
}
			if sessionInfo, err = p.getSession(session.ID); err != nil {
				return err
			}
		}
		// update last live time; take the lock since AcquireToken reads it concurrently
		p.Lock()
		sessionInfo.lastLiveTime = utils.Now()
		p.Unlock()
if err = stream.Send(&pb.KeepAliveResponse{ID: session.ID, Ttl: sessionInfo.ttl}); err != nil {
return err
}
}
if sessionInfo != nil {
p.cleanSession(sessionInfo.sessionID, true)
}
return nil
}
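
// A client-side keep-alive sketch, assuming the generated streaming bindings;
// the ping interval is illustrative:
//
//	stream, err := client.KeepAlive(ctx)
//	for err == nil {
//		if err = stream.Send(&pb.Session{ID: sessionID, NodeID: nodeID}); err == nil {
//			_, err = stream.Recv()
//		}
//		time.Sleep(10 * time.Second)
//	}
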
// FetchTableShardMetaData retrieves all metadata for one table/shard.
func (p *PeerDataNodeServerImpl) FetchTableShardMetaData(ctx context.Context, req *pb.TableShardMetaDataRequest) (*pb.TableShardMetaData, error) {
sessionInfo := &sessionInfo{
table: req.Table,
shardID: req.Shard,
nodeID: req.NodeID,
}
var err error
logInfoMsg(sessionInfo, "FetchTableShardMetaData called")
defer func() {
if err == nil {
logInfoMsg(sessionInfo, "FetchTableShardMetaData succeed")
} else {
logErrorMsg(sessionInfo, err, "FetchTableShardMetaData failed")
}
}()
if err = p.validateRequest(req.SessionID, req.NodeID, req.Table, req.Shard); err != nil {
return nil, err
}
t, err := p.metaStore.GetTable(req.Table)
if err != nil {
return nil, err
}
commitOffset, err := p.metaStore.GetRedoLogCommitOffset(req.Table, int(req.Shard))
if err != nil {
return nil, err
}
checkpointOffset, err := p.metaStore.GetRedoLogCheckpointOffset(req.Table, int(req.Shard))
if err != nil {
return nil, err
}
m := &pb.TableShardMetaData{
Table: req.Table,
Shard: req.Shard,
Incarnation: int32(t.Incarnation),
KafkaOffset: &pb.KafkaOffset{
CommitOffset: commitOffset,
CheckPointOffset: checkpointOffset,
},
}
if !t.IsFactTable {
// dimension table
redoFileID, redoFileOffset, lastBatchID, lastBatchSize, err := p.metaStore.GetSnapshotProgress(req.Table, int(req.Shard))
if err != nil {
return nil, err
}
batchIDs, err := p.diskStore.ListSnapshotBatches(req.Table, int(req.Shard), redoFileID, redoFileOffset)
if err != nil {
return nil, err
}
batches := make([]*pb.BatchMetaData, len(batchIDs))
for i, batchID := range batchIDs {
columns, err := p.diskStore.ListSnapshotVectorPartyFiles(req.Table, int(req.Shard), redoFileID, redoFileOffset, batchID)
if err != nil {
return nil, err
}
vps := make([]*pb.VectorPartyMetaData, len(columns))
for j, colID := range columns {
vps[j] = &pb.VectorPartyMetaData{
ColumnID: uint32(colID),
}
}
batches[i] = &pb.BatchMetaData{
BatchID: int32(batchID),
				Vps:     vps,
}
}
m.Batches = batches
m.Meta = &pb.TableShardMetaData_DimensionMeta{
DimensionMeta: &pb.DimensionTableShardMetaData{
LastBatchID: lastBatchID,
LastBatchSize: int32(lastBatchSize),
SnapshotVersion: &pb.SnapshotVersion{
RedoFileID: redoFileID,
RedoFileOffset: redoFileOffset,
},
},
}
return m, nil
}
// fact table
cutoff, err := p.metaStore.GetArchivingCutoff(req.Table, int(req.Shard))
if err != nil {
return nil, err
}
redoFileID, redoFileOffset, err := p.metaStore.GetBackfillProgressInfo(req.Table, int(req.Shard))
if err != nil {
return nil, err
}
	// adjust start/end batchID according to the local retention setting and the request;
	// we take the intersection of the two batch ranges
startBatchID := int32(0)
endBatchID := int32(utils.Now().Unix() / 86400)
if t.Config.RecordRetentionInDays > 0 {
startBatchID = endBatchID - int32(t.Config.RecordRetentionInDays) + 1
}
if req.StartBatchID > startBatchID {
startBatchID = req.StartBatchID
}
if req.EndBatchID > 0 && req.EndBatchID < endBatchID {
endBatchID = req.EndBatchID
}
batchIDs, err := p.metaStore.GetArchiveBatches(req.Table, int(req.Shard), startBatchID, endBatchID)
if err != nil {
return nil, err
}
batches := make([]*pb.BatchMetaData, len(batchIDs))
for i, batchID := range batchIDs {
version, seq, size, err := p.metaStore.GetArchiveBatchVersion(req.Table, int(req.Shard), batchID, cutoff)
if err != nil {
return nil, err
}
columns, err := p.diskStore.ListArchiveBatchVectorPartyFiles(req.Table, int(req.Shard), batchID, version, seq)
if err != nil {
return nil, err
}
vps := make([]*pb.VectorPartyMetaData, len(columns))
for j, colID := range columns {
vps[j] = &pb.VectorPartyMetaData{
ColumnID: uint32(colID),
}
}
batches[i] = &pb.BatchMetaData{
BatchID: int32(batchID),
Size: uint32(size),
ArchiveVersion: &pb.ArchiveVersion{
ArchiveVersion: version,
BackfillSeq: seq,
},
Vps: vps,
}
}
m.Batches = batches
m.Meta = &pb.TableShardMetaData_FactMeta{
FactMeta: &pb.FactTableShardMetaData{
HighWatermark: cutoff,
BackfillCheckpoint: &pb.BackfillCheckpoint{
RedoFileID: redoFileID,
RedoFileOffset: redoFileOffset,
},
},
}
return m, nil
}
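
// A peer-side sketch of fetching shard metadata, assuming the generated
// client bindings; the identifiers are illustrative:
//
//	meta, err := client.FetchTableShardMetaData(ctx, &pb.TableShardMetaDataRequest{
//		Table:     "my_table",
//		Shard:     0,
//		NodeID:    "node-1",
//		SessionID: sessionID,
//	})
//	// meta.Batches lists the batches and vector parties to fetch next
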
// FetchVectorPartyRawData streams the raw vector party file for one batch/column to the caller in fixed-size chunks.
func (p *PeerDataNodeServerImpl) FetchVectorPartyRawData(req *pb.VectorPartyRawDataRequest, stream pb.PeerDataNode_FetchVectorPartyRawDataServer) error {
sessionInfo := &sessionInfo{
table: req.Table,
shardID: req.Shard,
nodeID: req.NodeID,
}
var err error
var timeElapsed int64
timeStart := utils.Now()
logInfoMsg(sessionInfo, "FetchVectorPartyRawData called", "batch", req.BatchID, "col", req.ColumnID)
defer func() {
if err == nil {
logInfoMsg(sessionInfo, "FetchVectorPartyRawData succeed", "batch", req.BatchID, "col", req.ColumnID, "timeused", timeElapsed)
} else {
logErrorMsg(sessionInfo, err, "FetchVectorPartyRawData failed", req.BatchID, "col", req.ColumnID)
}
}()
if err = p.validateRequest(req.SessionID, req.NodeID, req.Table, req.Shard); err != nil {
return err
}
t, err := p.metaStore.GetTable(req.Table)
if err != nil {
return err
}
var reader io.ReadCloser
if t.IsFactTable {
reader, err = p.diskStore.OpenVectorPartyFileForRead(req.Table, int(req.ColumnID), int(req.Shard), int(req.BatchID),
uint32(req.GetArchiveVersion().ArchiveVersion), uint32(req.GetArchiveVersion().BackfillSeq))
} else {
reader, err = p.diskStore.OpenSnapshotVectorPartyFileForRead(req.Table, int(req.Shard), int64(req.GetSnapshotVersion().RedoFileID),
req.GetSnapshotVersion().RedoFileOffset, int(req.BatchID), int(req.ColumnID))
}
if err != nil {
return err
}
defer reader.Close()
bufferedReader := bufio.NewReaderSize(reader, bufferSize)
vp := &pb.VectorPartyRawData{}
buf := make([]byte, chunkSize)
for {
		var n int
		n, err = bufferedReader.Read(buf)
if err != nil && err != io.EOF {
return err
}
if n > 0 {
vp.Chunk = buf[:n]
if err = stream.Send(vp); err != nil {
return err
}
}
if err != nil {
			// clear the EOF error, which marks a normal end of stream
err = nil
break
}
}
	// elapsed time in microseconds
timeElapsed = utils.Now().Sub(timeStart).Nanoseconds() / 1000
return nil
}
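
// A peer-side sketch of consuming the stream, assuming the generated client
// bindings and some io.Writer `writer`; chunks arrive in order and are
// concatenated to rebuild the raw file:
//
//	stream, err := client.FetchVectorPartyRawData(ctx, req)
//	for err == nil {
//		var chunk *pb.VectorPartyRawData
//		if chunk, err = stream.Recv(); err == nil {
//			_, err = writer.Write(chunk.Chunk)
//		}
//	}
//	if err == io.EOF {
//		err = nil // the stream finished cleanly
//	}
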
// BenchmarkFileTransfer is used for benchmarking file transfer; TODO: remove later.
func (p *PeerDataNodeServerImpl) BenchmarkFileTransfer(req *pb.BenchmarkRequest, stream pb.PeerDataNode_BenchmarkFileTransferServer) error {
var err error
var timeElapsed int64
timeStart := utils.Now()
	reader, err := os.OpenFile(req.File, os.O_RDONLY, 0644)
	if err != nil {
		return err
	}
	defer reader.Close()
vp := &pb.VectorPartyRawData{}
bufSize := req.ChunkSize
if bufSize == 0 {
bufSize = chunkSize
}
var bufferedReader *bufio.Reader
if req.BufferSize > 0 {
bufferedReader = bufio.NewReaderSize(reader, int(req.BufferSize))
}
buf := make([]byte, bufSize)
for {
		var n int
		if bufferedReader == nil {
			n, err = reader.Read(buf)
		} else {
			n, err = bufferedReader.Read(buf)
		}
		if err != nil && err != io.EOF {
			return err
		}
		if n > 0 {
			vp.Chunk = buf[:n]
			if err = stream.Send(vp); err != nil {
				return err
			}
		}
		if err != nil {
			// clear the EOF error, which marks a normal end of stream
			err = nil
			break
		}
	}
	// elapsed time in microseconds
	timeElapsed = utils.Now().Sub(timeStart).Nanoseconds() / 1000
utils.GetLogger().With("timeelapsed", timeElapsed).Info("BenchmarkFileTransfer")
return nil
}

// validateSessionSource verifies that the session exists and belongs to the calling node.
func (p *PeerDataNodeServerImpl) validateSessionSource(sessionID int64, nodeID string) error {
if sessionID == 0 {
return errNoSessionID
}
if len(nodeID) == 0 {
return errNoCallerID
}
sessionInfo, err := p.getSession(sessionID)
if err != nil {
return err
}
if sessionInfo.nodeID != nodeID {
		return errInvalidRequest
}
return nil
}

// validateRequest verifies that the session, the calling node, and the requested table/shard all match.
func (p *PeerDataNodeServerImpl) validateRequest(sessionID int64, nodeID string, table string, shard uint32) error {
if sessionID == 0 {
return errNoSessionID
}
if len(nodeID) == 0 {
return errNoCallerID
}
sessionInfo, err := p.getSession(sessionID)
if err != nil {
return err
}
if sessionInfo.nodeID != nodeID {
		return errInvalidRequest
}
if sessionInfo.table != table || sessionInfo.shardID != shard {
		return errInvalidRequest
}
return nil
}

// addSession records a newly created session.
func (p *PeerDataNodeServerImpl) addSession(session *sessionInfo) {
p.Lock()
defer p.Unlock()
p.sessions[session.sessionID] = session
tableShard := tableShardPair{
table: session.table,
shardID: session.shardID,
}
	p.tableShardSessions[tableShard] = append(p.tableShardSessions[tableShard], session.sessionID)
}

// cleanSession removes a session from memory; pass needLock=false when the caller already holds the lock.
func (p *PeerDataNodeServerImpl) cleanSession(sessionID int64, needLock bool) {
if needLock {
p.Lock()
defer p.Unlock()
}
session := p.sessions[sessionID]
if session != nil {
delete(p.sessions, sessionID)
tableShardKey := tableShardPair{table: session.table, shardID: session.shardID}
sessionIDs := p.tableShardSessions[tableShardKey]
for i, sid := range sessionIDs {
if sid == sessionID {
sessionIDs = append(sessionIDs[:i], sessionIDs[i+1:]...)
p.tableShardSessions[tableShardKey] = sessionIDs
break
}
}
}
}

// getSession retrieves the session info for a given session id.
func (p *PeerDataNodeServerImpl) getSession(sessionID int64) (*sessionInfo, error) {
p.RLock()
defer p.RUnlock()
session, ok := p.sessions[sessionID]
if !ok {
return nil, errInvalidSessionID
}
return session, nil
}

// validateTable checks whether the requested table/shard is valid.
func (p *PeerDataNodeServerImpl) validateTable(tableName string, shardID uint32) error {
// check if table exists
if _, err := p.metaStore.GetTable(tableName); err != nil {
return err
}
// TODO check table shard ownership from topology
return nil
}

// logInfoMsg logs an info message with the common bootstrap session fields attached.
func logInfoMsg(s *sessionInfo, msg string, fields ...interface{}) {
f := []interface{}{
"action",
"bootstrap",
"table",
s.table,
"shard",
s.shardID,
}
f = append(f, fields...)
utils.GetLogger().With(f...).Info(msg)
}

// logErrorMsg logs an error message with the common bootstrap session fields attached.
func logErrorMsg(s *sessionInfo, err error, msg string, fields ...interface{}) {
f := []interface{}{
"action",
"bootstrap",
"table",
s.table,
"shard",
s.shardID,
"error",
err,
}
f = append(f, fields...)
utils.GetLogger().With(f...).Error(msg)
}