agentendpoint/config_task.go (384 lines of code) (raw):

// Copyright 2019 Google Inc. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package agentendpoint import ( "context" "fmt" "os" "path/filepath" "time" "github.com/GoogleCloudPlatform/osconfig/agentconfig" "github.com/GoogleCloudPlatform/osconfig/clog" "github.com/GoogleCloudPlatform/osconfig/config" "github.com/GoogleCloudPlatform/osconfig/pretty" "cloud.google.com/go/osconfig/agentendpoint/apiv1/agentendpointpb" ) // This is the maximum size in bytes of the OSPolicyResourceConfigStep ErrorMessage field. // Any message longer than this will be truncated to at max this length. const maxErrorMessage = 512 var newResource = func(r *agentendpointpb.OSPolicy_Resource) *resource { return &resource{resourceIface: resourceIface(&config.OSPolicyResource{OSPolicy_Resource: r})} } var repoFormats = []string{agentconfig.AptRepoFormat(), agentconfig.YumRepoFormat(), agentconfig.ZypperRepoFormat(), agentconfig.GooGetRepoFormat()} type configTask struct { StartedAt time.Time `json:",omitempty"` client *Client lastProgressState map[agentendpointpb.ApplyConfigTaskProgress_State]time.Time Task *applyConfigTask policies map[string]*policy TaskID string results []*agentendpointpb.ApplyConfigTaskOutput_OSPolicyResult managedResources []*config.ManagedResources } type applyConfigTask struct { *agentendpointpb.ApplyConfigTask } type policy struct { resources map[string]*resource } type resource struct { resourceIface needsPostCheck bool validateOrCheckError bool } type resourceIface interface { Validate(context.Context) error CheckState(context.Context) error EnforceState(context.Context) error PopulateOutput(*agentendpointpb.OSPolicyResourceCompliance) error Cleanup(context.Context) error InDesiredState() bool ManagedResources() *config.ManagedResources } func (c *configTask) reportCompletedState(ctx context.Context, errMsg string, state agentendpointpb.ApplyConfigTaskOutput_State) error { req := &agentendpointpb.ReportTaskCompleteRequest{ TaskId: c.TaskID, TaskType: agentendpointpb.TaskType_APPLY_CONFIG_TASK, ErrorMessage: errMsg, Output: &agentendpointpb.ReportTaskCompleteRequest_ApplyConfigTaskOutput{ ApplyConfigTaskOutput: &agentendpointpb.ApplyConfigTaskOutput{State: state, OsPolicyResults: c.results}, }, } if err := c.client.reportTaskComplete(ctx, req); err != nil { return fmt.Errorf("error reporting completed state: %v", err) } return nil } func (c *configTask) handleErrorState(ctx context.Context, msg string, err error) error { if err == errServerCancel { clog.Infof(ctx, "Cancelling config run: %v", errServerCancel) return c.reportCompletedState(ctx, errServerCancel.Error(), agentendpointpb.ApplyConfigTaskOutput_CANCELLED) } msg = fmt.Sprintf("%s: %v", msg, err) clog.Errorf(ctx, "%v", msg) return c.reportCompletedState(ctx, msg, agentendpointpb.ApplyConfigTaskOutput_FAILED) } func (c *configTask) reportContinuingState(ctx context.Context, configState agentendpointpb.ApplyConfigTaskProgress_State) error { st, ok := c.lastProgressState[configState] if ok && st.After(time.Now().Add(sameStateTimeWindow)) { // Don't resend the same state more than once every 5s. return nil } req := &agentendpointpb.ReportTaskProgressRequest{ TaskId: c.TaskID, TaskType: agentendpointpb.TaskType_APPLY_CONFIG_TASK, Progress: &agentendpointpb.ReportTaskProgressRequest_ApplyConfigTaskProgress{ ApplyConfigTaskProgress: &agentendpointpb.ApplyConfigTaskProgress{State: configState}, }, } res, err := c.client.reportTaskProgress(ctx, req) if err != nil { return fmt.Errorf("error reporting task progress %s: %v", configState, err) } if res.GetTaskDirective() == agentendpointpb.TaskDirective_STOP { return errServerCancel } if c.lastProgressState == nil { c.lastProgressState = make(map[agentendpointpb.ApplyConfigTaskProgress_State]time.Time) } c.lastProgressState[configState] = time.Now() return nil } // detectPolicyConflicts checks for managed resource conflicts between a proposed // OSPolicyResource and all other OSPolcyResources up to this point, adding to the // current set of ManagedResources. func detectPolicyConflicts(proposed, current *config.ManagedResources) error { // TODO: implement return nil } func truncateMessage(msg string, size int) string { cut := size / 2 if len(msg) > size { return msg[:size-(cut+3)] + "..." + msg[len(msg)-cut:] } return msg } func validateConfigResource(ctx context.Context, res *resource, policyMR *config.ManagedResources, rCompliance *agentendpointpb.OSPolicyResourceCompliance, configResource *agentendpointpb.OSPolicy_Resource) (hasError bool) { ctx = clog.WithLabels(ctx, map[string]string{"resource_id": configResource.GetId()}) clog.Debugf(ctx, "Running step 'validate' on resource %q.", configResource.GetId()) var errMessage string outcome := agentendpointpb.OSPolicyResourceConfigStep_SUCCEEDED if err := res.Validate(ctx); err != nil { outcome = agentendpointpb.OSPolicyResourceConfigStep_FAILED hasError = true errMessage = truncateMessage(fmt.Sprintf("Validate: resource %q error: %v", configResource.GetId(), err), maxErrorMessage) clog.Errorf(ctx, "%v", errMessage) } else { // Detect any resource conflicts within this policy. if err := detectPolicyConflicts(res.ManagedResources(), policyMR); err != nil { outcome = agentendpointpb.OSPolicyResourceConfigStep_FAILED hasError = true errMessage = truncateMessage(fmt.Sprintf("Validate: resource conflict in policy: %v", err), maxErrorMessage) clog.Errorf(ctx, "%v", errMessage) } else { clog.Infof(ctx, "Validate: resource %q validation successful.", configResource.GetId()) } } rCompliance.ConfigSteps = append(rCompliance.GetConfigSteps(), &agentendpointpb.OSPolicyResourceConfigStep{ Type: agentendpointpb.OSPolicyResourceConfigStep_VALIDATION, Outcome: outcome, ErrorMessage: errMessage, }) rCompliance.State = agentendpointpb.OSPolicyComplianceState_UNKNOWN return hasError } func checkConfigResourceState(ctx context.Context, res *resource, rCompliance *agentendpointpb.OSPolicyResourceCompliance, configResource *agentendpointpb.OSPolicy_Resource) (hasError bool) { ctx = clog.WithLabels(ctx, map[string]string{"resource_id": configResource.GetId()}) clog.Debugf(ctx, "Running step 'check state' on resource %q.", configResource.GetId()) var errMessage string outcome := agentendpointpb.OSPolicyResourceConfigStep_SUCCEEDED state := agentendpointpb.OSPolicyComplianceState_UNKNOWN err := res.CheckState(ctx) if err != nil { outcome = agentendpointpb.OSPolicyResourceConfigStep_FAILED hasError = true errMessage = truncateMessage(fmt.Sprintf("Check state: resource %q error: %v", configResource.GetId(), err), maxErrorMessage) clog.Errorf(ctx, "%v", errMessage) } else if res.InDesiredState() { state = agentendpointpb.OSPolicyComplianceState_COMPLIANT } else { state = agentendpointpb.OSPolicyComplianceState_NON_COMPLIANT } rCompliance.ConfigSteps = append(rCompliance.GetConfigSteps(), &agentendpointpb.OSPolicyResourceConfigStep{ Type: agentendpointpb.OSPolicyResourceConfigStep_DESIRED_STATE_CHECK, Outcome: outcome, ErrorMessage: errMessage, }) clog.Infof(ctx, "Check state: resource %q state is %s.", configResource.GetId(), state) rCompliance.State = state return hasError } func enforceConfigResourceState(ctx context.Context, res *resource, rCompliance *agentendpointpb.OSPolicyResourceCompliance, configResource *agentendpointpb.OSPolicy_Resource) (enforcementActionTaken, hasError bool) { ctx = clog.WithLabels(ctx, map[string]string{"resource_id": configResource.GetId()}) clog.Debugf(ctx, "Running step 'enforce state' on resource %q.", configResource.GetId()) // Only enforce resources that need it. if res.InDesiredState() { clog.Debugf(ctx, "Enforce state: No enforcement required for %q.", configResource.GetId()) return false, false } var errMessage string outcome := agentendpointpb.OSPolicyResourceConfigStep_SUCCEEDED err := res.EnforceState(ctx) if err != nil { outcome = agentendpointpb.OSPolicyResourceConfigStep_FAILED hasError = true errMessage = truncateMessage(fmt.Sprintf("Enforce state: resource %q error: %v", configResource.GetId(), err), maxErrorMessage) clog.Errorf(ctx, "%v", errMessage) } else { clog.Infof(ctx, "Enforce state: resource %q enforcement successful.", configResource.GetId()) } rCompliance.ConfigSteps = append(rCompliance.GetConfigSteps(), &agentendpointpb.OSPolicyResourceConfigStep{ Type: agentendpointpb.OSPolicyResourceConfigStep_DESIRED_STATE_ENFORCEMENT, Outcome: outcome, ErrorMessage: errMessage, }) // Resource is always in an unknown state after enforcement is run. // A COMPLIANT state will only happen after a post check. rCompliance.State = agentendpointpb.OSPolicyComplianceState_UNKNOWN return true, hasError } func postCheckConfigResourceState(ctx context.Context, res *resource, rCompliance *agentendpointpb.OSPolicyResourceCompliance, configResource *agentendpointpb.OSPolicy_Resource) { ctx = clog.WithLabels(ctx, map[string]string{"resource_id": configResource.GetId()}) clog.Debugf(ctx, "Running step 'check state post enforcement' on resource %q.", configResource.GetId()) if !res.needsPostCheck { clog.Debugf(ctx, "Check state post enforcement: no post check required for %q.", configResource.GetId()) return } var errMessage string outcome := agentendpointpb.OSPolicyResourceConfigStep_SUCCEEDED state := agentendpointpb.OSPolicyComplianceState_UNKNOWN err := res.CheckState(ctx) if err != nil { outcome = agentendpointpb.OSPolicyResourceConfigStep_FAILED errMessage = truncateMessage(fmt.Sprintf("Check state post enforcement: resource %q error: %v", configResource.GetId(), err), maxErrorMessage) clog.Errorf(ctx, "%v", errMessage) } else if res.InDesiredState() { state = agentendpointpb.OSPolicyComplianceState_COMPLIANT } else { state = agentendpointpb.OSPolicyComplianceState_NON_COMPLIANT } rCompliance.ConfigSteps = append(rCompliance.GetConfigSteps(), &agentendpointpb.OSPolicyResourceConfigStep{ Type: agentendpointpb.OSPolicyResourceConfigStep_DESIRED_STATE_CHECK_POST_ENFORCEMENT, Outcome: outcome, ErrorMessage: errMessage, }) clog.Infof(ctx, "Check state post enforcement: resource %q state is %s.", configResource.GetId(), state) rCompliance.State = state } func (c *configTask) postCheckState(ctx context.Context) { // Actually run post check state (for policies that do not have a previous error). // No prepopulate run for post check as we will always check every resource. for i, osPolicy := range c.Task.GetOsPolicies() { ctx := clog.WithLabels(ctx, map[string]string{"os_policy_assignment": osPolicy.GetOsPolicyAssignment(), "os_policy_id": osPolicy.GetId()}) plcy, ok := c.policies[osPolicy.GetId()] // This should not happen in the normal code flow since we only run postCheckState after // all policies have been evaluated. if !ok { clog.Errorf(ctx, "Unexpected Error: policy entry for %q is empty.", osPolicy.GetId()) continue } pResult := c.results[i] for i, configResource := range osPolicy.GetResources() { res, ok := plcy.resources[configResource.GetId()] // This should only happen if there was a previous resource with a validate or check state error. if !ok || res == nil { continue } rCompliance := pResult.GetOsPolicyResourceCompliances()[i] postCheckConfigResourceState(ctx, res, rCompliance, configResource) clog.Infof(ctx, "Policy %q resource %q state: %s", osPolicy.GetId(), configResource.GetId(), rCompliance.GetState()) } } return } func (c *configTask) generateBaseResults() { c.results = make([]*agentendpointpb.ApplyConfigTaskOutput_OSPolicyResult, len(c.Task.GetOsPolicies())) for i, p := range c.Task.GetOsPolicies() { pResult := &agentendpointpb.ApplyConfigTaskOutput_OSPolicyResult{ OsPolicyId: p.GetId(), OsPolicyAssignment: p.GetOsPolicyAssignment(), OsPolicyResourceCompliances: make([]*agentendpointpb.OSPolicyResourceCompliance, len(p.GetResources())), } c.results[i] = pResult for i, r := range p.GetResources() { pResult.GetOsPolicyResourceCompliances()[i] = &agentendpointpb.OSPolicyResourceCompliance{ OsPolicyResourceId: r.GetId(), } } } } func removeFileIfNoMatch(a string, s []string) error { for _, b := range s { if a == b { return nil } } return os.Remove(a) } func (c *configTask) cleanupRepos(ctx context.Context) { var managedRepos []string for _, managedResource := range c.managedResources { if managedResource == nil { continue } for _, managedRepo := range managedResource.Repositories { managedRepos = append(managedRepos, managedRepo.RepoFilePath) } } for _, format := range repoFormats { matches, err := filepath.Glob(fmt.Sprintf(format, "*")) if err != nil { clog.Errorf(ctx, "Error globing directory: %v", err) } for _, match := range matches { if err := removeFileIfNoMatch(match, managedRepos); err != nil { clog.Errorf(ctx, "Error cleaning up old repo: %v", err) } } } } func (c *configTask) cleanup(ctx context.Context) { // Skip repo clean-up, as there is no clear way how to find full list of repos, // repos might be distributed across many tasks and there is no source of truth to find full list. //c.cleanupRepos(ctx) // Cleanup any policy specific resources. for _, osPolicy := range c.Task.GetOsPolicies() { ctx := clog.WithLabels(ctx, map[string]string{"os_policy_assignment": osPolicy.GetOsPolicyAssignment(), "os_policy_id": osPolicy.GetId()}) plcy := c.policies[osPolicy.GetId()] for _, configResource := range osPolicy.GetResources() { ctx := clog.WithLabels(ctx, map[string]string{"resource_id": configResource.GetId()}) res, ok := plcy.resources[configResource.GetId()] if !ok || res == nil { continue } if err := res.Cleanup(ctx); err != nil { clog.Warningf(ctx, "Error running resource cleanup:%v", err) } } } } func (c *configTask) run(ctx context.Context) error { clog.Infof(ctx, "Beginning ApplyConfigTask.") clog.Debugf(ctx, "ApplyConfigTask:\n%s", pretty.Format(c.Task.ApplyConfigTask)) c.StartedAt = time.Now() rcsErrMsg := "Error reporting continuing state" if err := c.reportContinuingState(ctx, agentendpointpb.ApplyConfigTaskProgress_STARTED); err != nil { return c.handleErrorState(ctx, rcsErrMsg, err) } if len(c.Task.GetOsPolicies()) == 0 { clog.Infof(ctx, "No OSPolicies to apply.") return c.reportCompletedState(ctx, "", agentendpointpb.ApplyConfigTaskOutput_SUCCEEDED) } // We need to generate base results first thing, each execution step // just adds on. c.generateBaseResults() defer c.cleanup(ctx) if err := c.reportContinuingState(ctx, agentendpointpb.ApplyConfigTaskProgress_APPLYING_CONFIG); err != nil { return c.handleErrorState(ctx, rcsErrMsg, err) } c.policies = map[string]*policy{} for i, osPolicy := range c.Task.GetOsPolicies() { ctx := clog.WithLabels(ctx, map[string]string{"os_policy_assignment": osPolicy.GetOsPolicyAssignment(), "os_policy_id": osPolicy.GetId()}) clog.Infof(ctx, "Executing policy %q", osPolicy.GetId()) pResult := c.results[i] plcy := &policy{resources: map[string]*resource{}} c.policies[osPolicy.GetId()] = plcy var policyMR *config.ManagedResources var validateOnly bool if osPolicy.GetMode() == agentendpointpb.OSPolicy_VALIDATION { clog.Infof(ctx, "Policy running in VALIDATION mode, not running enforcement action for any resources.") validateOnly = true } for i, configResource := range osPolicy.GetResources() { rCompliance := pResult.GetOsPolicyResourceCompliances()[i] plcy.resources[configResource.GetId()] = newResource(configResource) res := plcy.resources[configResource.GetId()] if hasError := validateConfigResource(ctx, res, policyMR, rCompliance, configResource); hasError { res.validateOrCheckError = true break } if hasError := checkConfigResourceState(ctx, res, rCompliance, configResource); hasError { res.validateOrCheckError = true break } // Skip enforcement actions in VALIDATION mode. if validateOnly { continue } // Only errors in validate and check state constitute a serious error, // for enforce if any action is taken we still want to run post check. // We do however stop further execution of this polcy on enforce error. enforcementActionTaken, hasError := enforceConfigResourceState(ctx, res, rCompliance, configResource) if enforcementActionTaken { // On any change we trigger post check for all previous resouces, // even if there was an error. c.markPostCheckRequired() } // Still record output even if there was an error during enforcement. res.PopulateOutput(rCompliance) // Errors from enforcement are not classified as "serious" becasue we want post check to run for this resource. if hasError { break } } c.managedResources = append(c.managedResources, policyMR) } // Run any post checks that we need to. c.postCheckState(ctx) if err := c.reportCompletedState(ctx, "", agentendpointpb.ApplyConfigTaskOutput_SUCCEEDED); err != nil { return err } clog.Infof(ctx, "Successfully completed ApplyConfigTask") return nil } // Mark all resources that have already completed as "needs post check". func (c *configTask) markPostCheckRequired() { for _, osPolicy := range c.Task.GetOsPolicies() { plcy, ok := c.policies[osPolicy.GetId()] // This policy entry may not have been created yet by the loop in run(). if !ok { continue } for _, configResource := range osPolicy.GetResources() { res, ok := plcy.resources[configResource.GetId()] // This resource entry may not have been created yet is this polciy is in mid-run. // We take no actions for resources that had a failure with validation or precheck. if !ok || res == nil || res.validateOrCheckError { continue } res.needsPostCheck = true } } } // RunApplyConfig runs an ApplyConfigTask. func (c *Client) RunApplyConfig(ctx context.Context, task *agentendpointpb.Task) error { ctx = clog.WithLabels(ctx, task.GetServiceLabels()) e := &configTask{ TaskID: task.GetTaskId(), client: c, Task: &applyConfigTask{task.GetApplyConfigTask()}, } return e.run(ctx) }