func()

in datanode/bootstrap/bootstrap_server.go [233:391]


func (p *PeerDataNodeServerImpl) FetchTableShardMetaData(ctx context.Context, req *pb.TableShardMetaDataRequest) (*pb.TableShardMetaData, error) {
	sessionInfo := &sessionInfo{
		table:   req.Table,
		shardID: req.Shard,
		nodeID:  req.NodeID,
	}
	var err error

	logInfoMsg(sessionInfo, "FetchTableShardMetaData called")
	defer func() {
		if err == nil {
			logInfoMsg(sessionInfo, "FetchTableShardMetaData succeed")
		} else {
			logErrorMsg(sessionInfo, err, "FetchTableShardMetaData failed")
		}
	}()

	if err = p.validateRequest(req.SessionID, req.NodeID, req.Table, req.Shard); err != nil {
		return nil, err
	}

	t, err := p.metaStore.GetTable(req.Table)
	if err != nil {
		return nil, err
	}

	commitOffset, err := p.metaStore.GetRedoLogCommitOffset(req.Table, int(req.Shard))
	if err != nil {
		return nil, err
	}
	checkpointOffset, err := p.metaStore.GetRedoLogCheckpointOffset(req.Table, int(req.Shard))
	if err != nil {
		return nil, err
	}

	m := &pb.TableShardMetaData{
		Table:       req.Table,
		Shard:       req.Shard,
		Incarnation: int32(t.Incarnation),
		KafkaOffset: &pb.KafkaOffset{
			CommitOffset:     commitOffset,
			CheckPointOffset: checkpointOffset,
		},
	}

	if !t.IsFactTable {
		// dimension table
		redoFileID, redoFileOffset, lastBatchID, lastBatchSize, err := p.metaStore.GetSnapshotProgress(req.Table, int(req.Shard))
		if err != nil {
			return nil, err
		}

		batchIDs, err := p.diskStore.ListSnapshotBatches(req.Table, int(req.Shard), redoFileID, redoFileOffset)
		if err != nil {
			return nil, err
		}

		batches := make([]*pb.BatchMetaData, len(batchIDs))

		for i, batchID := range batchIDs {
			columns, err := p.diskStore.ListSnapshotVectorPartyFiles(req.Table, int(req.Shard), redoFileID, redoFileOffset, batchID)
			if err != nil {
				return nil, err
			}
			vps := make([]*pb.VectorPartyMetaData, len(columns))
			for j, colID := range columns {
				vps[j] = &pb.VectorPartyMetaData{
					ColumnID: uint32(colID),
				}
			}
			batches[i] = &pb.BatchMetaData{
				BatchID: int32(batchID),
				Vps:     vps[0:],
			}
		}
		m.Batches = batches
		m.Meta = &pb.TableShardMetaData_DimensionMeta{
			DimensionMeta: &pb.DimensionTableShardMetaData{
				LastBatchID:   lastBatchID,
				LastBatchSize: int32(lastBatchSize),
				SnapshotVersion: &pb.SnapshotVersion{
					RedoFileID:     redoFileID,
					RedoFileOffset: redoFileOffset,
				},
			},
		}
		return m, nil
	}

	// fact table
	cutoff, err := p.metaStore.GetArchivingCutoff(req.Table, int(req.Shard))
	if err != nil {
		return nil, err
	}
	redoFileID, redoFileOffset, err := p.metaStore.GetBackfillProgressInfo(req.Table, int(req.Shard))
	if err != nil {
		return nil, err
	}

	// adjust start/end batchID according to local retention setting and request
	// we'll take the intersection batches
	startBatchID := int32(0)
	endBatchID := int32(utils.Now().Unix() / 86400)
	if t.Config.RecordRetentionInDays > 0 {
		startBatchID = endBatchID - int32(t.Config.RecordRetentionInDays) + 1
	}
	if req.StartBatchID > startBatchID {
		startBatchID = req.StartBatchID
	}
	if req.EndBatchID > 0 && req.EndBatchID < endBatchID {
		endBatchID = req.EndBatchID
	}

	batchIDs, err := p.metaStore.GetArchiveBatches(req.Table, int(req.Shard), startBatchID, endBatchID)
	if err != nil {
		return nil, err
	}

	batches := make([]*pb.BatchMetaData, len(batchIDs))
	for i, batchID := range batchIDs {
		version, seq, size, err := p.metaStore.GetArchiveBatchVersion(req.Table, int(req.Shard), batchID, cutoff)
		if err != nil {
			return nil, err
		}
		columns, err := p.diskStore.ListArchiveBatchVectorPartyFiles(req.Table, int(req.Shard), batchID, version, seq)
		if err != nil {
			return nil, err
		}

		vps := make([]*pb.VectorPartyMetaData, len(columns))
		for j, colID := range columns {
			vps[j] = &pb.VectorPartyMetaData{
				ColumnID: uint32(colID),
			}
		}
		batches[i] = &pb.BatchMetaData{
			BatchID: int32(batchID),
			Size:    uint32(size),
			ArchiveVersion: &pb.ArchiveVersion{
				ArchiveVersion: version,
				BackfillSeq:    seq,
			},
			Vps: vps,
		}
	}

	m.Batches = batches
	m.Meta = &pb.TableShardMetaData_FactMeta{
		FactMeta: &pb.FactTableShardMetaData{
			HighWatermark: cutoff,
			BackfillCheckpoint: &pb.BackfillCheckpoint{
				RedoFileID:     redoFileID,
				RedoFileOffset: redoFileOffset,
			},
		},
	}

	return m, nil
}