banyand/queue/pub/client.go

// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pub

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/health/grpc_health_v1"

	"github.com/apache/skywalking-banyandb/api/common"
	clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
	"github.com/apache/skywalking-banyandb/pkg/bus"
	"github.com/apache/skywalking-banyandb/pkg/grpchelper"
	"github.com/apache/skywalking-banyandb/pkg/logger"
)

const rpcTimeout = 2 * time.Second

var (
	// Retry policy for health check.
	initBackoff       = time.Second
	maxBackoff        = 20 * time.Second
	backoffMultiplier = 2.0

	serviceName = clusterv1.Service_ServiceDesc.ServiceName

	// The timeout is set by each RPC.
	retryPolicy = fmt.Sprintf(`{
"methodConfig": [{
  "name": [{"service": "%s"}],
  "waitForReady": true,
  "retryPolicy": {
      "MaxAttempts": 4,
      "InitialBackoff": ".5s",
      "MaxBackoff": "10s",
      "BackoffMultiplier": 1.0,
      "RetryableStatusCodes": [ "UNAVAILABLE" ]
  }
}]}`, serviceName)
)

type client struct {
	client clusterv1.ServiceClient
	conn   *grpc.ClientConn
	md     schema.Metadata
}

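// OnAddOrUpdate handles a node registration or update event. Non-data nodes
// are ignored; a data node is recorded in the registered map, dialed with the
// retry policy above, and added to the active queue once its first health
// check passes, otherwise it is parked in the evict queue.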
func (p *pub) OnAddOrUpdate(md schema.Metadata) {
	if md.Kind != schema.KindNode {
		return
	}
	node, ok := md.Spec.(*databasev1.Node)
	if !ok {
		p.log.Warn().Msg("failed to cast node spec")
		return
	}
	var hasDataRole bool
	for _, r := range node.Roles {
		if r == databasev1.Role_ROLE_DATA {
			hasDataRole = true
			break
		}
	}
	if !hasDataRole {
		return
	}
	address := node.GrpcAddress
	if address == "" {
		p.log.Warn().Stringer("node", node).Msg("grpc address is empty")
		return
	}
	name := node.Metadata.GetName()
	if name == "" {
		p.log.Warn().Stringer("node", node).Msg("node name is empty")
		return
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	p.registerNode(node)
	if _, ok := p.active[name]; ok {
		return
	}
	if _, ok := p.evictable[name]; ok {
		return
	}
	credOpts, err := p.getClientTransportCredentials()
	if err != nil {
		p.log.Error().Err(err).Msg("failed to load client TLS credentials")
		return
	}
	conn, err := grpc.NewClient(address, append(credOpts, grpc.WithDefaultServiceConfig(retryPolicy))...)
	if err != nil {
		p.log.Error().Err(err).Msg("failed to connect to grpc server")
		return
	}
	if !p.checkClientHealthAndReconnect(conn, md) {
		p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("node is unhealthy in the register flow, move it to evict queue")
		return
	}
	c := clusterv1.NewServiceClient(conn)
	p.active[name] = &client{conn: conn, client: c, md: md}
	p.addClient(md)
	p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("new node is healthy, add it to active queue")
}

// registerNode records the node under its name. When an already registered
// node reappears with a new gRPC address, any stale evictable or active
// client is dropped so the next event dials the new address.
func (p *pub) registerNode(node *databasev1.Node) {
	name := node.Metadata.GetName()
	defer func() {
		p.registered[name] = node
	}()
	n, ok := p.registered[name]
	if !ok {
		return
	}
	if n.GrpcAddress == node.GrpcAddress {
		return
	}
	if en, ok := p.evictable[name]; ok {
		close(en.c)
		delete(p.evictable, name)
		p.log.Info().Str("node", name).Str("status", p.dump()).Msg("node is removed from evict queue by the new gRPC address updated event")
	}
	if client, ok := p.active[name]; ok {
		_ = client.conn.Close()
		delete(p.active, name)
		p.deleteClient(client.md)
		p.log.Info().Str("status", p.dump()).Str("node", name).Msg("node is removed from active queue by the new gRPC address updated event")
	}
}

func (p *pub) OnDelete(md schema.Metadata) {
	if md.Kind != schema.KindNode {
		return
	}
	node, ok := md.Spec.(*databasev1.Node)
	if !ok {
		p.log.Warn().Msg("failed to cast node spec")
		return
	}
	name := node.Metadata.GetName()
	if name == "" {
		p.log.Warn().Stringer("node", node).Msg("node name is empty")
		return
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.registered, name)
	if en, ok := p.evictable[name]; ok {
		close(en.c)
		delete(p.evictable, name)
		p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("node is removed from evict queue by delete event")
		return
	}
	if client, ok := p.active[name]; ok {
		if p.removeNodeIfUnhealthy(md, node, client) {
			p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("remove node from active queue by delete event")
			return
		}
		if !p.closer.AddRunning() {
			return
		}
		// The node is still healthy: keep probing it in the background and
		// only drop it once it turns unhealthy or gets registered again.
		go func() {
			defer p.closer.Done()
			backoff := initBackoff
			var elapsed time.Duration
			for {
				select {
				case <-time.After(backoff):
					if func() bool {
						elapsed += backoff
						p.mu.Lock()
						defer p.mu.Unlock()
						if _, ok := p.registered[name]; ok {
							// The client has been added back to registered clients map, just return
							return true
						}
						if p.removeNodeIfUnhealthy(md, node, client) {
							p.log.Info().Str("status", p.dump()).Stringer("node", node).Dur("after", elapsed).Msg("remove node from active queue by delete event")
							return true
						}
						return false
					}() {
						return
					}
				case <-p.closer.CloseNotify():
					return
				}
				if backoff < maxBackoff {
					backoff *= time.Duration(backoffMultiplier)
				} else {
					backoff = maxBackoff
				}
			}
		}()
	}
}

func (p *pub) removeNodeIfUnhealthy(md schema.Metadata, node *databasev1.Node, client *client) bool {
	if p.healthCheck(node.String(), client.conn) {
		return false
	}
	_ = client.conn.Close()
	name := node.Metadata.GetName()
	delete(p.active, name)
	p.deleteClient(md)
	return true
}

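// checkClientHealthAndReconnect reports whether the node is healthy. An
// unhealthy node is moved to the evict queue, and a background goroutine
// redials it with exponential backoff until it recovers, is removed, or the
// publisher shuts down.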
func (p *pub) checkClientHealthAndReconnect(conn *grpc.ClientConn, md schema.Metadata) bool {
	node, ok := md.Spec.(*databasev1.Node)
	if !ok {
		logger.Panicf("failed to cast node spec")
		return false
	}
	if p.healthCheck(node.String(), conn) {
		return true
	}
	_ = conn.Close()
	if !p.closer.AddRunning() {
		return false
	}
	name := node.Metadata.Name
	p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
	p.deleteClient(md)
	go func(p *pub, name string, en evictNode, md schema.Metadata) {
		defer p.closer.Done()
		backoff := initBackoff
		for {
			select {
			case <-time.After(backoff):
				credOpts, errEvict := p.getClientTransportCredentials()
				if errEvict != nil {
					p.log.Error().Err(errEvict).Msg("failed to load client TLS credentials (evict)")
					return
				}
				connEvict, errEvict := grpc.NewClient(node.GrpcAddress, append(credOpts, grpc.WithDefaultServiceConfig(retryPolicy))...)
				if errEvict == nil && p.healthCheck(en.n.String(), connEvict) {
					func() {
						p.mu.Lock()
						defer p.mu.Unlock()
						if _, ok := p.evictable[name]; !ok {
							// The client has been removed from evict clients map, just return
							return
						}
						c := clusterv1.NewServiceClient(connEvict)
						p.active[name] = &client{conn: connEvict, client: c, md: md}
						p.addClient(md)
						delete(p.evictable, name)
						p.log.Info().Str("status", p.dump()).Stringer("node", en.n).Msg("node is healthy, move it back to active queue")
					}()
					return
				}
				if connEvict != nil {
					// Close the connection that failed the health check; a nil
					// connection (dial error) must not be closed.
					_ = connEvict.Close()
				}
				if _, ok := p.registered[name]; !ok {
					return
				}
				p.log.Error().Err(errEvict).Msgf("failed to re-connect to grpc server after waiting for %s", backoff)
			case <-en.c:
				return
			case <-p.closer.CloseNotify():
				return
			}
			if backoff < maxBackoff {
				backoff *= time.Duration(backoffMultiplier)
			} else {
				backoff = maxBackoff
			}
		}
	}(p, name, p.evictable[name], md)
	return false
}

func (p *pub) healthCheck(node string, conn *grpc.ClientConn) bool {
	var resp *grpc_health_v1.HealthCheckResponse
	if err := grpchelper.Request(context.Background(), rpcTimeout, func(rpcCtx context.Context) (err error) {
		resp, err = grpc_health_v1.NewHealthClient(conn).Check(rpcCtx,
			&grpc_health_v1.HealthCheckRequest{
				Service: "",
			})
		return err
	}); err != nil {
		if e := p.log.Debug(); e.Enabled() {
			e.Err(err).Str("node", node).Msg("service unhealthy")
		}
		return false
	}
	return resp.GetStatus() == grpc_health_v1.HealthCheckResponse_SERVING
}

func (p *pub) checkServiceHealth(svc string, conn *grpc.ClientConn) *common.Error {
	client := clusterv1.NewServiceClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
	defer cancel()
	resp, err := client.HealthCheck(ctx, &clusterv1.HealthCheckRequest{
		ServiceName: svc,
	})
	if err != nil {
		return common.NewErrorWithStatus(modelv1.Status_STATUS_INTERNAL_ERROR, err.Error())
	}
	if resp.Status == modelv1.Status_STATUS_SUCCEED {
		return nil
	}
	return common.NewErrorWithStatus(resp.Status, resp.Error)
}

func (p *pub) failover(node string, ce *common.Error, topic bus.Topic) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if ce.Status() != modelv1.Status_STATUS_INTERNAL_ERROR {
		_, _ = p.checkWritable(node, topic)
		return
	}
	if en, evictable := p.evictable[node]; evictable {
		if _, registered := p.registered[node]; !registered {
			close(en.c)
			delete(p.evictable, node)
			p.log.Info().Str("node", node).Str("status", p.dump()).Msg("node is removed from evict queue by wire event")
		}
		return
	}
	if client, ok := p.active[node]; ok && !p.checkClientHealthAndReconnect(client.conn, client.md) {
		_ = client.conn.Close()
		delete(p.active, node)
		p.deleteClient(client.md)
		p.log.Info().Str("status", p.dump()).Str("node", node).Msg("node is unhealthy in the failover flow, move it to evict queue")
	}
}

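// checkWritable reports whether the data node n can still serve the given
// topic. On failure it detaches the node from the topic handler and starts a
// backoff loop that re-attaches the node once the service health check
// succeeds again.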
func (p *pub) checkWritable(n string, topic bus.Topic) (bool, *common.Error) {
	h, ok := p.handlers[topic]
	if !ok {
		return false, nil
	}
	node, ok := p.active[n]
	if !ok {
		return false, nil
	}
	err := p.checkServiceHealth(topic.String(), node.conn)
	if err == nil {
		return true, nil
	}
	h.OnDelete(node.md)
	if !p.closer.AddRunning() {
		return false, err
	}
	go func() {
		defer p.closer.Done()
		backoff := initBackoff
		for {
			select {
			case <-time.After(backoff):
				if errInternal := p.checkServiceHealth(topic.String(), node.conn); errInternal == nil {
					// The service is healthy again; re-attach the node to the
					// topic handler if it is still in the active queue.
					func() {
						p.mu.Lock()
						defer p.mu.Unlock()
						node, ok := p.active[n]
						if !ok {
							return
						}
						h.OnAddOrUpdate(node.md)
					}()
					return
				}
				p.log.Warn().Str("topic", topic.String()).Err(err).Str("node", n).Dur("backoff", backoff).Msg("data node can not ingest data")
			case <-p.closer.CloseNotify():
				return
			}
			if backoff < maxBackoff {
				backoff *= time.Duration(backoffMultiplier)
			} else {
				backoff = maxBackoff
			}
		}
	}()
	return false, err
}

func (p *pub) deleteClient(md schema.Metadata) {
	for _, h := range p.handlers {
		h.OnDelete(md)
	}
}

func (p *pub) addClient(md schema.Metadata) {
	for _, h := range p.handlers {
		h.OnAddOrUpdate(md)
	}
}

func (p *pub) dump() string {
	keysRegistered := make([]string, 0, len(p.registered))
	for k := range p.registered {
		keysRegistered = append(keysRegistered, k)
	}
	keysActive := make([]string, 0, len(p.active))
	for k := range p.active {
		keysActive = append(keysActive, k)
	}
	keysEvictable := make([]string, 0, len(p.evictable))
	for k := range p.evictable {
		keysEvictable = append(keysEvictable, k)
	}
	return fmt.Sprintf("registered: %v, active: %v, evictable: %v", keysRegistered, keysActive, keysEvictable)
}

// evictNode pairs an evicted node with a channel that, when closed, stops the
// background reconnect loop for that node.
type evictNode struct {
	n *databasev1.Node
	c chan struct{}
}