in agent/association/processor/processor.go [151:261]
// ProcessAssociation runs one association processing cycle for this instance.
//
// It lists the instance's associations, validates the cache entry of each one,
// loads every association's details, parses the expression of every
// association that is not run-once, hands the resulting list to
// schedulemanager.Refresh and finally calls signal.ExecuteAssociation.
//
// Failing to resolve the instance ID, to list associations, or to marshal an
// association is logged and ends the cycle early. Failing to load or parse a
// single association is recorded on that association, reported with status
// Failed to both the association service and the compliance uploader, and
// does not stop the remaining associations from being processed.
func (p *Processor) ProcessAssociation() {
	log := p.context.Log()
	var associations []*model.InstanceAssociation

	log.Debug("running ProcessAssociation")

	instanceID, err := p.context.Identity().InstanceID()
	if err != nil {
		log.Errorf("Unable to retrieve instance id, %v", err)
		return
	}

	p.assocSvc.CreateNewServiceIfUnHealthy(p.context)
	p.complianceUploader.CreateNewServiceIfUnHealthy(log)

	if associations, err = p.assocSvc.ListInstanceAssociations(log, instanceID); err != nil {
		log.Errorf("Unable to load instance associations, %v", err)
		return
	}

	// to account for any tag expansion delays on boot, call list associations again
	if p.onBoot {
		// Only the first cycle after boot gets the extra retry.
		p.onBoot = false

		if len(associations) < 1 {
			log.Info("No associations on boot. Requerying for associations after 30 seconds.")
			time.Sleep(defaultRetryWaitOnBootInSeconds * time.Second)

			if associations, err = p.assocSvc.ListInstanceAssociations(log, instanceID); err != nil {
				log.Errorf("Unable to load instance associations, %v", err)
				return
			}
		}
	}

	// evict the invalid cache first
	for _, assoc := range associations {
		cache.ValidateCache(assoc)
	}

	// read from cache or load association details from service
	for _, assoc := range associations {
		var assocContent string
		if assocContent, err = jsonutil.Marshal(assoc); err != nil {
			// Leave a trace before abandoning the cycle; previously this
			// path returned without any log entry.
			log.Errorf("Unable to marshal association, %v", err)
			return
		}
		log.Debug("Association content is \n", jsonutil.Indent(assocContent))

		//TODO: add retry for load association detail
		if err = p.assocSvc.LoadAssociationDetail(log, assoc); err != nil {
			err = fmt.Errorf("Encountered error while loading association %v contents, %v",
				*assoc.Association.AssociationId,
				err)
			log.Error(err)
			assoc.Errors = append(assoc.Errors, err)

			p.assocSvc.UpdateInstanceAssociationStatus(
				log,
				*assoc.Association.AssociationId,
				*assoc.Association.Name,
				*assoc.Association.InstanceId,
				contracts.AssociationStatusFailed,
				contracts.AssociationErrorCodeListAssociationError,
				times.ToIso8601UTC(time.Now()),
				err.Error(),
				service.NoOutputUrl)
			p.complianceUploader.UpdateAssociationCompliance(
				*assoc.Association.AssociationId,
				*assoc.Association.InstanceId,
				*assoc.Association.Name,
				*assoc.Association.DocumentVersion,
				contracts.AssociationStatusFailed,
				time.Now().UTC())
			continue
		}

		// Run-once associations skip expression parsing.
		if assoc.IsRunOnceAssociation() {
			continue
		}

		if err = assoc.ParseExpression(log); err != nil {
			// The parse error itself goes only to the log and assoc.Errors;
			// the status update carries the generic message.
			message := fmt.Sprintf("Encountered error while parsing expression for association %v", *assoc.Association.AssociationId)
			log.Errorf("%v, %v", message, err)
			assoc.Errors = append(assoc.Errors, err)

			p.assocSvc.UpdateInstanceAssociationStatus(
				log,
				*assoc.Association.AssociationId,
				*assoc.Association.Name,
				*assoc.Association.InstanceId,
				contracts.AssociationStatusFailed,
				contracts.AssociationErrorCodeInvalidExpression,
				times.ToIso8601UTC(time.Now()),
				message,
				service.NoOutputUrl)
			p.complianceUploader.UpdateAssociationCompliance(
				*assoc.Association.AssociationId,
				*assoc.Association.InstanceId,
				*assoc.Association.Name,
				*assoc.Association.DocumentVersion,
				contracts.AssociationStatusFailed,
				time.Now().UTC())
		}
	}

	// Associations that failed above stay in the list passed to Refresh,
	// with their errors attached in assoc.Errors.
	schedulemanager.Refresh(log, associations)

	log.Debug("ProcessAssociation is triggering execution")
	signal.ExecuteAssociation(log)
	log.Debug("ProcessAssociation completed")
}