plugin/setup.go (107 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one or more // contributor license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright ownership. // The ASF licenses this file to You under the Apache License, Version 2.0 // (the "License"); you may not use this file except in compliance with // the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // This file shows how to load plugins through a yaml config file. package plugin import ( "errors" "fmt" "time" yaml "gopkg.in/yaml.v3" ) var ( // SetupTimeout is the timeout for initialization of each plugin. // Modify it if some plugins' initialization does take a long time. SetupTimeout = 10 * time.Second // MaxPluginSize is the max number of plugins. MaxPluginSize = 1000 ) // Config is the configuration of all plugins. plugin type => { plugin name => plugin config } type Config map[string]map[string]yaml.Node // Setup loads plugins by configuration. func (c Config) Setup() error { // load plugins one by one through the config file and put them into an ordered plugin queue. plugins, status, err := c.loadPlugins() if err != nil { return err } // remove and setup plugins one by one from the front of the ordered plugin queue. if _, err := c.setupPlugins(plugins, status); err != nil { return err } return nil } func (c Config) loadPlugins() (chan pluginInfo, map[string]bool, error) { var ( plugins = make(chan pluginInfo, MaxPluginSize) // use channel as plugin queue // plugins' status. plugin key => {true: init done, false: init not done}. status = make(map[string]bool) ) for typ, factories := range c { for name, cfg := range factories { factory := Get(typ, name) if factory == nil { return nil, nil, fmt.Errorf("plugin %s:%s no registered or imported, do not configure", typ, name) } p := pluginInfo{ factory: factory, typ: typ, name: name, cfg: cfg, } select { case plugins <- p: default: return nil, nil, fmt.Errorf("plugin number exceed max limit:%d", len(plugins)) } status[p.key()] = false } } return plugins, status, nil } func (c Config) setupPlugins(plugins chan pluginInfo, status map[string]bool) ([]pluginInfo, error) { var ( result []pluginInfo num = len(plugins) ) for num > 0 { for i := 0; i < num; i++ { p := <-plugins if err := p.setup(); err != nil { return nil, err } status[p.key()] = true result = append(result, p) } if len(plugins) == num { // none plugin is setup, circular dependency exists. return nil, fmt.Errorf("cycle depends, not plugin is setup") } num = len(plugins) // continue to process plugins that were moved to tail of the channel. } return result, nil } // ------------------------------------------------------------------------------------- // // pluginInfo is the information of a plugin. type pluginInfo struct { factory Plugin typ string name string cfg yaml.Node } // setup initializes a single plugin. func (p *pluginInfo) setup() error { var ( ch = make(chan struct{}) err error ) go func() { err = p.factory.Setup(p.name, &YamlNodeDecoder{Node: &p.cfg}) close(ch) }() select { case <-ch: case <-time.After(SetupTimeout): return fmt.Errorf("setup plugin %s timeout", p.key()) } if err != nil { return fmt.Errorf("setup plugin %s error: %v", p.key(), err) } return nil } // YamlNodeDecoder is a decoder for a yaml.Node of the yaml config file. type YamlNodeDecoder struct { Node *yaml.Node } // Decode decodes a yaml.Node of the yaml config file. func (d *YamlNodeDecoder) Decode(cfg interface{}) error { if d.Node == nil { return errors.New("yaml node empty") } return d.Node.Decode(cfg) } // key returns the unique index of plugin in the format of 'type-name'. func (p *pluginInfo) key() string { return p.typ + "-" + p.name }