in datanode/datanode.go [597:708]
func (d *dataNode) assignShardSet(shardSet shard.ShardSet) {
d.Lock()
defer d.Unlock()
// process fact tables first
d.memStore.RLock()
factTables := make([]string, 0)
dimensionTables := make([]string, 0)
for table, schema := range d.memStore.GetSchemas() {
if schema.Schema.IsFactTable {
factTables = append(factTables, table)
} else {
dimensionTables = append(dimensionTables, table)
}
}
d.memStore.RUnlock()
var (
incoming = make(map[uint32]m3Shard.Shard, len(shardSet.All()))
existing = make(map[uint32]struct{}, len(d.shardSet.AllIDs()))
removing []uint32
adding []m3Shard.Shard
initializingShards = 0
noExistingShards bool
noShardLeft bool
)
for _, shard := range shardSet.All() {
if shard.State() == m3Shard.Initializing {
initializingShards++
}
incoming[shard.ID()] = shard
}
for _, shardID := range d.shardSet.AllIDs() {
existing[shardID] = struct{}{}
}
noExistingShards = len(existing) == 0
noShardLeft = len(incoming) == 0
for shardID := range existing {
if _, ok := incoming[shardID]; !ok {
removing = append(removing, shardID)
}
}
for shardID, shard := range incoming {
if _, ok := existing[shardID]; !ok {
adding = append(adding, shard)
}
}
for _, shardID := range removing {
for _, table := range factTables {
d.logger.With("table", table, "shard", shardID).Info("removing fact table shard on placement change")
d.memStore.RemoveTableShard(table, int(shardID))
if err := d.metaStore.DeleteTableShard(table, int(shardID)); err != nil {
d.logger.With("table", table, "shard", shardID).Error("failed to remove table shard metadata")
}
if err := d.diskStore.DeleteTableShard(table, int(shardID)); err != nil {
d.logger.With("table", table, "shard", shardID).Error("failed to remove table shard data")
}
}
}
for _, shard := range adding {
needPeerCopy := shard.State() == m3Shard.Initializing
for _, table := range factTables {
d.logger.With("table", table, "shard", shard.ID(), "state", shard.State()).Info("adding fact table shard on placement change")
// when needPeerCopy is true, we also need to purge old data before adding new shard
d.memStore.AddTableShard(table, int(shard.ID()), d.numShardsInCluster, needPeerCopy, needPeerCopy)
}
}
// add/remove dimension tables with the following rules:
// 1. add dimension tables when first shard is assigned to the data node
// 2. remove dimension tables when the last shard is removed from the data node
// 3. copy dimension table data from peer when all assigned new shards are initializing shards
if noExistingShards && !noShardLeft {
// only need to copy data from peer when all new shards are initializing shards
// meaning no available/leaving shards ever owned by this data node
needPeerCopy := initializingShards > 0 && len(incoming) == initializingShards
for _, table := range dimensionTables {
d.logger.With("table", table, "shard", 0).Info("adding dimension table shard on placement change")
// only copy data from peer for dimension table
// when from zero shards to all initialing shards
// when needPeerCopy is true, we also need to purge old data before adding new shard
d.memStore.AddTableShard(table, 0, 1, needPeerCopy, needPeerCopy)
}
}
if !noExistingShards && noShardLeft {
for _, table := range dimensionTables {
d.logger.With("table", table, "shard", 0).Info("removing dimension table shard on placement change")
d.memStore.RemoveTableShard(table, 0)
if err := d.metaStore.DeleteTableShard(table, 0); err != nil {
d.logger.With("table", table, "shard", 0).Error("failed to remove table shard metadata")
}
if err := d.diskStore.DeleteTableShard(table, 0); err != nil {
d.logger.With("table", table, "shard", 0).Error("failed to remove table shard data")
}
}
}
d.shardSet = shardSet
go func() {
if err := d.bootstrapManager.Bootstrap(); err != nil {
d.logger.With("error", err.Error()).Error("error while bootstrapping")
}
}()
}