func()

in topology.go [181:318]


// replicaMap builds the token -> replicas table for a network-topology
// (per-datacenter replication factor) keyspace.
//
// For every token in tokenRing whose owning host is in a datacenter with a
// non-zero replication factor in n.dcs, the ring is walked in token order
// starting at that token and wrapping around at the end. Within a datacenter,
// a host on a rack that has not contributed a replica yet is taken
// immediately; a host on an already used rack is parked in a per-DC "skipped"
// list and is only used once every known rack of that DC has been seen. The
// walk for a token ends when totalRF replicas were collected, when
// n.haveRF(replicasInDC) reports true, or after one full lap of the ring.
//
// A host is considered at most once per token, so a host that owns several
// tokens (vnodes) can never appear twice in one replica list nor count twice
// towards its datacenter's replication factor. Hosts are identified by
// *HostInfo pointer; this assumes the ring uses a single *HostInfo per
// physical host (TODO confirm against the tokenRing constructor).
//
// Tokens owned by hosts in datacenters with no replication factor are left out
// of the result.
//
// The function panics when an internal invariant is violated: a datacenter
// holds more replicas than its replication factor, a token ends up without
// replicas, the first replica is not the token's own host, or (when the number
// of datacenters with replicas equals the number of datacenters in the ring)
// the result has a different length than the token ring.
func (n *networkTopology) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
	// dc -> set of racks that exist in the ring
	dcRacks := make(map[string]map[string]struct{}, len(n.dcs))
	// skipped hosts in a dc
	skipped := make(map[string][]*HostInfo, len(n.dcs))
	// number of replicas per dc
	replicasInDC := make(map[string]int, len(n.dcs))
	// dc -> racks that already contributed a replica for the current token
	seenDCRacks := make(map[string]map[string]struct{}, len(n.dcs))
	// hosts already considered (added or parked) for the current token
	seenHosts := make(map[*HostInfo]struct{}, len(tokenRing.hosts))

	for _, h := range tokenRing.hosts {
		dc := h.DataCenter()
		rack := h.Rack()

		racks, ok := dcRacks[dc]
		if !ok {
			racks = make(map[string]struct{})
			dcRacks[dc] = racks
		}
		racks[rack] = struct{}{}
	}

	for dc, racks := range dcRacks {
		replicasInDC[dc] = 0
		seenDCRacks[dc] = make(map[string]struct{}, len(racks))
	}

	tokens := tokenRing.tokens
	replicaRing := make(tokenRingReplicas, 0, len(tokens))

	var totalRF int
	for _, rf := range n.dcs {
		totalRF += rf
	}

	for i, th := range tokenRing.tokens {
		if rf := n.dcs[th.host.DataCenter()]; rf == 0 {
			// skip this token since no replica in this datacenter.
			continue
		}

		// Reset the per-token scratch state while keeping the allocated
		// storage for reuse.
		for k, v := range skipped {
			skipped[k] = v[:0]
		}

		for dc := range n.dcs {
			replicasInDC[dc] = 0
			for rack := range seenDCRacks[dc] {
				delete(seenDCRacks[dc], rack)
			}
		}

		for h := range seenHosts {
			delete(seenHosts, h)
		}

		replicas := make([]*HostInfo, 0, totalRF)
		for j := 0; j < len(tokens) && (len(replicas) < totalRF && !n.haveRF(replicasInDC)); j++ {
			// index of the j-th token after i, wrapping around the ring
			p := i + j
			if p >= len(tokens) {
				p -= len(tokens)
			}
			h := tokens[p].host

			dc := h.DataCenter()
			rack := h.Rack()

			rf := n.dcs[dc]
			if rf == 0 {
				// skip this DC, dont know about it or replication factor is zero
				continue
			} else if replicasInDC[dc] >= rf {
				if replicasInDC[dc] > rf {
					panic(fmt.Sprintf("replica overflow. rf=%d have=%d in dc %q", rf, replicasInDC[dc], dc))
				}

				// have enough replicas in this DC
				continue
			} else if _, ok := dcRacks[dc][rack]; !ok {
				// dont know about this rack
				continue
			}

			// A host owning several tokens is met several times during the
			// walk. It was either already added as a replica or parked in
			// skipped (from where it is drained below), so meeting it again
			// must not add or count it a second time.
			if _, dup := seenHosts[h]; dup {
				continue
			}
			seenHosts[h] = struct{}{}

			racks := seenDCRacks[dc]
			if _, ok := racks[rack]; ok && len(racks) == len(dcRacks[dc]) {
				// we have been through all the racks and dont have RF yet, add this
				replicas = append(replicas, h)
				replicasInDC[dc]++
			} else if !ok {
				if racks == nil {
					racks = make(map[string]struct{}, 1)
					seenDCRacks[dc] = racks
				}

				// new rack
				racks[rack] = struct{}{}
				replicas = append(replicas, h)
				r := replicasInDC[dc] + 1

				if len(racks) == len(dcRacks[dc]) {
					// if we have been through all the racks, drain the rest of the skipped
					// hosts until we have RF. The next iteration will skip in the block
					// above
					skippedHosts := skipped[dc]
					var k int
					for ; k < len(skippedHosts) && r+k < rf; k++ {
						sh := skippedHosts[k]
						replicas = append(replicas, sh)
					}
					r += k
					skipped[dc] = skippedHosts[k:]
				}
				replicasInDC[dc] = r
			} else {
				// already seen this rack, keep hold of this host incase
				// we dont get enough for rf
				skipped[dc] = append(skipped[dc], h)
			}
		}

		if len(replicas) == 0 {
			panic(fmt.Sprintf("no replicas for token: %v", th.token))
		} else if !replicas[0].Equal(th.host) {
			// The token's own host is the expected primary; replicas[0] is
			// what the walk actually produced.
			panic(fmt.Sprintf("first replica is not the primary replica for the token: expected %v got %v", th.host.ConnectAddress(), replicas[0].ConnectAddress()))
		}

		replicaRing = append(replicaRing, hostTokens{th.token, replicas})
	}

	dcsWithReplicas := 0
	for _, dc := range n.dcs {
		if dc > 0 {
			dcsWithReplicas++
		}
	}

	// Only when every datacenter in the ring has replicas is each token
	// expected to have an entry; otherwise tokens were deliberately skipped.
	if dcsWithReplicas == len(dcRacks) && len(replicaRing) != len(tokens) {
		panic(fmt.Sprintf("token map different size to token ring: got %d expected %d", len(replicaRing), len(tokens)))
	}

	return replicaRing
}