in memstore/bootstrap.go [235:356]
// fetchDataFromPeer bootstraps this table shard by streaming its data from the
// peer identified by peerID.
//
// The steps are:
//  1. open a stream session with the peer (its done function runs on return),
//  2. fetch the table shard metadata from the peer and apply it to this shard,
//  3. apply every batch's metadata and register each of the batch's vector
//     parties (VPs) in BootstrapDetails as pending,
//  4. fetch the raw data of every VP concurrently, bounded by
//     options.MaxConcurrentStreamsPerTableShards(), retrying each VP through
//     xretry (max 3 retries).
//
// Failures of individual VP fetches do not stop the other fetches; they are
// collected and returned together as a single error once all fetches finish.
// The total fetch time metric is recorded only when every VP succeeded.
func (shard *TableShard) fetchDataFromPeer(
	peerID string,
	client rpc.PeerDataNodeClient,
	origin string,
	options bootstrap.Options,
) error {
	sessionID, doneFn, err := shard.startStreamSession(peerID, client, origin, options)
	if err != nil {
		return err
	}
	defer doneFn()

	// 1. fetch meta data
	tableShardMeta, err := shard.fetchBatchMetaDataFromPeer(origin, sessionID, client)
	if err != nil {
		return err
	}
	// 2. set metadata and trigger recovery
	if err := shard.setTableShardMetadata(tableShardMeta); err != nil {
		return err
	}

	// 3. set batch level metadata and register every VP to copy. This happens
	// before any fetch is started, so an early return here leaves no workers
	// running.
	for _, batchMeta := range tableShardMeta.Batches {
		if err := shard.setBatchMetadata(tableShardMeta, batchMeta); err != nil {
			return utils.StackError(err, "failed to set batch level metadata")
		}
		for _, vpMeta := range batchMeta.Vps {
			shard.BootstrapDetails.AddVPToCopy(batchMeta.GetBatchID(), vpMeta.GetColumnID())
		}
	}

	// 4. fetch raw vps
	reporter := utils.GetReporter(tableShardMeta.GetTable(), int(tableShardMeta.GetShard()))
	workerPool := xsync.NewWorkerPool(options.MaxConcurrentStreamsPerTableShards())
	workerPool.Init()
	retrier := xretry.NewRetrier(xretry.NewOptions().SetMaxRetries(3))
	var (
		mutex     sync.Mutex // guards fetchErrs
		fetchErrs xerrors.MultiError
		wg        sync.WaitGroup
	)

	totalFetchStart := utils.Now()
	for _, batchMeta := range tableShardMeta.Batches {
		for _, vpMeta := range batchMeta.Vps {
			// capture batchMeta and vpMeta for the closure below
			batchMeta := batchMeta
			vpMeta := vpMeta
			wg.Add(1)
			// TODO: add checksum to vp file and vpMeta to avoid copying existing data on disk
			workerPool.Go(func() {
				defer wg.Done()
				attempts := 0
				// err must be local to this worker: writing the function-level err
				// from concurrent workers is a data race and lets one worker read
				// another worker's result.
				err := retrier.Attempt(func() error {
					attempts++
					request, vpWriter, err := shard.createVectorPartyRawDataRequest(origin, sessionID, tableShardMeta, batchMeta, vpMeta)
					if err != nil {
						utils.GetLogger().
							With("peer", peerID, "table", shard.Schema.Schema.Name, "shard", shard.ShardID, "batch", batchMeta.GetBatchID(), "column", vpMeta.GetColumnID(), "request", request, "error", err.Error()).
							Errorf("failed to create vector party raw data request, attempt %d", attempts)
						return err
					}
					// NOTE(review): the Close error is ignored, and Close only runs after
					// MarkVPFinished below; confirm vpWriter does not need a checked
					// Close for the fetched data to be durable.
					defer vpWriter.Close()

					vpFetchStart := utils.Now()
					bytesFetched, err := shard.fetchVectorPartyRawDataFromPeer(client, vpWriter, request)
					if err != nil {
						utils.GetLogger().
							With("peer", peerID, "table", shard.Schema.Schema.Name, "shard", shard.ShardID, "batch", batchMeta.GetBatchID(), "column", vpMeta.GetColumnID(), "request", request, "error", err.Error()).
							Errorf("failed to fetch data from peer, attempt %d", attempts)
						return err
					}
					duration := utils.Now().Sub(vpFetchStart)
					utils.GetLogger().
						With("peer", peerID, "table", shard.Schema.Schema.Name, "shard", shard.ShardID, "batch", batchMeta.GetBatchID(), "column", vpMeta.GetColumnID(), "request", request).
						Infof("successfully fetched data (%d bytes) from peer, took %f seconds, attempt %d", bytesFetched, duration.Seconds(), attempts)

					// The tag map is only read by the reporter calls below.
					tags := map[string]string{
						"batch":  strconv.Itoa(int(batchMeta.GetBatchID())),
						"column": strconv.Itoa(int(vpMeta.GetColumnID())),
					}
					reporter.GetChildTimer(tags, utils.RawVPFetchTime).Record(duration)
					reporter.GetChildCounter(tags, utils.RawVPBytesFetched).Inc(int64(bytesFetched))
					// Skip the throughput gauge for a zero-length duration (e.g. coarse
					// or mocked clock) rather than reporting +Inf/NaN.
					if secs := duration.Seconds(); secs > 0 {
						reporter.GetChildGauge(tags, utils.RawVPFetchBytesPerSec).Update(float64(bytesFetched) / secs)
					}
					shard.BootstrapDetails.MarkVPFinished(batchMeta.GetBatchID(), vpMeta.GetColumnID())
					return nil
				})
				if err != nil {
					mutex.Lock()
					fetchErrs = fetchErrs.Add(err)
					mutex.Unlock()
					reporter.GetCounter(utils.RawVPFetchFailure).Inc(1)
					return
				}
				reporter.GetCounter(utils.RawVPFetchSuccess).Inc(1)
			})
		}
	}
	wg.Wait()

	if !fetchErrs.Empty() {
		return fetchErrs.FinalError()
	}
	reporter.GetTimer(utils.TotalRawVPFetchTime).Record(utils.Now().Sub(totalFetchStart))
	return nil
}