agent/inventory/uploader/uploader.go (196 lines of code) (raw):

package uploader import ( "crypto/md5" "encoding/base64" "encoding/json" "fmt" "github.com/aliyun/aliyun_assist_client/agent/log" "reflect" "strings" "github.com/aliyun/aliyun_assist_client/agent/inventory/model" "github.com/aliyun/aliyun_assist_client/agent/util/jsonutil" ) const ( // Name represents name of this component that uploads data to OOS Name = "InventoryUploader" ) type T interface { SendDataToOOS(items []*model.InventoryItem) (err error) ConvertToOOSInventoryItems(items []model.InventoryItem) (optimizedInventoryItems, nonOptimizedInventoryItems []*model.InventoryItem, err error) GetDirtyOOSInventoryItems(items []model.InventoryItem) (dirtyInventoryItems []*model.InventoryItem, err error) } // InventoryUploader implements functionality to upload data to OOS Inventory. type InventoryUploader struct { ooscaller OOSCaller optimizer Optimizer //helps inventory plugin to optimize PutInventory calls } // NewInventoryUploader creates a new InventoryUploader (which sends data to OOS Inventory) func NewInventoryUploader(instanceID string) (*InventoryUploader, error) { var uploader = InventoryUploader{} var err error if uploader.optimizer, err = NewOptimizerImpl(); err != nil { log.GetLogger().Errorf("Unable to load optimizer for inventory uploader because - %v", err.Error()) return &uploader, err } if uploader.ooscaller, err = NewOOSCallerImpl(); err != nil { log.GetLogger().Errorf("Unable to load ooscaller for inventory uploader because - %v", err.Error()) return &uploader, err } return &uploader, nil } // SendDataToOOS uploads given inventory items to OOS func (u *InventoryUploader) SendDataToOOS(instanceID string, items []*model.InventoryItem) (err error) { log.GetLogger().Debugf("Uploading following inventory data to OOS - %s %v", instanceID, items) log.GetLogger().Infof("Number of Inventory Items: %v", len(items)) //setting up input for PutInventory API call params := &model.PutInventoryInput{ InstanceId: &instanceID, Items: items, } err = u.ooscaller.PutInventory(params) if err != nil { log.GetLogger().Errorf("the following error occured while calling PutInventory API: %v", err) } else { log.GetLogger().Debug("PutInventory was called successfully") u.updateContentHash(items) } return } func (u *InventoryUploader) updateContentHash(items []*model.InventoryItem) { log.GetLogger().Debugf("Updating cache") for _, item := range items { if err := u.optimizer.UpdateContentHash(*item.TypeName, *item.ContentHash); err != nil { err = fmt.Errorf("failed to update content hash cache because of - %v", err.Error()) log.GetLogger().Error(err.Error()) } } } // ConvertToOOSInventoryItems converts given array of inventory.Item into an array of *model.InventoryItem. It returns 2 such arrays - one is optimized array // which contains only contentHash for those inventory types where the dataset hasn't changed from previous collection. The other array is non-optimized array // which contains both contentHash & content. This is done to avoid iterating over the inventory data twice. It throws error when it encounters error during // conversion process. func (u *InventoryUploader) ConvertToOOSInventoryItems(items []model.Item) (optimizedInventoryItems, nonOptimizedInventoryItems []*model.InventoryItem, err error) { //NOTE: There can be multiple inventory type data. //Each inventory type data => 1 inventory Item. Each inventory type, can contain multiple items log.GetLogger().Debugf("Transforming collected inventory data to expected format") //iterating over multiple inventory data types. for _, item := range items { var data string var optimizedItem, nonOptimizedItem *model.InventoryItem newHash := "" oldHash := "" itemName := item.Name //we should only calculate checksum using content & not include capture time - because that field will always change causing //the checksum to change again & again even if content remains same. if item.Content == nil || reflect.ValueOf(item.Content).IsNil() { data = "[]" } else { if data, err = jsonutil.Marshal(item.Content); err != nil { return } } if data[len(data)-1] == '\n' { data = data[0 : len(data)-1] } newHash = calculateCheckSum([]byte(data)) log.GetLogger().Debugf("Item being converted - %v with data - %v with checksum - %v", itemName, string(data), newHash) //construct non-optimized inventory item if nonOptimizedItem, err = convertToOOSInventoryItem(item); err != nil { err = fmt.Errorf("formatting inventory data of %v failed due to %v", itemName, err.Error()) return } //add contentHash too nonOptimizedItem.ContentHash = &newHash log.GetLogger().Debugf("NonOptimized item - %+v", nonOptimizedItem) nonOptimizedInventoryItems = append(nonOptimizedInventoryItems, nonOptimizedItem) //populate optimized item - if content hash matches with earlier collected data. oldHash = u.optimizer.GetContentHash(itemName) log.GetLogger().Debugf("old hash - %v, new hash - %v for the inventory type - %v", oldHash, newHash, itemName) if newHash == oldHash { log.GetLogger().Debugf("Inventory data for %v is same as before - we can just send content hash", itemName) //set the inventory item accordingly optimizedItem = &model.InventoryItem{ CaptureTime: &item.CaptureTime, TypeName: &itemName, SchemaVersion: &item.SchemaVersion, ContentHash: &oldHash, } log.GetLogger().Debugf("Optimized item - %v", optimizedItem) optimizedInventoryItems = append(optimizedInventoryItems, optimizedItem) } else { log.GetLogger().Debugf("New inventory data for %v has been detected - can't optimize here", itemName) log.GetLogger().Debugf("Adding item - %v to the optimizedItems (since its new data)", nonOptimizedItem) optimizedInventoryItems = append(optimizedInventoryItems, nonOptimizedItem) } } return } // GetDirtyOOSInventoryItems get the inventory item data for items that have changes since last successful report to OOS. func (u InventoryUploader) GetDirtyOOSInventoryItems(items []model.Item) (dirtyInventoryItems []*model.InventoryItem, err error) { //NOTE: There can be multiple inventory type data. //Each inventory type data => 1 inventory Item. Each inventory type, can contain multiple items //iterating over multiple inventory data types. for _, item := range items { var data string var rawItem *model.InventoryItem newHash := "" oldHash := "" itemName := item.Name //we should only calculate checksum using content & not include capture time - because that field will always change causing //the checksum to change again & again even if content remains same. if item.Content == nil || reflect.ValueOf(item.Content).IsNil() { data = "[]" } else { if data, err = jsonutil.Marshal(item.Content); err != nil { return } } if data[len(data)-1] == '\n' { data = data[0 : len(data)-1] } newHash = calculateCheckSum([]byte(data)) log.GetLogger().Debugf("Item being converted - %v with data - %v with checksum - %v", itemName, string(data), newHash) //construct non-optimized inventory item if rawItem, err = convertToOOSInventoryItem(item); err != nil { err = fmt.Errorf("Formatting inventory data of %v failed due to %v, rawItem : %#v", itemName, err.Error(), rawItem) return } //add contentHash too rawItem.ContentHash = &newHash //populate optimized item - if content hash matches with earlier collected data. oldHash = u.optimizer.GetContentHash(itemName) log.GetLogger().Debugf("Get Dirty inventory items, old hash - %v, new hash - %v for the inventory type - %v", oldHash, newHash, itemName) if strings.Compare(newHash, oldHash) != 0 { log.GetLogger().Debugf("Dirty inventory type found. Change has been detected for inventory type: %v", itemName) dirtyInventoryItems = append(dirtyInventoryItems, rawItem) } else { log.GetLogger().Debugf("Content hash is the same with the old for %v", itemName) } } return } // convertToOOSInventoryItem converts given InventoryItem to []map[string]*string func convertToOOSInventoryItem(item model.Item) (inventoryItem *model.InventoryItem, err error) { var a []interface{} var c map[string]*string var content = []map[string]*string{} var dataB []byte dataType := reflect.ValueOf(item.Content) switch dataType.Kind() { case reflect.Struct: //this should be converted to map[string]*string c = convertToMap(item.Content) content = append(content, c) case reflect.Array, reflect.Slice: //this should be converted to []map[string]*string dataB, _ = json.Marshal(item.Content) json.Unmarshal(dataB, &a) // If a is empty array, then content has to be empty array // instead of nil, as InventoryItem.Content has // to be empty array [] after serializing to Json, // based on the contract with OOS:PutInventory API. for _, v := range a { // convert each item to map[string]*string c = convertToMap(v) content = append(content, c) } default: //NOTE: collected inventory data is expected to be either a struct or an array err = fmt.Errorf("Unsupported data format - %v.", dataType.Kind()) return } inventoryItem = &model.InventoryItem{ CaptureTime: &item.CaptureTime, TypeName: &item.Name, SchemaVersion: &item.SchemaVersion, Content: content, } return inventoryItem, nil } // ConvertToMap converts given object to map[string]*string func convertToMap(input interface{}) (res map[string]*string) { var m map[string]interface{} b, _ := json.Marshal(input) json.Unmarshal(b, &m) res = make(map[string]*string) for k, v := range m { asString := toString(v) res[k] = &asString } return res } // toString converts given input to string func toString(v interface{}) string { if v, isString := v.(string); isString { return v } b, _ := json.Marshal(v) return string(b) } func calculateCheckSum(data []byte) (checkSum string) { sum := md5.Sum(data) checkSum = base64.StdEncoding.EncodeToString(sum[:]) return }