topology.go (224 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ /* * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40 * Copyright (c) 2016, The Gocql authors, * provided under the BSD-3-Clause License. * See the NOTICE file distributed with this work for additional information. */ package gocql import ( "fmt" "sort" "strconv" "strings" ) type hostTokens struct { // token is end (inclusive) of token range these hosts belong to token token hosts []*HostInfo } // tokenRingReplicas maps token ranges to list of replicas. // The elements in tokenRingReplicas are sorted by token ascending. // The range for a given item in tokenRingReplicas starts after preceding range and ends with the token specified in // token. The end token is part of the range. // The lowest (i.e. index 0) range wraps around the ring (its preceding range is the one with largest index). type tokenRingReplicas []hostTokens func (h tokenRingReplicas) Less(i, j int) bool { return h[i].token.Less(h[j].token) } func (h tokenRingReplicas) Len() int { return len(h) } func (h tokenRingReplicas) Swap(i, j int) { h[i], h[j] = h[j], h[i] } func (h tokenRingReplicas) replicasFor(t token) *hostTokens { if len(h) == 0 { return nil } p := sort.Search(len(h), func(i int) bool { return !h[i].token.Less(t) }) if p >= len(h) { // rollover p = 0 } return &h[p] } type placementStrategy interface { replicaMap(tokenRing *tokenRing) tokenRingReplicas replicationFactor(dc string) int } func getReplicationFactorFromOpts(val interface{}) (int, error) { switch v := val.(type) { case int: if v < 0 { return 0, fmt.Errorf("invalid replication_factor %d", v) } return v, nil case string: n, err := strconv.Atoi(v) if err != nil { return 0, fmt.Errorf("invalid replication_factor %q: %v", v, err) } else if n < 0 { return 0, fmt.Errorf("invalid replication_factor %d", n) } return n, nil default: return 0, fmt.Errorf("unknown replication_factor type %T", v) } } func getStrategy(ks *KeyspaceMetadata, logger StdLogger) placementStrategy { switch { case strings.Contains(ks.StrategyClass, "SimpleStrategy"): rf, err := getReplicationFactorFromOpts(ks.StrategyOptions["replication_factor"]) if err != nil { logger.Printf("parse rf for keyspace %q: %v", ks.Name, err) return nil } return &simpleStrategy{rf: rf} case strings.Contains(ks.StrategyClass, "NetworkTopologyStrategy"): dcs := make(map[string]int) for dc, rf := range ks.StrategyOptions { if dc == "class" { continue } rf, err := getReplicationFactorFromOpts(rf) if err != nil { logger.Println("parse rf for keyspace %q, dc %q: %v", err) // skip DC if the rf is invalid/unsupported, so that we can at least work with other working DCs. continue } dcs[dc] = rf } return &networkTopology{dcs: dcs} case strings.Contains(ks.StrategyClass, "LocalStrategy"): return nil default: logger.Printf("parse rf for keyspace %q: unsupported strategy class: %v", ks.StrategyClass) return nil } } type simpleStrategy struct { rf int } func (s *simpleStrategy) replicationFactor(dc string) int { return s.rf } func (s *simpleStrategy) replicaMap(tokenRing *tokenRing) tokenRingReplicas { tokens := tokenRing.tokens ring := make(tokenRingReplicas, len(tokens)) for i, th := range tokens { replicas := make([]*HostInfo, 0, s.rf) seen := make(map[*HostInfo]bool) for j := 0; j < len(tokens) && len(replicas) < s.rf; j++ { h := tokens[(i+j)%len(tokens)] if !seen[h.host] { replicas = append(replicas, h.host) seen[h.host] = true } } ring[i] = hostTokens{th.token, replicas} } sort.Sort(ring) return ring } type networkTopology struct { dcs map[string]int } func (n *networkTopology) replicationFactor(dc string) int { return n.dcs[dc] } func (n *networkTopology) haveRF(replicaCounts map[string]int) bool { if len(replicaCounts) != len(n.dcs) { return false } for dc, rf := range n.dcs { if rf != replicaCounts[dc] { return false } } return true } func (n *networkTopology) replicaMap(tokenRing *tokenRing) tokenRingReplicas { dcRacks := make(map[string]map[string]struct{}, len(n.dcs)) // skipped hosts in a dc skipped := make(map[string][]*HostInfo, len(n.dcs)) // number of replicas per dc replicasInDC := make(map[string]int, len(n.dcs)) // dc -> racks seenDCRacks := make(map[string]map[string]struct{}, len(n.dcs)) for _, h := range tokenRing.hosts { dc := h.DataCenter() rack := h.Rack() racks, ok := dcRacks[dc] if !ok { racks = make(map[string]struct{}) dcRacks[dc] = racks } racks[rack] = struct{}{} } for dc, racks := range dcRacks { replicasInDC[dc] = 0 seenDCRacks[dc] = make(map[string]struct{}, len(racks)) } tokens := tokenRing.tokens replicaRing := make(tokenRingReplicas, 0, len(tokens)) var totalRF int for _, rf := range n.dcs { totalRF += rf } for i, th := range tokenRing.tokens { if rf := n.dcs[th.host.DataCenter()]; rf == 0 { // skip this token since no replica in this datacenter. continue } for k, v := range skipped { skipped[k] = v[:0] } for dc := range n.dcs { replicasInDC[dc] = 0 for rack := range seenDCRacks[dc] { delete(seenDCRacks[dc], rack) } } replicas := make([]*HostInfo, 0, totalRF) for j := 0; j < len(tokens) && (len(replicas) < totalRF && !n.haveRF(replicasInDC)); j++ { // TODO: ensure we dont add the same host twice p := i + j if p >= len(tokens) { p -= len(tokens) } h := tokens[p].host dc := h.DataCenter() rack := h.Rack() rf := n.dcs[dc] if rf == 0 { // skip this DC, dont know about it or replication factor is zero continue } else if replicasInDC[dc] >= rf { if replicasInDC[dc] > rf { panic(fmt.Sprintf("replica overflow. rf=%d have=%d in dc %q", rf, replicasInDC[dc], dc)) } // have enough replicas in this DC continue } else if _, ok := dcRacks[dc][rack]; !ok { // dont know about this rack continue } racks := seenDCRacks[dc] if _, ok := racks[rack]; ok && len(racks) == len(dcRacks[dc]) { // we have been through all the racks and dont have RF yet, add this replicas = append(replicas, h) replicasInDC[dc]++ } else if !ok { if racks == nil { racks = make(map[string]struct{}, 1) seenDCRacks[dc] = racks } // new rack racks[rack] = struct{}{} replicas = append(replicas, h) r := replicasInDC[dc] + 1 if len(racks) == len(dcRacks[dc]) { // if we have been through all the racks, drain the rest of the skipped // hosts until we have RF. The next iteration will skip in the block // above skippedHosts := skipped[dc] var k int for ; k < len(skippedHosts) && r+k < rf; k++ { sh := skippedHosts[k] replicas = append(replicas, sh) } r += k skipped[dc] = skippedHosts[k:] } replicasInDC[dc] = r } else { // already seen this rack, keep hold of this host incase // we dont get enough for rf skipped[dc] = append(skipped[dc], h) } } if len(replicas) == 0 { panic(fmt.Sprintf("no replicas for token: %v", th.token)) } else if !replicas[0].Equal(th.host) { panic(fmt.Sprintf("first replica is not the primary replica for the token: expected %v got %v", replicas[0].ConnectAddress(), th.host.ConnectAddress())) } replicaRing = append(replicaRing, hostTokens{th.token, replicas}) } dcsWithReplicas := 0 for _, dc := range n.dcs { if dc > 0 { dcsWithReplicas++ } } if dcsWithReplicas == len(dcRacks) && len(replicaRing) != len(tokens) { panic(fmt.Sprintf("token map different size to token ring: got %d expected %d", len(replicaRing), len(tokens))) } return replicaRing }