xds/filesystem-control-panel/server/main.go (129 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"context"
"flag"
"io/ioutil"
"log"
"os"
"strconv"
)
import (
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/v3"
"github.com/envoyproxy/go-control-plane/pkg/test/v3"
"github.com/fsnotify/fsnotify"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/anypb"
)
import (
"github.com/dubbo-go-pixiu/pixiu-api/pkg/xds"
pixiupb "github.com/dubbo-go-pixiu/pixiu-api/pkg/xds/model"
)
// l is the process-wide logger; main hands it to the snapshot cache and uses
// it for its own diagnostics.
var l Logger

// Settings shared by main and GenerateSnapshotPixiuFromFile.
var (
	// port is the value main passes to RunServer.
	port uint = 18000
	// nodeID is the node key under which snapshots are stored in the cache.
	nodeID = "test-id"
	// version is incremented by GenerateSnapshotPixiuFromFile before each
	// snapshot is built and is used, in decimal form, as the snapshot version.
	version = 100
)
// init installs the package logger with debug output switched on.
func init() {
	l = Logger{Debug: true}
}
// main runs the file-backed xDS control plane.
//
// It creates a snapshot cache, publishes the snapshot built by
// GenerateSnapshotPixiuFromFile under nodeID, starts a watcher that republishes
// the snapshot whenever a file in ../pixiu is written, and then hands the cache
// to the xDS server started through RunServer.
func main() {
	flag.Parse()

	// Create a snapshot cache
	snapshotCache := cache.NewSnapshotCache(false, cache.IDHash{}, l)

	// The initial load runs concurrently with the server start-up below.
	go func() {
		// Create the config that we'll serve to Envoy
		config := GenerateSnapshotPixiuFromFile()
		if err := config.Consistent(); err != nil {
			l.Errorf("config inconsistency: %+v\n%+v", config, err)
			os.Exit(1)
		}
		l.Debugf("will serve config %+v", config)

		// Add the config to the snapshot cache
		if err := snapshotCache.SetSnapshot(context.Background(), nodeID, config); err != nil {
			l.Errorf("config error %q for %+v", err, config)
			os.Exit(1)
		}

		// Only start watching once the initial snapshot has been published.
		go watchPixiuConfig(snapshotCache)
	}()

	// Run the xDS server
	ctx := context.Background()
	cb := &test.Callbacks{Debug: l.Debug}
	srv := server.NewServer(ctx, snapshotCache, cb)
	RunServer(ctx, srv, port)
}

// watchPixiuConfig watches the ../pixiu directory and, on every write event,
// rebuilds the snapshot from disk and publishes it to snapshotCache under
// nodeID. It returns when the watcher's event or error channel is closed and
// terminates the process if the watcher cannot be set up or a rebuilt snapshot
// is rejected by the cache.
func watchPixiuConfig(snapshotCache cache.SnapshotCache) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		log.Fatal(err)
	}
	defer watcher.Close()

	if err := watcher.Add("../pixiu"); err != nil {
		log.Fatal(err)
	}

	for {
		select {
		case event, ok := <-watcher.Events:
			if !ok {
				return
			}
			// Op is a bit mask, so a write may arrive combined with other
			// operations; test the bit instead of comparing for equality.
			if event.Op&fsnotify.Write != fsnotify.Write {
				continue
			}
			log.Println("event:", event)
			log.Println("modified file:", event.Name)

			conf := GenerateSnapshotPixiuFromFile()
			// Add the config to the snapshot cache
			if err := snapshotCache.SetSnapshot(context.Background(), nodeID, conf); err != nil {
				// Report the snapshot that was just rejected, not the one
				// loaded at start-up.
				l.Errorf("config error %q for %+v", err, conf)
				os.Exit(1)
			}
		case err, ok := <-watcher.Errors:
			if !ok {
				return
			}
			log.Println("error:", err)
		}
	}
}
// GenerateSnapshotPixiuFromFile builds a snapshot from ../pixiu/cds.json and
// ../pixiu/lds.json.
//
// Each file is decoded with protojson into its Pixiu extension message, wrapped
// in an Any, and exposed as a TypedExtensionConfig named xds.ClusterType or
// xds.ListenerType respectively. The package-level version counter is
// incremented on every call and its decimal form becomes the snapshot version.
//
// Failures while reading, decoding, or packing are logged and do not abort the
// build; the affected message is then used in whatever state it was left in.
// The value returned by cache.NewSnapshot is passed through as-is, after
// logging its error if there was one.
func GenerateSnapshotPixiuFromFile() *cache.Snapshot {
	cdsStr, err := ioutil.ReadFile("../pixiu/cds.json")
	if err != nil {
		l.Errorf("read ../pixiu/cds.json: %s", err)
	}
	cds := &pixiupb.PixiuExtensionClusters{}
	if err = protojson.Unmarshal(cdsStr, cds); err != nil {
		l.Errorf("decode ../pixiu/cds.json: %s", err)
	}

	ldsStr, err := ioutil.ReadFile("../pixiu/lds.json")
	if err != nil {
		l.Errorf("read ../pixiu/lds.json: %s", err)
	}
	lds := &pixiupb.PixiuExtensionListeners{}
	if err = protojson.Unmarshal(ldsStr, lds); err != nil {
		l.Errorf("decode ../pixiu/lds.json: %s", err)
	}

	// Every generated snapshot gets a fresh version string.
	version++

	ldsResource, err := anypb.New(lds)
	if err != nil {
		l.Errorf("pack listeners into Any: %s", err)
	}
	cdsResource, err := anypb.New(cds)
	if err != nil {
		l.Errorf("pack clusters into Any: %s", err)
	}

	snap, err := cache.NewSnapshot(strconv.Itoa(version),
		map[resource.Type][]types.Resource{
			resource.ExtensionConfigType: {
				&core.TypedExtensionConfig{
					Name:        xds.ClusterType,
					TypedConfig: cdsResource,
				},
				&core.TypedExtensionConfig{
					Name:        xds.ListenerType,
					TypedConfig: ldsResource,
				},
			},
		},
	)
	if err != nil {
		l.Errorf("build snapshot version %d: %s", version, err)
	}
	return snap
}