banyand/metadata/embeddedetcd/server.go (141 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Package embeddedetcd implements an embedded etcd server.
package embeddedetcd
import (
"io"
"net/url"
"os"
"path/filepath"
"time"
"github.com/rs/zerolog"
"go.etcd.io/etcd/server/v3/embed"
"go.uber.org/zap"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
// Server is the interface of etcd server.
type Server interface {
io.Closer
ReadyNotify() <-chan struct{}
StopNotify() <-chan struct{}
StoppingNotify() <-chan struct{}
}
type server struct {
server *embed.Etcd
}
// Option is the option for etcd server.
type Option func(*config)
// RootDir sets the root directory of Registry.
func RootDir(rootDir string) Option {
return func(config *config) {
config.rootDir = rootDir
}
}
// ConfigureListener sets peer urls of listeners.
func ConfigureListener(lcs, lps []string) Option {
return func(config *config) {
config.listenerClientURLs = lcs
config.listenerPeerURLs = lps
}
}
// AutoCompactionMode sets the auto compaction mode.
func AutoCompactionMode(mode string) Option {
return func(config *config) {
config.autoCompactionMode = mode
}
}
// AutoCompactionRetention sets the auto compaction retention.
func AutoCompactionRetention(retention string) Option {
return func(config *config) {
config.autoCompactionRetention = retention
}
}
// QuotaBackendBytes sets the quota for backend storage.
func QuotaBackendBytes(quota int64) Option {
return func(config *config) {
config.quotaBackendBytes = quota
}
}
type config struct {
// rootDir is the root directory for etcd storage
rootDir string
// autoCompactionMode is the auto compaction mode
autoCompactionMode string
// autoCompactionRetention is the auto compaction retention
autoCompactionRetention string
// listenerClientURLs is the listener for client
listenerClientURLs []string
// listenerPeerURLs is the listener for peer
listenerPeerURLs []string
// quotaBackendBytes is the quota for backend storage
quotaBackendBytes int64
}
func (e *server) ReadyNotify() <-chan struct{} {
return e.server.Server.ReadyNotify()
}
func (e *server) StopNotify() <-chan struct{} {
return e.server.Server.StopNotify()
}
func (e *server) StoppingNotify() <-chan struct{} {
return e.server.Server.StoppingNotify()
}
func (e *server) Close() error {
e.server.Close()
<-e.server.Server.StopNotify()
return nil
}
// NewServer returns a new etcd server.
func NewServer(options ...Option) (Server, error) {
conf := &config{
rootDir: os.TempDir(),
listenerClientURLs: []string{embed.DefaultListenClientURLs},
listenerPeerURLs: []string{embed.DefaultListenPeerURLs},
autoCompactionMode: "periodic",
autoCompactionRetention: "1h",
quotaBackendBytes: 2 * 1024 * 1024 * 1024,
}
for _, opt := range options {
opt(conf)
}
zapCfg := logger.GetLogger("etcd-server").DefaultLevel(zerolog.ErrorLevel).ToZapConfig()
var l *zap.Logger
var err error
if l, err = zapCfg.Build(); err != nil {
return nil, err
}
embedConfig, err := newEmbedEtcdConfig(conf, l)
if err != nil {
return nil, err
}
e, err := embed.StartEtcd(embedConfig)
if err != nil {
return nil, err
}
if e != nil {
<-e.Server.ReadyNotify() // wait for e.Server to join the cluster
}
reg := &server{
server: e,
}
return reg, nil
}
func newEmbedEtcdConfig(config *config, logger *zap.Logger) (*embed.Config, error) {
cfg := embed.NewConfig()
cfg.ZapLoggerBuilder = embed.NewZapLoggerBuilder(logger)
cfg.Dir = filepath.Join(config.rootDir, "metadata")
observability.UpdatePath(cfg.Dir)
parseURLs := func(urls []string) ([]url.URL, error) {
uu := make([]url.URL, 0, len(urls))
for _, u := range urls {
cURL, err := url.Parse(u)
if err != nil {
return nil, err
}
uu = append(uu, *cURL)
}
return uu, nil
}
cURLs, err := parseURLs(config.listenerClientURLs)
if err != nil {
return nil, err
}
pURLs, err := parseURLs(config.listenerPeerURLs)
if err != nil {
return nil, err
}
cfg.Name = "meta"
cfg.ListenClientUrls, cfg.AdvertiseClientUrls = cURLs, cURLs
cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = pURLs, pURLs
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
cfg.AutoCompactionMode = config.autoCompactionMode
cfg.AutoCompactionRetention = config.autoCompactionRetention
cfg.QuotaBackendBytes = config.quotaBackendBytes
cfg.BackendBatchInterval = 500 * time.Millisecond
cfg.BackendBatchLimit = 10000
cfg.MaxRequestBytes = 10 * 1024 * 1024
return cfg, nil
}