datanode/datanode.go (596 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
"fmt"
"github.com/uber/aresdb/common"
"net/http"
"net/http/pprof"
"path/filepath"
"sync"
"time"
"github.com/m3db/m3/src/x/instrument"
controllerCli "github.com/uber/aresdb/controller/client"
"strings"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/m3db/m3/src/cluster/placement"
"github.com/m3db/m3/src/cluster/services"
m3Shard "github.com/m3db/m3/src/cluster/shard"
"github.com/uber-go/tally"
"github.com/uber/aresdb/api"
"github.com/uber/aresdb/cluster/shard"
"github.com/uber/aresdb/cluster/topology"
mutatorsCom "github.com/uber/aresdb/controller/mutators/common"
"github.com/uber/aresdb/datanode/bootstrap"
"github.com/uber/aresdb/datanode/generated/proto/rpc"
"github.com/uber/aresdb/diskstore"
"github.com/uber/aresdb/memstore"
memCom "github.com/uber/aresdb/memstore/common"
"github.com/uber/aresdb/metastore"
metaCom "github.com/uber/aresdb/metastore/common"
"github.com/uber/aresdb/redolog"
"github.com/uber/aresdb/utils"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
// dataNode includes metastore, memstore and diskstore
type dataNode struct {
sync.RWMutex
hostID string
startedAt time.Time
shardSet shard.ShardSet
clusterServices services.Services
topoInitializer topology.Initializer
topology topology.Topology
numShardsInCluster int
enumReader mutatorsCom.EnumReader
metaStore metaCom.MetaStore
memStore memstore.MemStore
diskStore diskstore.DiskStore
opts Options
logger common.Logger
metrics datanodeMetrics
handlers datanodeHandlers
bootstrapManager BootstrapManager
redoLogManagerMaster *redolog.RedoLogManagerMaster
grpcServer *grpc.Server
mapWatch topology.MapWatch
close chan struct{}
readyCh chan struct{}
}
type datanodeHandlers struct {
schemaHandler *api.SchemaHandler
enumHandler *api.EnumHandler
queryHandler *api.QueryHandler
dataHandler *api.DataHandler
nodeModuleHandler http.Handler
debugStaticHandler http.Handler
debugHandler *api.DebugHandler
healthCheckHandler *api.HealthCheckHandler
swaggerHandler http.Handler
}
type datanodeMetrics struct {
restartTimer tally.Timer
}
// NewDataNode creates a new data node
func NewDataNode(
hostID string,
topoInitializer topology.Initializer,
enumReader mutatorsCom.EnumReader,
opts Options) (DataNode, error) {
iOpts := opts.InstrumentOptions()
logger := iOpts.Logger().With("datanode", hostID)
scope := iOpts.MetricsScope().SubScope("namespace").
Tagged(map[string]string{
"datanode": hostID,
})
metaStore, err := metastore.NewDiskMetaStore(filepath.Join(opts.ServerConfig().RootPath, "metastore"))
if err != nil {
return nil, utils.StackError(err, "failed to initialize local metastore")
}
diskStore := diskstore.NewLocalDiskStore(opts.ServerConfig().RootPath)
bootstrapServer := bootstrap.NewPeerDataNodeServer(metaStore, diskStore)
bootstrapToken := bootstrapServer.(memCom.BootStrapToken)
redologCfg := opts.ServerConfig().RedoLogConfig
redoLogManagerMaster, err := redolog.NewRedoLogManagerMaster(opts.ServerConfig().Cluster.Namespace, &redologCfg, diskStore, metaStore)
if err != nil {
return nil, utils.StackError(err, "failed to initialize redolog manager master")
}
memStore := memstore.NewMemStore(metaStore, diskStore,
memstore.NewOptions(bootstrapToken, redoLogManagerMaster))
grpcServer := grpc.NewServer()
rpc.RegisterPeerDataNodeServer(grpcServer, bootstrapServer)
reflection.Register(grpcServer)
d := &dataNode{
hostID: hostID,
topoInitializer: topoInitializer,
enumReader: enumReader,
metaStore: metaStore,
memStore: memStore,
diskStore: diskStore,
opts: opts,
logger: logger,
metrics: newDatanodeMetrics(scope),
grpcServer: grpcServer,
redoLogManagerMaster: redoLogManagerMaster,
shardSet: shard.NewShardSet(nil),
close: make(chan struct{}),
readyCh: make(chan struct{}),
}
d.handlers = d.newHandlers()
clusterClient, err := d.opts.ServerConfig().Cluster.Etcd.NewClient(instrument.NewOptions())
if err != nil {
return nil, utils.StackError(err, "failed to create etcd client")
}
d.clusterServices, err = clusterClient.Services(nil)
if err != nil {
return nil, utils.StackError(err, "failed to create cluster services client")
}
return d, nil
}
// Open data node for serving
func (d *dataNode) Open() error {
d.startedAt = utils.Now()
//1. start schema watch
d.startSchemaWatch()
// memstore fetch local disk schema
err := d.memStore.FetchSchema()
if err != nil {
return err
}
// 2. start debug server
go d.startDebugServer()
// initialize topology, block wait for first topology in etcd
d.topology, err = d.topoInitializer.Init()
if err != nil {
return utils.StackError(err, "failed to initialize topology")
}
d.bootstrapManager = NewBootstrapManager(d.hostID, d.memStore, d.opts.BootstrapOptions(), d.topology)
// 3. first shard assignment
d.mapWatch, err = d.topology.Watch()
if err != nil {
return utils.StackError(err, "failed to watch topology")
}
select {
case <-d.mapWatch.C():
topoMap := d.mapWatch.Get()
d.numShardsInCluster = len(topoMap.ShardSet().AllIDs())
hostShardSet, ok := topoMap.LookupHostShardSet(d.hostID)
if ok {
d.assignShardSet(hostShardSet.ShardSet())
}
default:
}
d.memStore.GetHostMemoryManager().Start()
// 5. start scheduler
if !d.opts.ServerConfig().SchedulerOff {
d.logger.Info("starting scheduler")
// disable archiving during redolog replay
d.memStore.GetScheduler().EnableJobType(memCom.ArchivingJobType, false)
// this will start scheduler of all jobs except archiving, archiving will be started individually
d.memStore.GetScheduler().Start()
} else {
d.logger.Info("scheduler is turned off")
}
// 6. start table addition watch
go d.startTableAdditionWatch()
// 7. start active topology watch
go d.startActiveTopologyWatch()
// 8. start analyzing shard availability
go d.startAnalyzingShardAvailability()
// 9. start analyzing server readiness
go d.startAnalyzingServerReadiness()
// 10. start bootstrap retry watch
go d.startBootstrapRetryWatch()
return nil
}
func (d *dataNode) startSchemaWatch() {
if d.opts.ServerConfig().Cluster.Enable {
// TODO better to reuse the code directly in controller to talk to etcd
if d.opts.ServerConfig().Cluster.Namespace == "" {
d.logger.Fatal("Missing cluster name")
}
controllerClientCfg := d.opts.ServerConfig().Cluster.Controller
if controllerClientCfg == nil {
d.logger.Fatal("Missing controller client config")
}
if d.opts.ServerConfig().Cluster.InstanceID != "" {
controllerClientCfg.Headers.Add(controllerCli.InstanceNameHeaderKey, d.opts.ServerConfig().Cluster.InstanceID)
}
controllerClient := controllerCli.NewControllerHTTPClient(controllerClientCfg.Address, time.Duration(controllerClientCfg.TimeoutSec)*time.Second, controllerClientCfg.Headers)
schemaFetchJob := metastore.NewSchemaFetchJob(30, d.metaStore, nil, metastore.NewTableSchameValidator(), controllerClient, nil, d.opts.ServerConfig().Cluster.Namespace, "")
// immediate initial fetch
schemaFetchJob.FetchSchema()
go schemaFetchJob.Run()
}
}
func (d *dataNode) Close() {
close(d.close)
if d.mapWatch != nil {
d.mapWatch.Close()
d.mapWatch = nil
}
d.grpcServer.Stop()
d.redoLogManagerMaster.Stop()
}
func (d *dataNode) startDebugServer() {
debugRouter := mux.NewRouter()
debugRouter.PathPrefix("/node_modules/").Handler(d.handlers.nodeModuleHandler)
debugRouter.PathPrefix("/static/").Handler(d.handlers.debugStaticHandler)
debugRouter.HandleFunc("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
debugRouter.HandleFunc("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
debugRouter.HandleFunc("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
debugRouter.HandleFunc("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
debugRouter.PathPrefix("/debug/pprof/").Handler(http.HandlerFunc(pprof.Index))
d.handlers.debugHandler.Register(debugRouter.PathPrefix("/dbg").Subrouter())
debugRouter.HandleFunc("/health", utils.ApplyHTTPWrappers(d.handlers.healthCheckHandler.HealthCheck))
d.handlers.schemaHandler.RegisterForDebug(debugRouter.PathPrefix("/schema").Subrouter())
d.opts.InstrumentOptions().Logger().Infof("Starting HTTP server on dbg-port %d", d.opts.ServerConfig().DebugPort)
d.opts.InstrumentOptions().Logger().Fatal(http.ListenAndServe(fmt.Sprintf(":%d", d.opts.ServerConfig().DebugPort), debugRouter))
}
func (d *dataNode) startTableAdditionWatch() {
shardOwnershipEvents, done, err := d.metaStore.WatchShardOwnershipEvents()
if err != nil {
utils.GetLogger().With("error", err.Error()).Fatal("failed to watch schema addition")
}
for {
select {
case <-d.close:
return
case event, ok := <-shardOwnershipEvents:
if !ok {
close(done)
return
}
d.addTable(event.TableName)
done <- struct{}{}
}
}
}
func (d *dataNode) startActiveTopologyWatch() {
for {
select {
case <-d.close:
return
case _, ok := <-d.mapWatch.C():
if !ok {
return
}
topoMap := d.mapWatch.Get()
hostShardSet, ok := topoMap.LookupHostShardSet(d.hostID)
if ok {
d.assignShardSet(hostShardSet.ShardSet())
} else {
// assign empty shard set when host does not appear in placement
d.assignShardSet(shard.NewShardSet(nil))
}
}
}
}
// checkShardReadiness check which of the shards are ready (meaning all tables within the shard are bootstrapped)
// this function will partition the input shards into two parts with all ready shards in the first part
// and return the number of ready shards
// eg. given input shards [0, 1, 2, 3, 4, 5, 6, 7], if all tables in shard 2, 6 are bootstrapped
// when the algorithm is finished,
// the input shards slice will become [2, 6, 0, 1, 3, 4, 5, 7],
// and numReadyShards returned is 2
func (d *dataNode) checkShardReadiness(tables []string, shards []uint32) (numReadyShards int) {
for i := 0; i < len(shards); i++ {
shardID := shards[i]
numTablesBootstrapped := 0
for _, table := range tables {
tableShard, err := d.memStore.GetTableShard(table, int(shardID))
if err != nil {
d.logger.With(
"error", err.Error(),
"table", table,
"shard", shardID).
Error("cannot get table shard")
continue
}
if tableShard.IsBootstrapped() {
numTablesBootstrapped++
}
tableShard.Users.Done()
}
if numTablesBootstrapped == len(tables) {
shards[i], shards[numReadyShards] = shards[numReadyShards], shards[i]
numReadyShards++
}
}
return
}
func (d *dataNode) startAnalyzingServerReadiness() {
ticker := time.NewTicker(10 * time.Second)
for {
select {
case <-ticker.C:
case <-d.close:
return
}
nonInitializing := make([]uint32, 0)
// condition for serving readiness
// 1. no shards owned by server
hostShardSet, ok := d.mapWatch.Get().LookupHostShardSet(d.hostID)
if !ok {
close(d.readyCh)
return
}
for _, s := range hostShardSet.ShardSet().All() {
if s.State() != m3Shard.Initializing {
nonInitializing = append(nonInitializing, s.ID())
}
}
// 2. no nonInitializing (available/leaving) shards waiting for bootstrap
if len(nonInitializing) == 0 {
close(d.readyCh)
return
}
factTables := make([]string, 0)
dimTables := make([]string, 0)
d.memStore.RLock()
for table, schema := range d.memStore.GetSchemas() {
if !schema.Schema.IsFactTable {
dimTables = append(dimTables, table)
} else {
factTables = append(factTables, table)
}
}
d.memStore.RUnlock()
// 3. all nonInitializing (available/leaving) shards are bootstrapped
// and all dimension table shards are bootstrapped (only one shard for dim table)
if d.checkShardReadiness(factTables, nonInitializing) == len(nonInitializing) &&
d.checkShardReadiness(dimTables, []uint32{0}) == 1 {
close(d.readyCh)
return
}
}
}
func (d *dataNode) startAnalyzingShardAvailability() {
ticker := time.NewTicker(5 * time.Second)
for {
select {
case <-ticker.C:
case <-d.close:
return
}
hostShardSet, ok := d.mapWatch.Get().LookupHostShardSet(d.hostID)
if !ok {
continue
}
// initializing shards are shards with Initializing State
// nonInitializing shards are shards with Available and Leaving State
// we always bootstrap nonInitializing shards first
// and wait for nonInitializing shards's readiness before serving traffic
initializing := make([]uint32, 0)
for _, s := range hostShardSet.ShardSet().All() {
if s.State() == m3Shard.Initializing {
initializing = append(initializing, s.ID())
}
}
if len(initializing) > 0 {
// snapshot fact tables
factTables := make([]string, 0)
d.memStore.RLock()
for table, schema := range d.memStore.GetSchemas() {
if schema.Schema.IsFactTable {
factTables = append(factTables, table)
}
}
d.memStore.RUnlock()
dynamicTopo, ok := d.topology.(topology.DynamicTopology)
if !ok {
d.logger.Error("cannot mark shard available, topology is not dynamic")
return
}
if numAvailable := d.checkShardReadiness(factTables, initializing); numAvailable > 0 {
availableShards := initializing[:numAvailable]
if err := dynamicTopo.MarkShardsAvailable(d.hostID, availableShards...); err != nil {
d.logger.With(zap.Uint32s("shards", availableShards), "error", err.Error()).
Error("failed to mark shards as available", zap.Error(err))
} else {
d.logger.With(zap.Uint32s("shards", availableShards)).Info("successfully marked shards as available")
}
}
}
}
}
// Options returns the database options.
func (d *dataNode) Options() Options {
return d.opts
}
// ShardSet returns the set of shards currently associated with this datanode.
func (d *dataNode) ShardSet() shard.ShardSet {
return d.shardSet
}
func (d *dataNode) ID() string {
return d.hostID
}
func (d *dataNode) Serve() {
// wait for server is ready to serve
<-d.readyCh
// start advertising to the cluster
d.advertise()
// enable archiving jobs
if !d.opts.ServerConfig().SchedulerOff {
d.memStore.GetScheduler().EnableJobType(memCom.ArchivingJobType, true)
d.opts.InstrumentOptions().Logger().Info("archiving jobs enabled")
}
// start server
router := mux.NewRouter()
metricsLoggingMiddlewareProvider := utils.NewMetricsLoggingMiddleWareProvider(d.opts.InstrumentOptions().MetricsScope(), d.opts.InstrumentOptions().Logger())
httpWrappers := []utils.HTTPHandlerWrapper{metricsLoggingMiddlewareProvider.WithMetrics}
if d.opts.HTTPWrapper() != nil {
httpWrappers = append(httpWrappers, d.opts.HTTPWrapper())
}
schemaRouter := router.PathPrefix("/schema")
if d.opts.ServerConfig().Cluster.Enable {
schemaRouter = schemaRouter.Methods(http.MethodGet)
}
d.handlers.schemaHandler.Register(schemaRouter.Subrouter(), httpWrappers...)
d.handlers.enumHandler.Register(router.PathPrefix("/schema").Subrouter(), httpWrappers...)
d.handlers.dataHandler.Register(router.PathPrefix("/data").Subrouter(), httpWrappers...)
d.handlers.queryHandler.Register(router.PathPrefix("/query").Subrouter(), httpWrappers...)
router.PathPrefix("/swagger/").Handler(d.handlers.swaggerHandler)
router.PathPrefix("/node_modules/").Handler(d.handlers.nodeModuleHandler)
router.HandleFunc("/health", utils.ApplyHTTPWrappers(d.handlers.healthCheckHandler.HealthCheck, metricsLoggingMiddlewareProvider.WithMetrics))
router.HandleFunc("/version", d.handlers.healthCheckHandler.Version)
// Support CORS calls.
allowOrigins := handlers.AllowedOrigins([]string{"*"})
allowHeaders := handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Language", "Origin", "Content-Type"})
allowMethods := handlers.AllowedMethods([]string{"GET", "PUT", "POST", "DELETE", "OPTIONS"})
// record time from data node started to actually serving
d.metrics.restartTimer.Record(utils.Now().Sub(d.startedAt))
// start batch reporter
batchStatsReporter := memstore.NewBatchStatsReporter(5*60, d.memStore, d)
go batchStatsReporter.Run()
d.opts.InstrumentOptions().Logger().Infof("Starting HTTP server on port %d with max connection %d", d.opts.ServerConfig().Port, d.opts.ServerConfig().HTTP.MaxConnections)
handler := handlers.CORS(allowOrigins, allowHeaders, allowMethods)(mixedHandler(d.grpcServer, router))
if d.opts.Middleware() != nil {
handler = d.opts.Middleware()(handler)
}
utils.LimitServe(d.opts.ServerConfig().Port, handler, d.opts.ServerConfig().HTTP)
}
func (d *dataNode) advertise() {
serviceID := services.NewServiceID().
SetEnvironment(d.opts.ServerConfig().Cluster.Etcd.Env).
SetZone(d.opts.ServerConfig().Cluster.Etcd.Zone).
SetName(utils.DataNodeServiceName(d.opts.ServerConfig().Cluster.Namespace))
err := d.clusterServices.SetMetadata(serviceID, services.NewMetadata().
SetHeartbeatInterval(time.Duration(d.opts.ServerConfig().Cluster.HeartbeatConfig.Interval)*time.Second).
SetLivenessInterval(time.Duration(d.opts.ServerConfig().Cluster.HeartbeatConfig.Timeout)*time.Second))
if err != nil {
d.logger.With("error", err.Error()).Fatalf("failed to set heart beat metadata")
}
placementInstance := placement.NewInstance().SetID(d.hostID)
ad := services.NewAdvertisement().
SetServiceID(serviceID).
SetPlacementInstance(placementInstance)
err = d.clusterServices.Advertise(ad)
if err != nil {
d.logger.With("error", err.Error()).Fatalf("failed to advertise data node")
} else {
d.logger.Info("start advertising datanode to cluster")
}
}
func (d *dataNode) addTable(table string) {
d.Lock()
defer d.Unlock()
d.memStore.RLock()
schema := d.memStore.GetSchemas()[table]
d.memStore.RUnlock()
if schema == nil {
d.logger.With("table", table).Error("schema does not exist")
return
}
isFactTable := schema.Schema.IsFactTable
if !isFactTable {
d.logger.With("table", table, "shard", 0).Info("adding table shard on schema addition")
// dimension table defaults shard to zero
// new table does not need to copy data from peer, but need to purge old data
d.memStore.AddTableShard(table, 0, 1, false, true)
} else {
for _, shardID := range d.shardSet.AllIDs() {
d.logger.With("table", table, "shard", shardID).Info("adding table shard on schema addition")
// new table does not need to copy data from peer, but need to purge old data
d.memStore.AddTableShard(table, int(shardID), d.numShardsInCluster, false, true)
}
}
go func() {
if err := d.bootstrapManager.Bootstrap(); err != nil {
d.logger.With("error", err.Error()).Error("error while bootstrapping")
}
}()
}
func (d *dataNode) assignShardSet(shardSet shard.ShardSet) {
d.Lock()
defer d.Unlock()
// process fact tables first
d.memStore.RLock()
factTables := make([]string, 0)
dimensionTables := make([]string, 0)
for table, schema := range d.memStore.GetSchemas() {
if schema.Schema.IsFactTable {
factTables = append(factTables, table)
} else {
dimensionTables = append(dimensionTables, table)
}
}
d.memStore.RUnlock()
var (
incoming = make(map[uint32]m3Shard.Shard, len(shardSet.All()))
existing = make(map[uint32]struct{}, len(d.shardSet.AllIDs()))
removing []uint32
adding []m3Shard.Shard
initializingShards = 0
noExistingShards bool
noShardLeft bool
)
for _, shard := range shardSet.All() {
if shard.State() == m3Shard.Initializing {
initializingShards++
}
incoming[shard.ID()] = shard
}
for _, shardID := range d.shardSet.AllIDs() {
existing[shardID] = struct{}{}
}
noExistingShards = len(existing) == 0
noShardLeft = len(incoming) == 0
for shardID := range existing {
if _, ok := incoming[shardID]; !ok {
removing = append(removing, shardID)
}
}
for shardID, shard := range incoming {
if _, ok := existing[shardID]; !ok {
adding = append(adding, shard)
}
}
for _, shardID := range removing {
for _, table := range factTables {
d.logger.With("table", table, "shard", shardID).Info("removing fact table shard on placement change")
d.memStore.RemoveTableShard(table, int(shardID))
if err := d.metaStore.DeleteTableShard(table, int(shardID)); err != nil {
d.logger.With("table", table, "shard", shardID).Error("failed to remove table shard metadata")
}
if err := d.diskStore.DeleteTableShard(table, int(shardID)); err != nil {
d.logger.With("table", table, "shard", shardID).Error("failed to remove table shard data")
}
}
}
for _, shard := range adding {
needPeerCopy := shard.State() == m3Shard.Initializing
for _, table := range factTables {
d.logger.With("table", table, "shard", shard.ID(), "state", shard.State()).Info("adding fact table shard on placement change")
// when needPeerCopy is true, we also need to purge old data before adding new shard
d.memStore.AddTableShard(table, int(shard.ID()), d.numShardsInCluster, needPeerCopy, needPeerCopy)
}
}
// add/remove dimension tables with the following rules:
// 1. add dimension tables when first shard is assigned to the data node
// 2. remove dimension tables when the last shard is removed from the data node
// 3. copy dimension table data from peer when all assigned new shards are initializing shards
if noExistingShards && !noShardLeft {
// only need to copy data from peer when all new shards are initializing shards
// meaning no available/leaving shards ever owned by this data node
needPeerCopy := initializingShards > 0 && len(incoming) == initializingShards
for _, table := range dimensionTables {
d.logger.With("table", table, "shard", 0).Info("adding dimension table shard on placement change")
// only copy data from peer for dimension table
// when from zero shards to all initialing shards
// when needPeerCopy is true, we also need to purge old data before adding new shard
d.memStore.AddTableShard(table, 0, 1, needPeerCopy, needPeerCopy)
}
}
if !noExistingShards && noShardLeft {
for _, table := range dimensionTables {
d.logger.With("table", table, "shard", 0).Info("removing dimension table shard on placement change")
d.memStore.RemoveTableShard(table, 0)
if err := d.metaStore.DeleteTableShard(table, 0); err != nil {
d.logger.With("table", table, "shard", 0).Error("failed to remove table shard metadata")
}
if err := d.diskStore.DeleteTableShard(table, 0); err != nil {
d.logger.With("table", table, "shard", 0).Error("failed to remove table shard data")
}
}
}
d.shardSet = shardSet
go func() {
if err := d.bootstrapManager.Bootstrap(); err != nil {
d.logger.With("error", err.Error()).Error("error while bootstrapping")
}
}()
}
// GetOwnedShards returns all shard ids the datanode owns
func (d *dataNode) GetOwnedShards() []int {
d.RLock()
defer d.RUnlock()
shardIDs := d.shardSet.AllIDs()
ids := make([]int, len(shardIDs))
for i, shardID := range shardIDs {
ids[i] = int(shardID)
}
return ids
}
func newDatanodeMetrics(scope tally.Scope) datanodeMetrics {
return datanodeMetrics{
restartTimer: scope.Timer("restart"),
}
}
func (d *dataNode) newHandlers() datanodeHandlers {
healthCheckHandler := api.NewHealthCheckHandler()
return datanodeHandlers{
schemaHandler: api.NewSchemaHandler(d.metaStore, d.memStore),
enumHandler: api.NewEnumHandler(d.memStore, d.metaStore),
queryHandler: api.NewQueryHandler(d.memStore, d, d.opts.ServerConfig().Query, d.opts.ServerConfig().HTTP.MaxQueryConnections),
dataHandler: api.NewDataHandler(d.memStore, d.opts.ServerConfig().HTTP.MaxIngestionConnections),
nodeModuleHandler: http.StripPrefix("/node_modules/", http.FileServer(http.Dir("./api/ui/node_modules/"))),
debugStaticHandler: http.StripPrefix("/static/", utils.NoCache(http.FileServer(http.Dir("./api/ui/debug/")))),
swaggerHandler: http.StripPrefix("/swagger/", http.FileServer(http.Dir("./api/ui/swagger/"))),
healthCheckHandler: healthCheckHandler,
debugHandler: api.NewDebugHandler(d.opts.ServerConfig().Cluster.Namespace, d.memStore, d.metaStore, d.handlers.queryHandler, healthCheckHandler, d, d.enumReader),
}
}
// mixed handler for both grpc and traditional http
func mixedHandler(grpcServer *grpc.Server, httpHandler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if grpcServer != nil && r.ProtoMajor == 2 && strings.Contains(r.Header.Get(utils.HTTPContentTypeHeaderKey), utils.HTTPContentTypeApplicationGRPC) {
grpcServer.ServeHTTP(w, r)
} else {
httpHandler.ServeHTTP(w, r)
}
})
}
func (d *dataNode) startBootstrapRetryWatch() {
for {
select {
case <-d.handlers.debugHandler.GetBootstrapRetryChan():
go func() {
err := d.bootstrapManager.Bootstrap()
if err != nil {
d.opts.InstrumentOptions().Logger().With("error", err.Error()).Error("error while retry bootstrapping")
}
}()
}
}
}