func()

in datanode/datanode.go [597:708]


// assignShardSet reconciles the table shards held by this data node with the
// given shardSet, stores shardSet as the node's current shard set, and then
// starts a bootstrap in a background goroutine.
//
// Fact tables are handled per shard: every shard ID present in the current
// shard set but absent from shardSet is removed from the mem store, meta store
// and disk store; every shard present in shardSet but not in the current shard
// set is added to the mem store.
//
// Dimension tables are handled as a single shard (ID 0): it is added when the
// node goes from owning no shards to owning at least one, and removed when the
// node goes from owning shards to owning none.
//
// Failures to delete shard metadata or data are logged (including the
// underlying error) and do not abort the reconciliation. The data node lock is
// held for the duration of the call, except for the background bootstrap.
func (d *dataNode) assignShardSet(shardSet shard.ShardSet) {
	d.Lock()
	defer d.Unlock()

	// Classify the known tables up front so fact tables can be processed first.
	d.memStore.RLock()
	var factTables, dimensionTables []string
	for table, schema := range d.memStore.GetSchemas() {
		if schema.Schema.IsFactTable {
			factTables = append(factTables, table)
		} else {
			dimensionTables = append(dimensionTables, table)
		}
	}
	d.memStore.RUnlock()

	var (
		incomingShards     = shardSet.All()
		existingIDs        = d.shardSet.AllIDs()
		incoming           = make(map[uint32]m3Shard.Shard, len(incomingShards))
		existing           = make(map[uint32]struct{}, len(existingIDs))
		removing           []uint32
		adding             []m3Shard.Shard
		initializingShards int
	)

	// Index the incoming shards by ID and count how many are still initializing.
	for _, s := range incomingShards {
		if s.State() == m3Shard.Initializing {
			initializingShards++
		}
		incoming[s.ID()] = s
	}

	for _, shardID := range existingIDs {
		existing[shardID] = struct{}{}
	}

	noExistingShards := len(existing) == 0
	noShardLeft := len(incoming) == 0

	// Shards owned today but missing from the new placement must be removed.
	for shardID := range existing {
		if _, ok := incoming[shardID]; !ok {
			removing = append(removing, shardID)
		}
	}

	// Shards in the new placement that are not owned today must be added.
	for shardID, s := range incoming {
		if _, ok := existing[shardID]; !ok {
			adding = append(adding, s)
		}
	}

	for _, shardID := range removing {
		for _, table := range factTables {
			tableLogger := d.logger.With("table", table, "shard", shardID)
			tableLogger.Info("removing fact table shard on placement change")
			d.memStore.RemoveTableShard(table, int(shardID))
			// Deletion is best effort: log the cause and keep going so one failure
			// does not block the rest of the placement change.
			if err := d.metaStore.DeleteTableShard(table, int(shardID)); err != nil {
				tableLogger.With("error", err.Error()).Error("failed to remove table shard metadata")
			}
			if err := d.diskStore.DeleteTableShard(table, int(shardID)); err != nil {
				tableLogger.With("error", err.Error()).Error("failed to remove table shard data")
			}
		}
	}

	for _, s := range adding {
		needPeerCopy := s.State() == m3Shard.Initializing
		for _, table := range factTables {
			d.logger.With("table", table, "shard", s.ID(), "state", s.State()).Info("adding fact table shard on placement change")
			// when needPeerCopy is true, we also need to purge old data before adding new shard
			d.memStore.AddTableShard(table, int(s.ID()), d.numShardsInCluster, needPeerCopy, needPeerCopy)
		}
	}

	// add/remove dimension tables with the following rules:
	// 1. add dimension tables when first shard is assigned to the data node
	// 2. remove dimension tables when the last shard is removed from the data node
	// 3. copy dimension table data from peer when all assigned new shards are initializing shards
	switch {
	case noExistingShards && !noShardLeft:
		// only need to copy data from peer when all new shards are initializing shards
		// meaning no available/leaving shards ever owned by this data node
		needPeerCopy := initializingShards > 0 && len(incoming) == initializingShards
		for _, table := range dimensionTables {
			d.logger.With("table", table, "shard", 0).Info("adding dimension table shard on placement change")
			// only copy data from peer for dimension table
			// when going from zero shards to all initializing shards;
			// when needPeerCopy is true, we also need to purge old data before adding new shard
			d.memStore.AddTableShard(table, 0, 1, needPeerCopy, needPeerCopy)
		}
	case !noExistingShards && noShardLeft:
		for _, table := range dimensionTables {
			tableLogger := d.logger.With("table", table, "shard", 0)
			tableLogger.Info("removing dimension table shard on placement change")
			d.memStore.RemoveTableShard(table, 0)
			// Best effort, same as for fact tables: log the cause and continue.
			if err := d.metaStore.DeleteTableShard(table, 0); err != nil {
				tableLogger.With("error", err.Error()).Error("failed to remove table shard metadata")
			}
			if err := d.diskStore.DeleteTableShard(table, 0); err != nil {
				tableLogger.With("error", err.Error()).Error("failed to remove table shard data")
			}
		}
	}
	d.shardSet = shardSet

	// Bootstrap runs in its own goroutine so it does not execute under the
	// data node lock held by this call.
	go func() {
		if err := d.bootstrapManager.Bootstrap(); err != nil {
			d.logger.With("error", err.Error()).Error("error while bootstrapping")
		}
	}()
}