pulsar/internal/rpc_client.go (208 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package internal
import (
"context"
"errors"
"fmt"
"net/url"
"sync"
"sync/atomic"
"time"
"github.com/apache/pulsar-client-go/pulsar/backoff"
"github.com/apache/pulsar-client-go/pulsar/auth"
"github.com/apache/pulsar-client-go/pulsar/log"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"google.golang.org/protobuf/proto"
)
var (
// ErrRequestTimeOut happens when request not finished in given requestTimeout.
ErrRequestTimeOut = errors.New("request timed out")
)
type result struct {
*RPCResult
error
}
type RPCResult struct {
Response *pb.BaseCommand
Cnx Connection
}
type RPCClient interface {
// Create a new unique request id
NewRequestID() uint64
NewProducerID() uint64
NewConsumerID() uint64
// Send a request and block until the result is available
RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
RequestToHost(serviceNameResolver *ServiceNameResolver, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
RequestWithCnxKeySuffix(logicalAddr *url.URL, physicalAddr *url.URL, cnxKeySuffix int32, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error
RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
LookupService(URL string) LookupService
}
type rpcClient struct {
pool ConnectionPool
requestTimeout time.Duration
requestIDGenerator uint64
producerIDGenerator uint64
consumerIDGenerator uint64
log log.Logger
metrics *Metrics
tlsConfig *TLSOptions
listenerName string
authProvider auth.Provider
lookupService LookupService
urlLookupServiceMapLock sync.RWMutex
urlLookupServiceMap map[string]LookupService
lookupProperties []*pb.KeyValue
}
func NewRPCClient(serviceURL *url.URL, pool ConnectionPool,
requestTimeout time.Duration, logger log.Logger, metrics *Metrics,
listenerName string, tlsConfig *TLSOptions, authProvider auth.Provider, lookupProperties []*pb.KeyValue) RPCClient {
c := rpcClient{
pool: pool,
requestTimeout: requestTimeout,
log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
metrics: metrics,
listenerName: listenerName,
tlsConfig: tlsConfig,
authProvider: authProvider,
urlLookupServiceMap: make(map[string]LookupService),
lookupProperties: lookupProperties,
}
lookupService, err := c.NewLookupService(serviceURL)
if err != nil {
panic(err)
}
c.lookupService = lookupService
return &c
}
func (c *rpcClient) requestToHost(serviceNameResolver *ServiceNameResolver,
requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
var err error
var host *url.URL
bo := backoff.NewDefaultBackoffWithInitialBackOff(100 * time.Millisecond)
// we can retry these requests because this kind of request is
// not specific to any particular broker
opFn := func() (*RPCResult, error) {
host, err = (*serviceNameResolver).ResolveHost()
if err != nil {
c.log.WithError(err).Errorf("rpc client failed to resolve host")
return nil, err
}
return c.Request(host, host, requestID, cmdType, message)
}
ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout)
defer cancel()
rpcResult, err := Retry(ctx, opFn, func(_ error) time.Duration {
retryTime := bo.Next()
c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
return retryTime
})
return rpcResult, err
}
func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {
return c.requestToHost(c.lookupService.ServiceNameResolver(), requestID, cmdType, message)
}
func (c *rpcClient) RequestToHost(serviceNameResolver *ServiceNameResolver, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
return c.requestToHost(serviceNameResolver, requestID, cmdType, message)
}
func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
return c.RequestWithCnxKeySuffix(logicalAddr, physicalAddr, c.pool.GenerateRoundRobinIndex(),
requestID, cmdType, message)
}
func (c *rpcClient) RequestWithCnxKeySuffix(logicalAddr *url.URL, physicalAddr *url.URL, cnxKeySuffix int32,
requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr, cnxKeySuffix)
if err != nil {
return nil, err
}
return c.RequestOnCnx(cnx, requestID, cmdType, message)
}
func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {
c.metrics.RPCRequestCount.Inc()
ch := make(chan result, 1)
cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
ch <- result{&RPCResult{
Cnx: cnx,
Response: response,
}, err}
})
timeoutCh := time.After(c.requestTimeout)
for {
select {
case res := <-ch:
// Ignoring producer not ready response.
// Continue to wait for the producer to create successfully
if res.error == nil && res.Response != nil && *res.RPCResult.Response.Type == pb.BaseCommand_PRODUCER_SUCCESS {
if !res.RPCResult.Response.ProducerSuccess.GetProducerReady() {
timeoutCh = nil
break
}
}
return res.RPCResult, res.error
case <-timeoutCh:
return nil, ErrRequestTimeOut
}
}
}
func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error {
c.metrics.RPCRequestCount.Inc()
return cnx.SendRequestNoWait(baseCommand(cmdType, message))
}
func (c *rpcClient) NewRequestID() uint64 {
return atomic.AddUint64(&c.requestIDGenerator, 1)
}
func (c *rpcClient) NewProducerID() uint64 {
return atomic.AddUint64(&c.producerIDGenerator, 1)
}
func (c *rpcClient) NewConsumerID() uint64 {
return atomic.AddUint64(&c.consumerIDGenerator, 1)
}
func (c *rpcClient) LookupService(URL string) LookupService {
if URL == "" {
return c.lookupService
}
c.urlLookupServiceMapLock.Lock()
defer c.urlLookupServiceMapLock.Unlock()
lookupService, ok := c.urlLookupServiceMap[URL]
if ok {
return lookupService
}
serviceURL, err := url.Parse(URL)
if err != nil {
panic(err)
}
lookupService, err = c.NewLookupService(serviceURL)
if err != nil {
panic(err)
}
c.urlLookupServiceMap[URL] = lookupService
return lookupService
}
func (c *rpcClient) NewLookupService(url *url.URL) (LookupService, error) {
switch url.Scheme {
case "pulsar", "pulsar+ssl":
serviceNameResolver := NewPulsarServiceNameResolver(url)
return NewLookupService(c, url, serviceNameResolver,
c.tlsConfig != nil, c.listenerName, c.lookupProperties, c.log, c.metrics), nil
case "http", "https":
serviceNameResolver := NewPulsarServiceNameResolver(url)
httpClient, err := NewHTTPClient(url, serviceNameResolver, c.tlsConfig,
c.requestTimeout, c.log, c.metrics, c.authProvider)
if err != nil {
return nil, err
}
return NewHTTPLookupService(
httpClient, url, serviceNameResolver, c.tlsConfig != nil, c.log, c.metrics), nil
default:
panic(fmt.Sprintf("Invalid URL scheme '%s'", url.Scheme))
}
}
func (c *rpcClient) Close() {
c.lookupService.Close()
c.urlLookupServiceMapLock.Lock()
defer c.urlLookupServiceMapLock.Unlock()
for _, value := range c.urlLookupServiceMap {
value.Close()
}
}