func()

in agent/association/processor/processor.go [151:261]


func (p *Processor) ProcessAssociation() {
	log := p.context.Log()
	associations := []*model.InstanceAssociation{}

	log.Debug("running ProcessAssociation")

	instanceID, err := p.context.Identity().InstanceID()
	if err != nil {
		log.Error("Unable to retrieve instance id", err)
		return
	}

	p.assocSvc.CreateNewServiceIfUnHealthy(p.context)
	p.complianceUploader.CreateNewServiceIfUnHealthy(log)

	if associations, err = p.assocSvc.ListInstanceAssociations(log, instanceID); err != nil {
		log.Errorf("Unable to load instance associations, %v", err)
		return
	}

	// to account for any tag expansion delays on boot, call list associations again
	if p.onBoot {
		p.onBoot = false
		if len(associations) < 1 {
			log.Info("No associations on boot. Requerying for associations after 30 seconds.")
			time.Sleep(defaultRetryWaitOnBootInSeconds * time.Second)
			if associations, err = p.assocSvc.ListInstanceAssociations(log, instanceID); err != nil {
				log.Errorf("Unable to load instance associations, %v", err)
				return
			}
		}
	}

	// evict the invalid cache first
	for _, assoc := range associations {
		cache.ValidateCache(assoc)
	}

	// read from cache or load association details from service
	for _, assoc := range associations {
		var assocContent string
		if assocContent, err = jsonutil.Marshal(assoc); err != nil {
			return
		}

		log.Debug("Association content is \n", jsonutil.Indent(assocContent))

		//TODO: add retry for load association detail
		if err = p.assocSvc.LoadAssociationDetail(log, assoc); err != nil {
			err = fmt.Errorf("Encountered error while loading association %v contents, %v",
				*assoc.Association.AssociationId,
				err)
			log.Error(err)
			assoc.Errors = append(assoc.Errors, err)
			p.assocSvc.UpdateInstanceAssociationStatus(
				log,
				*assoc.Association.AssociationId,
				*assoc.Association.Name,
				*assoc.Association.InstanceId,
				contracts.AssociationStatusFailed,
				contracts.AssociationErrorCodeListAssociationError,
				times.ToIso8601UTC(time.Now()),
				err.Error(),
				service.NoOutputUrl)

			p.complianceUploader.UpdateAssociationCompliance(
				*assoc.Association.AssociationId,
				*assoc.Association.InstanceId,
				*assoc.Association.Name,
				*assoc.Association.DocumentVersion,
				contracts.AssociationStatusFailed,
				time.Now().UTC())
			continue
		}

		if !assoc.IsRunOnceAssociation() {
			if err = assoc.ParseExpression(log); err != nil {
				message := fmt.Sprintf("Encountered error while parsing expression for association %v", *assoc.Association.AssociationId)
				log.Errorf("%v, %v", message, err)
				assoc.Errors = append(assoc.Errors, err)
				p.assocSvc.UpdateInstanceAssociationStatus(
					log,
					*assoc.Association.AssociationId,
					*assoc.Association.Name,
					*assoc.Association.InstanceId,
					contracts.AssociationStatusFailed,
					contracts.AssociationErrorCodeInvalidExpression,
					times.ToIso8601UTC(time.Now()),
					message,
					service.NoOutputUrl)

				p.complianceUploader.UpdateAssociationCompliance(
					*assoc.Association.AssociationId,
					*assoc.Association.InstanceId,
					*assoc.Association.Name,
					*assoc.Association.DocumentVersion,
					contracts.AssociationStatusFailed,
					time.Now().UTC())
				continue
			}
		}
	}

	schedulemanager.Refresh(log, associations)

	log.Debug("ProcessAssociation is triggering execution")

	signal.ExecuteAssociation(log)

	log.Debug("ProcessAssociation completed")
}