in datanode/bootstrap/bootstrap_server.go [233:391]
func (p *PeerDataNodeServerImpl) FetchTableShardMetaData(ctx context.Context, req *pb.TableShardMetaDataRequest) (meta *pb.TableShardMetaData, err error) {
	sessionInfo := &sessionInfo{
		table:   req.Table,
		shardID: req.Shard,
		nodeID:  req.NodeID,
	}
	logInfoMsg(sessionInfo, "FetchTableShardMetaData called")
	// err is a named return value so that the deferred logger also observes
	// errors assigned by the shadowing := declarations in the blocks below.
	defer func() {
		if err == nil {
			logInfoMsg(sessionInfo, "FetchTableShardMetaData succeeded")
		} else {
			logErrorMsg(sessionInfo, err, "FetchTableShardMetaData failed")
		}
	}()
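	// Validate the session/node/table/shard tuple before touching the
	// metastore or disk store.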
	if err = p.validateRequest(req.SessionID, req.NodeID, req.Table, req.Shard); err != nil {
		return nil, err
	}
	t, err := p.metaStore.GetTable(req.Table)
	if err != nil {
		return nil, err
	}
	commitOffset, err := p.metaStore.GetRedoLogCommitOffset(req.Table, int(req.Shard))
	if err != nil {
		return nil, err
	}
	checkpointOffset, err := p.metaStore.GetRedoLogCheckpointOffset(req.Table, int(req.Shard))
	if err != nil {
		return nil, err
	}
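	// Metadata common to both table types: shard identity, table incarnation,
	// and the redo-log commit/checkpoint offsets (reported as Kafka offsets).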
	m := &pb.TableShardMetaData{
		Table:       req.Table,
		Shard:       req.Shard,
		Incarnation: int32(t.Incarnation),
		KafkaOffset: &pb.KafkaOffset{
			CommitOffset:     commitOffset,
			CheckPointOffset: checkpointOffset,
		},
	}
	if !t.IsFactTable {
		// Dimension table: metadata is derived from the latest snapshot.
		redoFileID, redoFileOffset, lastBatchID, lastBatchSize, err := p.metaStore.GetSnapshotProgress(req.Table, int(req.Shard))
		if err != nil {
			return nil, err
		}
		batchIDs, err := p.diskStore.ListSnapshotBatches(req.Table, int(req.Shard), redoFileID, redoFileOffset)
		if err != nil {
			return nil, err
		}
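		// Build one BatchMetaData per snapshot batch, listing the vector
		// party (column) files available for the peer to fetch.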
		batches := make([]*pb.BatchMetaData, len(batchIDs))
		for i, batchID := range batchIDs {
			columns, err := p.diskStore.ListSnapshotVectorPartyFiles(req.Table, int(req.Shard), redoFileID, redoFileOffset, batchID)
			if err != nil {
				return nil, err
			}
			vps := make([]*pb.VectorPartyMetaData, len(columns))
			for j, colID := range columns {
				vps[j] = &pb.VectorPartyMetaData{
					ColumnID: uint32(colID),
				}
			}
			batches[i] = &pb.BatchMetaData{
				BatchID: int32(batchID),
				Vps:     vps,
			}
		}
		m.Batches = batches
		m.Meta = &pb.TableShardMetaData_DimensionMeta{
			DimensionMeta: &pb.DimensionTableShardMetaData{
				LastBatchID:   lastBatchID,
				LastBatchSize: int32(lastBatchSize),
				SnapshotVersion: &pb.SnapshotVersion{
					RedoFileID:     redoFileID,
					RedoFileOffset: redoFileOffset,
				},
			},
		}
		return m, nil
	}
	// Fact table: metadata is derived from archive batches plus the backfill
	// progress in the redo log.
	cutoff, err := p.metaStore.GetArchivingCutoff(req.Table, int(req.Shard))
	if err != nil {
		return nil, err
	}
	redoFileID, redoFileOffset, err := p.metaStore.GetBackfillProgressInfo(req.Table, int(req.Shard))
	if err != nil {
		return nil, err
	}
	// Clamp [startBatchID, endBatchID] to the intersection of the local
	// retention window and the range requested by the caller. Fact table
	// batch IDs are days since the Unix epoch (86400 seconds per day).
	startBatchID := int32(0)
	endBatchID := int32(utils.Now().Unix() / 86400)
	if t.Config.RecordRetentionInDays > 0 {
		startBatchID = endBatchID - int32(t.Config.RecordRetentionInDays) + 1
	}
	if req.StartBatchID > startBatchID {
		startBatchID = req.StartBatchID
	}
	if req.EndBatchID > 0 && req.EndBatchID < endBatchID {
		endBatchID = req.EndBatchID
	}
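	// For each archive batch in range, record its version and backfill
	// sequence as of the archiving cutoff, its size, and the vector party
	// files present on disk.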
	batchIDs, err := p.metaStore.GetArchiveBatches(req.Table, int(req.Shard), startBatchID, endBatchID)
	if err != nil {
		return nil, err
	}
	batches := make([]*pb.BatchMetaData, len(batchIDs))
	for i, batchID := range batchIDs {
		version, seq, size, err := p.metaStore.GetArchiveBatchVersion(req.Table, int(req.Shard), batchID, cutoff)
		if err != nil {
			return nil, err
		}
		columns, err := p.diskStore.ListArchiveBatchVectorPartyFiles(req.Table, int(req.Shard), batchID, version, seq)
		if err != nil {
			return nil, err
		}
		vps := make([]*pb.VectorPartyMetaData, len(columns))
		for j, colID := range columns {
			vps[j] = &pb.VectorPartyMetaData{
				ColumnID: uint32(colID),
			}
		}
		batches[i] = &pb.BatchMetaData{
			BatchID: int32(batchID),
			Size:    uint32(size),
			ArchiveVersion: &pb.ArchiveVersion{
				ArchiveVersion: version,
				BackfillSeq:    seq,
			},
			Vps: vps,
		}
	}
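	// Fact-table specific metadata: the archiving high watermark (cutoff) and
	// the backfill checkpoint in the redo log.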
	m.Batches = batches
	m.Meta = &pb.TableShardMetaData_FactMeta{
		FactMeta: &pb.FactTableShardMetaData{
			HighWatermark: cutoff,
			BackfillCheckpoint: &pb.BackfillCheckpoint{
				RedoFileID:     redoFileID,
				RedoFileOffset: redoFileOffset,
			},
		},
	}
	return m, nil
}