func()

in memstore/bootstrap.go [107:226]


// Bootstrap runs the bootstrap stages for this shard in order: optional peer
// copy, metadata load, preload, and redo log replay.
//
// Only one bootstrap may be in flight per shard. If the shard is already in the
// Bootstrapping state, bootstrap.ErrTableShardIsBootstrapping is returned and
// nothing else is touched. Otherwise the state is set to Bootstrapping and, when
// the function exits, to Bootstrapped on success or back to BootstrapNotStarted
// on any other exit.
//
// Parameters:
//   - peerSource: used to borrow a connection to one of the candidate peers.
//   - origin, topo, topoState: forwarded to findBootstrapSource to pick the
//     candidate peers; origin is also forwarded to fetchDataFromPeer.
//   - options: forwarded to fetchDataFromPeer.
//
// The first error produced by any stage is returned unchanged.
func (shard *TableShard) Bootstrap(
	peerSource client.PeerSource,
	origin string,
	topo topology.Topology,
	topoState *topology.StateSnapshot,
	options bootstrap.Options,
) error {
	shard.bootstrapLock.Lock()
	// check whether shard is already bootstrapping
	if shard.BootstrapState == bootstrap.Bootstrapping {
		shard.bootstrapLock.Unlock()
		return bootstrap.ErrTableShardIsBootstrapping
	}
	shard.BootstrapState = bootstrap.Bootstrapping
	shard.bootstrapLock.Unlock()

	// Register the state reset immediately after claiming the Bootstrapping
	// state, so that no exit path below (including a panic) can leave the shard
	// permanently marked as Bootstrapping and reject every later attempt.
	success := false
	defer func() {
		shard.bootstrapLock.Lock()
		if success {
			shard.BootstrapState = bootstrap.Bootstrapped
		} else {
			shard.BootstrapState = bootstrap.BootstrapNotStarted
		}
		shard.bootstrapLock.Unlock()
	}()

	// Snapshot everything needed from the schema while holding its read lock;
	// the table name is captured here so the log statements below do not read
	// the shared schema without the lock.
	shard.Schema.RLock()
	numColumns := len(shard.Schema.GetValueTypeByColumn())
	schema := shard.Schema.Schema
	tableName := shard.Schema.Schema.Name
	shard.Schema.RUnlock()

	shard.BootstrapDetails.Clear()
	shard.BootstrapDetails.SetNumColumns(numColumns)

	if atomic.LoadUint32(&shard.needPeerCopy) == 1 {
		shard.BootstrapDetails.SetBootstrapStage(bootstrap.PeerCopy)
		// find peer node for copy metadata and raw data
		peerNodes, err := shard.findBootstrapSource(origin, topo, topoState)
		if err != nil {
			utils.GetLogger().
				With("table", tableName).
				With("shardID", shard.ShardID).
				With("origin", origin).
				With("error", err.Error()).
				Error("failed to find peers for shard bootstrap")
			return err
		}

		// Note: skip peer copy step when we found no peers. this is correct based on the assumption that
		// if a shard replica does not find any peer in available/leaving state
		// then the cluster should be in the initial phase of new placement created
		// when we do replace/remove/add, the total number of shard replica remains the same
		// so the number of initializing replica should match the number of leaving replica
		// when we increase the number of replicas, we should always find existing available replica
		if len(peerNodes) == 0 {
			utils.GetLogger().
				With("table", tableName).
				With("shardID", shard.ShardID).
				With("origin", origin).
				Info("no available/leaving source, new placement")
		} else {
			utils.GetLogger().
				With("table", tableName).
				With("shardID", shard.ShardID).
				With("peers", peerNodes).
				Info("found peer to bootstrap from")

			// shuffle peer nodes randomly
			// NOTE(review): the seed only has the granularity of Unix(), so calls
			// that start within the same tick with the same number of peers get
			// the same permutation — confirm that is acceptable.
			rand.New(rand.NewSource(utils.Now().Unix())).Shuffle(len(peerNodes), func(i, j int) {
				peerNodes[i], peerNodes[j] = peerNodes[j], peerNodes[i]
			})

			// The callback reports its result through dataStreamErr because the
			// BorrowConnection callback has no return value; borrowErr covers
			// failures of the borrow itself.
			var dataStreamErr error
			borrowErr := peerSource.BorrowConnection(peerNodes, func(peerID string, nodeClient rpc.PeerDataNodeClient) {
				shard.BootstrapDetails.SetSource(peerID)
				dataStreamErr = shard.fetchDataFromPeer(peerID, nodeClient, origin, options)
			})

			if borrowErr != nil {
				return borrowErr
			}
			if dataStreamErr != nil {
				return dataStreamErr
			}
		}
		// Peer copy finished (or was skipped because no peer exists): clear the
		// flag so a later Bootstrap call does not repeat this stage.
		atomic.StoreUint32(&shard.needPeerCopy, 0)
	}

	// load metadata from disk
	err := shard.LoadMetaData()
	if err != nil {
		return err
	}

	shard.BootstrapDetails.SetBootstrapStage(bootstrap.Preload)
	// preload snapshot or archive batches into memory
	if schema.IsFactTable {
		// preload all columns for fact table
		// endDay is the current time expressed in whole days (seconds / 86400).
		endDay := int(utils.Now().Unix() / 86400)
		for columnID, column := range schema.Columns {
			if column.Deleted {
				continue
			}
			// each column is preloaded for its own configured window ending at endDay
			shard.PreloadColumn(columnID, endDay-column.Config.PreloadingDays, endDay)
		}
	} else {
		// preload snapshot for dimension table
		err = shard.LoadSnapshot()
		if err != nil {
			return err
		}
	}

	shard.BootstrapDetails.SetBootstrapStage(bootstrap.Recovery)
	// start play redolog
	shard.PlayRedoLog()
	// mark success before returning so the deferred reset records Bootstrapped
	success = true
	shard.BootstrapDetails.SetBootstrapStage(bootstrap.Finished)
	return nil
}