schedule/schedule.go (170 lines of code) (raw):

// Copyright 2016 Netflix, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package schedule implements a schedule of terminations package schedule import ( "bytes" "encoding/json" "fmt" "log" "math/rand" "sort" "time" "github.com/Netflix/chaosmonkey/v2" "github.com/Netflix/chaosmonkey/v2/config" "github.com/Netflix/chaosmonkey/v2/deploy" "github.com/Netflix/chaosmonkey/v2/grp" ) // Populate populates the termination schedule with the random // terminations for a list of apps. If the specified list of apps is empty, // then it will func (s *Schedule) Populate(d deploy.Deployment, getter chaosmonkey.AppConfigGetter, chaosConfig *config.Monkey, apps []string) error { c := make(chan *deploy.App) // If the caller explicitly a set of apps, use those // If they did not, do all apps if len(apps) == 0 { var err error apps, err = d.AppNames() if err != nil { return fmt.Errorf("could not retrieve list of apps: %v", err) } } go d.Apps(c, apps) i := 0 // number of apps already processed for app := range c { if i >= chaosConfig.MaxApps() { break } i++ cfg, err := getter.Get(app.Name()) if err != nil { log.Printf("WARNING: Could not retrieve config for app=%s. %s", app.Name(), err) continue } doScheduleApp(s, app, *cfg, chaosConfig) } return nil } // Add schedules a termination for group at time tm func (s *Schedule) Add(tm time.Time, group grp.InstanceGroup) { s.entries = append(s.entries, Entry{Group: group, Time: tm}) } // Entries returns the list of schedule entries func (s *Schedule) Entries() []Entry { return s.entries } // doScheduleApp populates the termination schedule for one app func doScheduleApp(schedule *Schedule, app *deploy.App, cfg chaosmonkey.AppConfig, chaosConfig *config.Monkey) { if !cfg.Enabled { log.Printf("app=%s disabled\n", app.Name()) return } r := rand.New(rand.NewSource(time.Now().UnixNano())) startHour := chaosConfig.StartHour() endHour := chaosConfig.EndHour() location, err := chaosConfig.Location() if err != nil { panic(fmt.Sprintf("Could not get Location for time zone calculation: %s", err.Error())) } groups := app.EligibleInstanceGroups(cfg) if len(groups) == 0 { log.Printf("app=%s no eligible instance groups", app.Name()) } for _, group := range groups { kill := shouldKillInstance(cfg.MeanTimeBetweenKillsInWorkDays, r) log.Printf("%s mtbk=%d kill=%t\n", grp.String(group), cfg.MeanTimeBetweenKillsInWorkDays, kill) if kill { time := chooseTerminationTime(time.Now(), startHour, endHour, location) schedule.Add(time, group) } } } // chooseTerminationTime Randomly selects a time to terminate an instance // on the same date as now, between startHour:00 and endHour:00 in the same // timezone as location // Panics if endHour <= startHour // // Note that there is no guarantee that the selected termination time will be in // the future // // now is passed as an argument to simplify testing func chooseTerminationTime(now time.Time, startHour int, endHour int, location *time.Location) time.Time { if endHour <= startHour { panic(fmt.Sprintf("ChooseTermination called with startHour <= endHour, startHour: %d. endHour: %d", startHour, endHour)) } // Compute the number of minutes in the interval between start and end, // pick a random one in there, and then add it to the start time as an // offset minutesInTimeInterval := (endHour - startHour) * 60 r := rand.New(rand.NewSource(time.Now().UnixNano())) sample := r.Intn(minutesInTimeInterval) // Convert the sample to duration in minutes offset := time.Duration(sample) * time.Minute year, month, day := now.Date() startTime := time.Date(year, month, day, startHour, 0, 0, 0, location) return startTime.Add(offset) } // float64Rand generates random floats on [0, 1) type float64Rand interface { // Return a random float64 on [0, 1) Float64() float64 } // ShouldKillInstance randomly determines whether an instance should // be terminated today by flipping a biased coin. // // It uses the meanTimeBetweenKillsInWorkDays to determine the probability // of a kill func shouldKillInstance(meanTimeBetweenKillsInWorkDays int, r float64Rand) bool { if meanTimeBetweenKillsInWorkDays <= 0 { panic("meanTimeBetweenKillsInWorkDays is zero or negative") } var pkill = 1.0 / float64(meanTimeBetweenKillsInWorkDays) // Sample uniformly over [0,1) sample := r.Float64() return pkill >= sample } // Entry is an entry a termination schedule. // It contains the instance group that the terminator will randomly select from // as well as the time of termination. type Entry struct { Group grp.InstanceGroup `json:"group"` Time time.Time `json:"time"` } // apiGroup represents group representation passed by the API type apiGroup struct { App, Account, Region, Stack, Cluster string } // UnmarshalJSON implements Unmarshaler.UnmarshalJSON func (e *Entry) UnmarshalJSON(b []byte) (err error) { var ce struct { Group apiGroup Time time.Time } err = json.Unmarshal(b, &ce) if err != nil { return err } g := &ce.Group e.Group = grp.New(g.App, g.Account, g.Region, g.Stack, g.Cluster) e.Time = ce.Time return nil } // Equal checks that two entries are equal func (e *Entry) Equal(o *Entry) bool { return grp.Equal(e.Group, o.Group) && e.Time.Equal(o.Time) } // Crontab returns a termination command for the Entry, in crontab format. // It takes as arguments: // - the path to the termination executable // - the account that should execute the job // // The returned string is not terminated by a newline. func (e *Entry) Crontab(termPath, account string) string { // From https://en.wikipedia.org/wiki/Cron // # * * * * * account command to execute // # │ │ │ │ │ // # │ │ │ │ │ // # │ │ │ │ └───── day of week (0 - 6) (0 to 6 are Sunday to Saturday, or use names; 7 is Sunday, the same as 0) // # │ │ │ └────────── month (1 - 12) // # │ │ └─────────────── day of month (1 - 31) // # │ └──────────────────── hour (0 - 23) // # └───────────────────────── min (0 - 59) t := e.Time.UTC() return fmt.Sprintf("%d %d %d %d %d %s %s", t.Minute(), t.Hour(), t.Day(), t.Month(), t.Weekday(), account, terminateCommand(termPath, e.Group)) } // terminateCommand returns the string for terminating an instance // given the path to the chaosmonkey termination executable and an instance to terminate func terminateCommand(termPath string, group grp.InstanceGroup) string { cmd := fmt.Sprintf("%s %s %s", termPath, group.App(), group.Account()) if cluster, ok := group.Cluster(); ok { cmd = fmt.Sprintf("%s --cluster=%s", cmd, cluster) } if stack, ok := group.Stack(); ok { cmd = fmt.Sprintf("%s --stack=%s", cmd, stack) } if region, ok := group.Region(); ok { cmd = fmt.Sprintf("%s --region=%s", cmd, region) } return cmd } // logRedirect returns a string to append to a shell command so it redirects // stdout and stderr to a logfile // Example output: ">> /path/to/log 2>&1" func logRedirect(logPath string) string { return fmt.Sprintf(">> %s 2>&1", logPath) } // Schedule is a collection of termination entries. type Schedule struct { entries []Entry } // New returns a new Schedule func New() *Schedule { return &Schedule{ // We need a zero-element slice instead of a nil slice so that // it will JSON-marshall into '[ ]' instead of 'null' make([]Entry, 0), } } // ByTime implements sort.Interface for []Entry based on the time field type ByTime []Entry func (t ByTime) Len() int { return len(t) } func (t ByTime) Swap(i, j int) { t[i], t[j] = t[j], t[i] } func (t ByTime) Less(i, j int) bool { return t[i].Time.Before(t[j].Time) } // Crontab returns a schedule of termination commands in crontab format // It takes as arguments: // - the path to the executable that terminates an instance // - the account that should execute the job func (s Schedule) Crontab(exPath string, account string) []byte { var result bytes.Buffer // In-place sort the entries before generating the table sort.Sort(ByTime(s.entries)) for _, entry := range s.entries { _, err := result.WriteString(entry.Crontab(exPath, account)) if err != nil { panic(fmt.Sprintf("Could not generate string with crontab: %s", err.Error())) } _, err = result.WriteString("\n") if err != nil { panic(fmt.Sprintf("Could not generate string with crontab: %s", err.Error())) } } return result.Bytes() } // MarshalJSON implements Marshaler.MarshalJSON func (s Schedule) MarshalJSON() ([]byte, error) { return json.Marshal(s.entries) } // UnmarshalJSON implements Unmarshaler.UnmarshalJSON func (s *Schedule) UnmarshalJSON(b []byte) (err error) { return json.Unmarshal(b, &s.entries) }