internal/pkg/config/config.go (125 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package config import ( "context" "fmt" "log" "os" "path/filepath" "strconv" "github.com/apache/synapse-go/internal/pkg/core/artifacts" "github.com/apache/synapse-go/internal/pkg/core/utils" "github.com/apache/synapse-go/internal/pkg/loggerfactory" "github.com/knadh/koanf/parsers/toml" "github.com/knadh/koanf/providers/file" "github.com/knadh/koanf/v2" ) // Config implements the common.ConfigProvider interface type Config struct { koanf *koanf.Koanf } func ReadFile(filename string) (*Config, error) { k := koanf.New(".") f := file.Provider(filename) if err := k.Load(f, toml.Parser()); err != nil { return nil, err } cfg := &Config{ koanf: k, } return cfg, nil } func (c *Config) IsSet(key string) bool { return c.koanf.Exists(key) } func (c *Config) Watch(ctx context.Context, filename string) { f := file.Provider(filename) f.Watch(func(event interface{}, err error) { if err != nil { log.Printf("watch error: %v", err) return } // Throw away the old config and load a fresh copy. log.Println("config changed. Reloading ...") new_k := koanf.New(".") if err := new_k.Load(f, toml.Parser()); err != nil { log.Printf("error loading new config: %v", err) return } // Update the config c.koanf = new_k // Update the logger configuration var levelMap map[string]string var slogHandlerConfig loggerfactory.SlogHandlerConfig c.MustUnmarshal("logger.level.packages", &levelMap) c.MustUnmarshal("logger.handler", &slogHandlerConfig) cm := loggerfactory.GetConfigManager() cm.SetLogLevelMap(&levelMap) cm.SetSlogHandlerConfig(slogHandlerConfig) }) } func (c *Config) Unmarshal(key string, out interface{}) error { err := c.koanf.Unmarshal(key, out) if err != nil { return fmt.Errorf("cannot unmarshal config for key %q: %v", key, err) } return nil } func (c *Config) MustUnmarshal(key string, out interface{}) { err := c.Unmarshal(key, out) if err != nil { panic(err) } } func InitializeConfig(ctx context.Context, confFolderPath string) error { files, err := os.ReadDir(confFolderPath) configContext := ctx.Value(utils.ConfigContextKey).(*artifacts.ConfigContext) if err != nil { return err } if len(files) == 0 { return fmt.Errorf("no config files found in %s", confFolderPath) } // {"LoggerConfig", "deployment"} for _, configurationType := range []string{"LoggerConfig", "deployment"} { configFilePath := filepath.Join(confFolderPath, configurationType+".toml") cfg, err := ReadFile(configFilePath) if err != nil { return fmt.Errorf("cannot read config file: %w", err) } switch configurationType { case "LoggerConfig": var levelMap map[string]string var slogHandlerConfig loggerfactory.SlogHandlerConfig if cfg.IsSet("logger") { cfg.MustUnmarshal("logger.handler", &slogHandlerConfig) cfg.MustUnmarshal("logger.level.packages", &levelMap) } cm := loggerfactory.GetConfigManager() cm.SetLogLevelMap(&levelMap) cm.SetSlogHandlerConfig(slogHandlerConfig) // Start watching for config changes cfg.Watch(context.Background(), configFilePath) case "deployment": deploymentConfigMap := make(map[string]interface{}) if cfg.IsSet("server") { var serverConfigMap map[string]string cfg.MustUnmarshal("server", &serverConfigMap) // Validate required hostname key hostname, exists := serverConfigMap["hostname"] if !exists { return fmt.Errorf("missing required server configuration key: hostname") } // Validate hostname value if hostname == "" { return fmt.Errorf("server hostname cannot be empty") } // Validate offset if it exists (optional) if offsetStr, hasOffset := serverConfigMap["offset"]; hasOffset && offsetStr != "" { offset, err := strconv.Atoi(offsetStr) if err != nil { return fmt.Errorf("invalid server offset value: %s, must be an integer", offsetStr) } if offset < 0 { return fmt.Errorf("server offset must be non-negative, got: %d", offset) } } deploymentConfigMap["server"] = serverConfigMap } else { return fmt.Errorf("server configuration section is required in deployment.toml") } configContext.AddDeploymentConfig(deploymentConfigMap) } } return nil }