func()

in memstore/bootstrap.go [235:356]


// fetchDataFromPeer copies this shard's data from the given peer.
//
// It opens a stream session with the peer, fetches the table shard metadata
// and applies it locally, applies the metadata of every batch and registers
// each vector party (VP) of each batch in shard.BootstrapDetails, and then
// fetches the raw data of every VP concurrently through a worker pool sized
// by options.MaxConcurrentStreamsPerTableShards(). Each VP fetch is run
// through a retrier configured with a maximum of 3 retries.
//
// Any error from the session, metadata or batch-metadata steps is returned
// immediately. Errors from individual VP fetches are collected and returned
// as one combined error after all workers have finished. On success the total
// fetch time is recorded and nil is returned.
func (shard *TableShard) fetchDataFromPeer(
	peerID string,
	client rpc.PeerDataNodeClient,
	origin string,
	options bootstrap.Options,
) error {

	sessionID, doneFn, err := shard.startStreamSession(peerID, client, origin, options)
	if err != nil {
		return err
	}
	defer doneFn()

	// 1. fetch meta data
	tableShardMeta, err := shard.fetchBatchMetaDataFromPeer(origin, sessionID, client)
	if err != nil {
		return err
	}

	// 2. set metadata and trigger recovery
	if err := shard.setTableShardMetadata(tableShardMeta); err != nil {
		return err
	}

	// 3. fetch raw vps
	workerPool := xsync.NewWorkerPool(options.MaxConcurrentStreamsPerTableShards())
	workerPool.Init()

	retrier := xretry.NewRetrier(xretry.NewOptions().SetMaxRetries(3))
	var (
		mutex    sync.Mutex // guards multiErr, which is appended to by the workers
		multiErr xerrors.MultiError
		wg       sync.WaitGroup
	)

	// Apply all batch level metadata and register every VP before any worker
	// is started, so an early return here cannot leave workers running.
	for _, batchMeta := range tableShardMeta.Batches {
		if err := shard.setBatchMetadata(tableShardMeta, batchMeta); err != nil {
			return utils.StackError(err, "failed to set batch level metadata")
		}

		for _, vpMeta := range batchMeta.Vps {
			shard.BootstrapDetails.AddVPToCopy(batchMeta.GetBatchID(), vpMeta.GetColumnID())
		}
	}

	fetchStart := utils.Now()
	for _, batchMeta := range tableShardMeta.Batches {
		for _, vpMeta := range batchMeta.Vps {
			// capture batchMeta and vpMeta
			batchMeta := batchMeta
			vpMeta := vpMeta
			wg.Add(1)
			// TODO: add checksum to vp file and vpMeta to avoid copying existing data on disk
			workerPool.Go(func() {
				defer wg.Done()
				attempts := 0
				// The result must live in a variable local to this worker:
				// assigning to the enclosing function's err would be a data
				// race between workers and could hide or misattribute failures.
				fetchErr := retrier.Attempt(func() error {
					attempts++
					request, vpWriter, err := shard.createVectorPartyRawDataRequest(origin, sessionID, tableShardMeta, batchMeta, vpMeta)
					if err != nil {
						utils.GetLogger().
							With("peer", peerID, "table", shard.Schema.Schema.Name, "shard", shard.ShardID, "batch", batchMeta.GetBatchID(), "column", vpMeta.GetColumnID(), "request", request, "error", err.Error()).
							Errorf("failed to create vector party raw data request, attempt %d", attempts)
						return err
					}
					defer vpWriter.Close()

					attemptStart := utils.Now()
					bytesFetched, err := shard.fetchVectorPartyRawDataFromPeer(client, vpWriter, request)
					if err != nil {
						utils.GetLogger().
							With("peer", peerID, "table", shard.Schema.Schema.Name, "shard", shard.ShardID, "batch", batchMeta.GetBatchID(), "column", vpMeta.GetColumnID(), "request", request, "error", err.Error()).
							Errorf("failed to fetch data from peer, attempt %d", attempts)
						return err
					}

					duration := utils.Now().Sub(attemptStart)
					utils.GetLogger().
						With("peer", peerID, "table", shard.Schema.Schema.Name, "shard", shard.ShardID, "batch", batchMeta.GetBatchID(), "column", vpMeta.GetColumnID(), "request", request).
						Infof("successfully fetched data (%d bytes) from peer, took %f seconds, attempt %d", bytesFetched, duration.Seconds(), attempts)

					// Per-VP metrics are tagged with the batch and column ids.
					batchTag := strconv.Itoa(int(batchMeta.GetBatchID()))
					columnTag := strconv.Itoa(int(vpMeta.GetColumnID()))
					reporter := utils.GetReporter(tableShardMeta.Table, int(tableShardMeta.Shard))

					reporter.GetChildTimer(map[string]string{
						"batch":  batchTag,
						"column": columnTag,
					}, utils.RawVPFetchTime).Record(duration)

					reporter.GetChildCounter(map[string]string{
						"batch":  batchTag,
						"column": columnTag,
					}, utils.RawVPBytesFetched).Inc(int64(bytesFetched))

					// Skip the throughput gauge for a zero duration; the
					// division would otherwise produce +Inf or NaN.
					if seconds := duration.Seconds(); seconds > 0 {
						reporter.GetChildGauge(map[string]string{
							"batch":  batchTag,
							"column": columnTag,
						}, utils.RawVPFetchBytesPerSec).Update(float64(bytesFetched) / seconds)
					}

					shard.BootstrapDetails.MarkVPFinished(batchMeta.GetBatchID(), vpMeta.GetColumnID())
					return nil
				})

				reporter := utils.GetReporter(tableShardMeta.GetTable(), int(tableShardMeta.GetShard()))
				if fetchErr != nil {
					mutex.Lock()
					multiErr = multiErr.Add(fetchErr)
					mutex.Unlock()
					reporter.GetCounter(utils.RawVPFetchFailure).Inc(1)
					return
				}
				reporter.GetCounter(utils.RawVPFetchSuccess).Inc(1)
			})
		}
	}
	wg.Wait()
	if !multiErr.Empty() {
		return multiErr.FinalError()
	}
	utils.GetReporter(tableShardMeta.Table, int(tableShardMeta.Shard)).GetTimer(utils.TotalRawVPFetchTime).Record(utils.Now().Sub(fetchStart))
	return nil
}