ste/xferStatsPolicy.go (147 lines of code) (raw):

// Copyright © Microsoft <wastore@microsoft.com> // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package ste import ( "bytes" "context" "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" "github.com/Azure/azure-storage-azcopy/v10/common" "io" "net/http" "strings" "sync/atomic" "time" ) type PipelineNetworkStats struct { atomicOperationCount int64 atomicNetworkErrorCount int64 atomic503CountThroughput int64 atomic503CountIOPS int64 atomic503CountUnknown int64 // counts 503's when we don't know the reason atomicE2ETotalMilliseconds int64 // should this be nanoseconds? Not really needed, given typical minimum operation lengths that we observe atomicStartSeconds int64 nocopy common.NoCopy tunerInterface ConcurrencyTuner } func newPipelineNetworkStats(tunerInterface ConcurrencyTuner) *PipelineNetworkStats { s := &PipelineNetworkStats{tunerInterface: tunerInterface} atomic.StoreInt64(&s.atomicStartSeconds, time.Now().Unix()) return s } func (s *PipelineNetworkStats) getStartSeconds() int64 { return atomic.LoadInt64(&s.atomicStartSeconds) } func (s *PipelineNetworkStats) recordRetry(responseBody string) { if strings.Contains(responseBody, "gress is over the account limit") { // maybe Ingress or Egress atomic.AddInt64(&s.atomic503CountThroughput, 1) } else if strings.Contains(responseBody, "Operations per second is over the account limit") { atomic.AddInt64(&s.atomic503CountIOPS, 1) } else { atomic.AddInt64(&s.atomic503CountUnknown, 1) // we don't know what caused this 503 (that can happen) } } func (s *PipelineNetworkStats) OperationsPerSecond() int { s.nocopy.Check() elapsed := time.Since(time.Unix(s.getStartSeconds(), 0)).Seconds() if elapsed > 0 { return int(float64(atomic.LoadInt64(&s.atomicOperationCount)) / elapsed) } else { return 0 } } func (s *PipelineNetworkStats) NetworkErrorPercentage() float32 { s.nocopy.Check() ops := float32(atomic.LoadInt64(&s.atomicOperationCount)) if ops > 0 { return 100 * float32(atomic.LoadInt64(&s.atomicNetworkErrorCount)) / ops } else { return 0 } } func (s *PipelineNetworkStats) TotalServerBusyPercentage() float32 { s.nocopy.Check() ops := float32(atomic.LoadInt64(&s.atomicOperationCount)) if ops > 0 { return 100 * float32(atomic.LoadInt64(&s.atomic503CountThroughput)+ atomic.LoadInt64(&s.atomic503CountIOPS)+ atomic.LoadInt64(&s.atomic503CountUnknown)) / ops } else { return 0 } } func (s *PipelineNetworkStats) GetTotalRetries() int64 { s.nocopy.Check() return atomic.LoadInt64(&s.atomic503CountThroughput) + atomic.LoadInt64(&s.atomic503CountIOPS) + atomic.LoadInt64(&s.atomic503CountUnknown) } func (s *PipelineNetworkStats) IOPSServerBusyPercentage() float32 { s.nocopy.Check() ops := float32(atomic.LoadInt64(&s.atomicOperationCount)) if ops > 0 { return 100 * float32(atomic.LoadInt64(&s.atomic503CountIOPS)) / ops } else { return 0 } } func (s *PipelineNetworkStats) ThroughputServerBusyPercentage() float32 { s.nocopy.Check() ops := float32(atomic.LoadInt64(&s.atomicOperationCount)) if ops > 0 { return 100 * float32(atomic.LoadInt64(&s.atomic503CountThroughput)) / ops } else { return 0 } } func (s *PipelineNetworkStats) OtherServerBusyPercentage() float32 { s.nocopy.Check() ops := float32(atomic.LoadInt64(&s.atomicOperationCount)) if ops > 0 { return 100 * float32(atomic.LoadInt64(&s.atomic503CountUnknown)) / ops } else { return 0 } } func (s *PipelineNetworkStats) AverageE2EMilliseconds() int { s.nocopy.Check() ops := atomic.LoadInt64(&s.atomicOperationCount) if ops > 0 { return int(atomic.LoadInt64(&s.atomicE2ETotalMilliseconds) / ops) } else { return 0 } } // transparentlyReadBody reads the response body, and then (because body is read-once-only) replaces it with // a new body that will return the same content to anyone else who reads it. // This looks like a fairly common approach in Go, e.g. https://stackoverflow.com/a/23077519 // Our implementation here returns a string, so is only sensible for bodies that we know to be short - e.g. bodies of error responses. func transparentlyReadBody(r *http.Response) string { if r.Body == http.NoBody { return "" } buf, _ := io.ReadAll(r.Body) // error responses are short fragments of XML, so safe to read all _ = r.Body.Close() // must close the real body r.Body = io.NopCloser(bytes.NewReader(buf)) // replace it with something that will read the same data we just read return string(buf) // copy to string } var pipelineNetworkStatsContextKey = contextKey{"pipelineNetworkStats"} // withPipelineNetworkStats returns a context that contains a pipeline network stats. The retryNotificationPolicy // will then invoke the pipeline network stats object when necessary func withPipelineNetworkStats(ctx context.Context, stats *PipelineNetworkStats) context.Context { return context.WithValue(ctx, pipelineNetworkStatsContextKey, stats) } type statsPolicy struct { } func (s statsPolicy) Do(req *policy.Request) (*http.Response, error) { start := time.Now() response, err := req.Next() // Grab the notification callback out of the context and, if its there, call it stats, ok := req.Raw().Context().Value(pipelineNetworkStatsContextKey).(*PipelineNetworkStats) if ok && stats != nil { atomic.AddInt64(&stats.atomicOperationCount, 1) atomic.AddInt64(&stats.atomicE2ETotalMilliseconds, int64(time.Since(start).Seconds()*1000)) if err != nil && !isContextCancelledError(err) { // no response from server atomic.AddInt64(&stats.atomicNetworkErrorCount, 1) } // always look at retries, even if not started, because concurrency tuner needs to know about them // TODO should we also count status 500? It is mentioned here as timeout:https://docs.microsoft.com/en-us/azure/storage/common/storage-scalability-targets if response != nil && response.StatusCode == http.StatusServiceUnavailable { stats.tunerInterface.recordRetry() // always tell the tuner // To find out why the server was busy we need to look at the response responseBodyText := transparentlyReadBody(response) stats.recordRetry(responseBodyText) } } return response, err } func newStatsPolicy() policy.Policy { return statsPolicy{} }