agent/pluginmanager/installeds.go (262 lines of code) (raw):
package pluginmanager
import (
"encoding/binary"
"encoding/json"
"errors"
"io"
"os"
"github.com/aliyun/aliyun_assist_client/thirdparty/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
"github.com/aliyun/aliyun_assist_client/agent/log"
"github.com/aliyun/aliyun_assist_client/common/fuzzyjson"
)
const (
_PluginListBucketName = "pluginList"
)
var (
errEmptyInstalledPluginsJsonFile = errors.New("empty content in existed installed_plugins JSON database file")
)
// _InstalledPluginsJson 和installed_plugins文件内容一致,用于解析json
type _InstalledPluginsJson struct {
PluginList []PluginInfo `json:"pluginList"`
}
type InstalledPlugins struct {
boltdb *bolt.DB
}
func LoadInstalledPlugins() (*InstalledPlugins, error) {
return loadInstalledPlugins(false)
}
func LoadPreInstalledPlugins() (*InstalledPlugins, error) {
return loadInstalledPlugins(true)
}
// TODO-FIXME: Should LoadInstalledPlugins has a timeout limit? Now it simply
// waits indefinitely.
func loadInstalledPlugins(isPreInstalled bool) (*InstalledPlugins, error) {
var boltPath string
var err error
if isPreInstalled {
boltPath, err = getPreInstalledPluginsBoltPath()
if err != nil {
return nil, err
}
} else {
boltPath, err = getInstalledPluginsBoltPath()
if err != nil {
return nil, err
}
}
// 1. Just use the new BoltDB file if existed, but NEVER AUTO-CREATE IT if
// not existed
boltdb, err := bolt.Open(boltPath, os.FileMode(0o0640), &bolt.Options{
OpenFile: func(name string, flag int, perm os.FileMode) (*os.File, error) {
return os.OpenFile(name, flag & ^os.O_CREATE, perm)
},
})
if err == nil {
return &InstalledPlugins{
boltdb: boltdb,
}, nil
}
if !errors.Is(err, os.ErrNotExist) {
return nil, err
}
// 2. Just use the new BoltDB file when old JSON file does not exist
jsonPath, err := getInstalledPluginsJSONPath()
if err != nil {
return nil, err
}
jsonFile, err := os.OpenFile(jsonPath, os.O_RDONLY, 0)
if err != nil {
// Resolved: Simply use BoltDB database file on error encountered when
// opening installed_plugins JSON database file. Just log the error and
// let it go then.
log.GetLogger().WithError(err).Warningln("Failed to open obsolete installed_plugins JSON database file")
return _loadInstalledPluginsBolt(boltPath)
}
// 3. Now migrate data from old existed JSON file to the new BoltDB file,
// which MUST be created EXCLUSIVELY BY CURRENT PROCESS
installedPluginInfoBytes, err := _loadEachInstalledPluginJsonAsBytes(jsonFile)
if err != nil {
// Resolved: Simply use BoltDB database file on error encountered when
// reading and parsing installed_plugins JSON database file. Empty file
// is also unacceptable.
log.GetLogger().WithError(err).Warningln("Failed to read obsolete installed_plugins JSON database file")
return _loadInstalledPluginsBolt(boltPath)
}
boltdb, err = bolt.Open(boltPath, os.FileMode(0o0640), &bolt.Options{
OpenFile: func(name string, flag int, perm os.FileMode) (*os.File, error) {
return os.OpenFile(name, flag|os.O_EXCL, perm)
},
})
if err != nil {
if errors.Is(err, os.ErrExist) {
// !!!RACE CONDITION HERE!!!
// Existed installed_plugins.db BoltDB file means another process
// (e.g., agent or yet another acs-plugin-manager) is doing or has
// done the migration. Simply fallback to general opening procedure.
return _loadInstalledPluginsBolt(boltPath)
} else {
return nil, err
}
}
if err := _insertInstalledPluginInfoBytes(boltdb, installedPluginInfoBytes); err != nil {
_ = boltdb.Close()
return nil, err
}
return &InstalledPlugins{
boltdb: boltdb,
}, nil
}
func _loadInstalledPluginsBolt(boltPath string) (*InstalledPlugins, error) {
boltdb, err := bolt.Open(boltPath, os.FileMode(0o0640), nil)
if err != nil {
return nil, err
}
return &InstalledPlugins{
boltdb: boltdb,
}, nil
}
func _loadEachInstalledPluginJsonAsBytes(jsonFile *os.File) ([][]byte, error) {
body, err := io.ReadAll(jsonFile)
if err != nil {
return nil, err
}
content := string(body)
if len(content) == 0 {
return nil, errEmptyInstalledPluginsJsonFile
}
installedPluginsJson := _InstalledPluginsJson{}
if err := fuzzyjson.Unmarshal(content, &installedPluginsJson); err != nil {
log.GetLogger().WithFields(logrus.Fields{
"content": content,
}).WithError(err).Errorln("Failed to unmarshal JSON database file of install plugins")
return nil, err
}
var payloads [][]byte
for _, plugin := range installedPluginsJson.PluginList {
payload, err := json.Marshal(plugin)
if err != nil {
return nil, err
}
payloads = append(payloads, payload)
}
return payloads, nil
}
func _insertInstalledPluginInfoBytes(boltdb *bolt.DB, payloads [][]byte) error {
return boltdb.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists([]byte(_PluginListBucketName))
if err != nil {
return err
}
for _, payload := range payloads {
// Generate auto-incremental ID for the plugin information.
// According to the documentation of https://github.com/etcd-io/bbolt,
// the NextSequence() method returns an error only if the Tx is closed
// or not writeable. That can't happen in an Update() call, so the error
// check can be safely ignored.
id, _ := bucket.NextSequence()
insertedKey := int(id)
if err := bucket.Put(_itob(insertedKey), payload); err != nil {
return err
}
}
return nil
})
}
// Close can be called multiple times and internal implementation in bbolt would
// keep it safe. Thus close the database as soon as possible please.
func (ip *InstalledPlugins) Close() error {
return ip.boltdb.Close()
}
func (ip *InstalledPlugins) FindAll() ([]int, []PluginInfo, error) {
var keys []int
var values []PluginInfo
err := ip.boltdb.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_PluginListBucketName))
if bucket == nil {
return nil
}
return bucket.ForEach(func(k, v []byte) error {
var plugin PluginInfo
if err := json.Unmarshal(v, &plugin); err != nil {
// Raising errors to caller for more flexible handling
return err
}
keys = append(keys, _b8toi(k))
values = append(values, plugin)
return nil
})
})
if err != nil {
return nil, nil, err
}
return keys, values, nil
}
func (ip *InstalledPlugins) FindManyByName(name string) ([]int, []PluginInfo, error) {
keys, values, err := ip.FindAll()
if err != nil {
return nil, nil, err
}
foundKeys := []int{}
foundValues := []PluginInfo{}
for i := 0; i < len(values); i++ {
if values[i].Name != name {
continue
}
foundKeys = append(foundKeys, keys[i])
foundValues = append(foundValues, values[i])
}
return foundKeys, foundValues, nil
}
func (ip *InstalledPlugins) FindOneWithPredicate(predicate func(plugin *PluginInfo) bool) (int, *PluginInfo, error) {
var foundKey int = -1
var foundValue *PluginInfo
err := ip.boltdb.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_PluginListBucketName))
if bucket == nil {
return nil
}
cursor := bucket.Cursor()
for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
var plugin PluginInfo
if err := json.Unmarshal(v, &plugin); err != nil {
// Raising errors to caller for more flexible handling
return err
}
if !predicate(&plugin) {
continue
}
foundKey = _b8toi(k)
foundValue = &plugin
return nil
}
return nil
})
if err != nil {
return -1, nil, err
}
return foundKey, foundValue, nil
}
func (ip *InstalledPlugins) Insert(value *PluginInfo) (int, error) {
content, err := json.Marshal(value)
if err != nil {
return -1, err
}
var insertedKey int = -1
err = ip.boltdb.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists([]byte(_PluginListBucketName))
if err != nil {
return err
}
// Generate auto-incremental ID for the plugin information.
// According to the documentation of https://github.com/etcd-io/bbolt,
// the NextSequence() method returns an error only if the Tx is closed
// or not writeable. That can't happen in an Update() call, so the error
// check can be safely ignored.
id, _ := bucket.NextSequence()
insertedKey = int(id)
return bucket.Put(_itob(insertedKey), []byte(content))
})
if err != nil {
return -1, err
}
return insertedKey, nil
}
// Update method simply stores new value to specified position in JSON array.
// Would PANIC if key is out of range.
func (ip *InstalledPlugins) Update(key int, value *PluginInfo) error {
content, err := json.Marshal(value)
if err != nil {
return err
}
return ip.boltdb.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists([]byte(_PluginListBucketName))
if err != nil {
return err
}
return bucket.Put(_itob(key), []byte(content))
})
}
func (ip *InstalledPlugins) DeleteByKey(key int) error {
return ip.boltdb.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(_PluginListBucketName))
if bucket == nil {
return nil
}
return bucket.Delete(_itob(key))
})
}
// _itob returns an 8-byte little endian representation of v.
func _itob(v int) []byte {
b := make([]byte, 8)
binary.LittleEndian.PutUint64(b, uint64(v))
return b
}
func _b8toi(b []byte) int {
return int(binary.LittleEndian.Uint64(b))
}