pipeline/endpoints/disk.go (145 lines of code) (raw):

// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package endpoints

import (
	"encoding/json"
	"io/ioutil"
	"os"
	"path"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/GoogleCloudPlatform/ubbagent/clock"
	"github.com/GoogleCloudPlatform/ubbagent/metrics"
	"github.com/GoogleCloudPlatform/ubbagent/pipeline"
	"github.com/golang/glog"
)

const (
	// fileMode is the permission mode used when writing report files.
	fileMode = 0644
	// directoryMode is the permission mode used when creating the report directory.
	directoryMode = 0755
	// cleanupInterval is the time between successive cleanup passes.
	cleanupInterval = 1 * time.Minute
	// reportPrefix and reportSuffix delimit report file names, which have the form
	// <reportPrefix>_<RFC3339 timestamp>_<id prefix><reportSuffix>.
	reportPrefix = "report"
	reportSuffix = ".json"
	// randomLength is the maximum number of leading report id characters that are included in a
	// report file name.
	randomLength = 5
)

// DiskEndpoint is an endpoint that writes each report as a JSON file in a directory, and runs a
// background goroutine that periodically removes report files older than the expiration.
type DiskEndpoint struct {
	name       string                // value returned by Name
	path       string                // directory in which report files are written
	expiration time.Duration         // age beyond which a report file is removed by cleanup
	quit       chan bool             // signals the cleanup goroutine to exit
	closeOnce  sync.Once             // guards the shutdown signal so that it is sent only once
	clock      clock.Clock           // time source for report names, timers, and expiration
	wait       sync.WaitGroup        // tracks the cleanup goroutine so Release can wait for it
	tracker    pipeline.UsageTracker // usage count driving Use and Release
	closed     bool                  // used for testing
}

// diskContext is the endpoint-specific context attached to a report by BuildReport and read back
// by Send.
type diskContext struct {
	// Name is the file name (not the full path) under which the report is written.
	Name string
}

// NewDiskEndpoint creates a new DiskEndpoint and starts a goroutine that cleans up expired reports
// on disk.
func NewDiskEndpoint(name string, path string, expiration time.Duration) *DiskEndpoint { return newDiskEndpoint(name, path, expiration, clock.NewClock()) } func newDiskEndpoint(name string, path string, expiration time.Duration, clock clock.Clock) *DiskEndpoint { ep := &DiskEndpoint{ name: name, path: path, expiration: expiration, clock: clock, quit: make(chan bool, 1), } ep.wait.Add(1) go ep.run(clock.Now()) return ep } func (ep *DiskEndpoint) Name() string { return ep.name } func (ep *DiskEndpoint) BuildReport(r metrics.StampedMetricReport) (pipeline.EndpointReport, error) { return pipeline.NewEndpointReport(r, diskContext{Name: reportName(r, ep.clock.Now())}) } func (ep *DiskEndpoint) Send(r pipeline.EndpointReport) error { dctx := diskContext{} err := r.UnmarshalContext(&dctx) if err != nil { return err } jsontext, err := json.Marshal(r.StampedMetricReport) if err != nil { return err } if err := os.MkdirAll(ep.path, directoryMode); err != nil { return err } file := path.Join(ep.path, dctx.Name) if err := ioutil.WriteFile(file, jsontext, fileMode); err != nil { return err } return nil } // Use increments the DiskEndpoint's usage count. // See pipeline.Component.Use. func (ep *DiskEndpoint) Use() { ep.tracker.Use() } // Release decrements the DiskEndpoint's usage count. If it reaches 0, Release instructs the // DiskEndpoint's cleanup goroutine to gracefully shutdown. It blocks until the operation has // completed. // See pipeline.Component.Release. func (ep *DiskEndpoint) Release() error { return ep.tracker.Release(func() error { ep.closeOnce.Do(func() { ep.quit <- true ep.closed = true }) ep.wait.Wait() return nil }) } func (ep *DiskEndpoint) run(start time.Time) { nextFire := start.Add(cleanupInterval) for { t := ep.clock.NewTimerAt(nextFire) select { case <-t.GetC(): ep.cleanup() case <-ep.quit: ep.wait.Done() return } t.Stop() nextFire = nextFire.Add(cleanupInterval) } } func (ep *DiskEndpoint) cleanup() { // compute time before which files are expired. 
cutoff := ep.clock.Now().Add(-ep.expiration) files, _ := ioutil.ReadDir(ep.path) for _, f := range files { if isExpired(f.Name(), cutoff) { if err := os.Remove(filepath.Join(ep.path, f.Name())); err != nil { glog.Warningf("error removing expired disk report: %v", f) } } } } func reportName(report metrics.StampedMetricReport, reportTime time.Time) string { var random string if len(report.Id) < randomLength { random = report.Id } else { random = report.Id[0:5] } return reportPrefix + "_" + reportTime.UTC().Format(time.RFC3339) + "_" + random + reportSuffix } func isExpired(name string, cutoff time.Time) bool { if !strings.HasPrefix(name, reportPrefix) { return false } if !strings.HasSuffix(name, reportSuffix) { return false } parts := strings.Split(name, "_") if len(parts) != 3 { return false } t, err := time.Parse(time.RFC3339, parts[1]) if err != nil { return false } return t.Before(cutoff) } func (ep *DiskEndpoint) IsTransient(err error) bool { return true }