pulsaradmin/pkg/utils/functions_stats.go (80 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package utils type FunctionStats struct { // Overall total number of records function received from source ReceivedTotal int64 `json:"receivedTotal"` // Overall total number of records successfully processed by user function ProcessedSuccessfullyTotal int64 `json:"processedSuccessfullyTotal"` // Overall total number of system exceptions thrown SystemExceptionsTotal int64 `json:"systemExceptionsTotal"` // Overall total number of user exceptions thrown UserExceptionsTotal int64 `json:"userExceptionsTotal"` // Average process latency for function AvgProcessLatency float64 `json:"avgProcessLatency"` // Timestamp of when the function was last invoked by any instance LastInvocation int64 `json:"lastInvocation"` OneMin FunctionInstanceStatsDataBase `json:"oneMin"` Instances []FunctionInstanceStats `json:"instances"` FunctionInstanceStats } type FunctionInstanceStats struct { FunctionInstanceStatsDataBase InstanceID int64 `json:"instanceId"` Metrics FunctionInstanceStatsData `json:"metrics"` } type FunctionInstanceStatsDataBase struct { // Total number of records function received from source for instance ReceivedTotal int64 `json:"receivedTotal"` // Total number of records successfully processed by user function for instance ProcessedSuccessfullyTotal int64 `json:"processedSuccessfullyTotal"` // Total number of system exceptions thrown for instance SystemExceptionsTotal int64 `json:"systemExceptionsTotal"` // Total number of user exceptions thrown for instance UserExceptionsTotal int64 `json:"userExceptionsTotal"` // Average process latency for function for instance AvgProcessLatency float64 `json:"avgProcessLatency"` } type FunctionInstanceStatsData struct { OneMin FunctionInstanceStatsDataBase `json:"oneMin"` // Timestamp of when the function was last invoked for instance LastInvocation int64 `json:"lastInvocation"` // Map of user defined metrics UserMetrics map[string]float64 `json:"userMetrics"` FunctionInstanceStatsDataBase } func (fs *FunctionStats) AddInstance(functionInstanceStats FunctionInstanceStats) { fs.Instances = append(fs.Instances, functionInstanceStats) } func (fs *FunctionStats) CalculateOverall() *FunctionStats { var ( nonNullInstances int nonNullInstancesOneMin int ) for _, functionInstanceStats := range fs.Instances { functionInstanceStatsData := functionInstanceStats.Metrics fs.ReceivedTotal += functionInstanceStatsData.ReceivedTotal fs.ProcessedSuccessfullyTotal += functionInstanceStatsData.ProcessedSuccessfullyTotal fs.SystemExceptionsTotal += functionInstanceStatsData.SystemExceptionsTotal fs.UserExceptionsTotal += functionInstanceStatsData.UserExceptionsTotal if functionInstanceStatsData.AvgProcessLatency != 0 { if fs.AvgProcessLatency == 0 { fs.AvgProcessLatency = 0.0 } fs.AvgProcessLatency += functionInstanceStatsData.AvgProcessLatency nonNullInstances++ } fs.OneMin.ReceivedTotal += functionInstanceStatsData.OneMin.ReceivedTotal fs.OneMin.ProcessedSuccessfullyTotal += functionInstanceStatsData.OneMin.ProcessedSuccessfullyTotal fs.OneMin.SystemExceptionsTotal += functionInstanceStatsData.OneMin.SystemExceptionsTotal fs.OneMin.UserExceptionsTotal += functionInstanceStatsData.OneMin.UserExceptionsTotal if functionInstanceStatsData.OneMin.AvgProcessLatency != 0 { if fs.OneMin.AvgProcessLatency == 0 { fs.OneMin.AvgProcessLatency = 0.0 } fs.OneMin.AvgProcessLatency += functionInstanceStatsData.OneMin.AvgProcessLatency nonNullInstancesOneMin++ } if functionInstanceStatsData.LastInvocation != 0 { if fs.LastInvocation == 0 || functionInstanceStatsData.LastInvocation > fs.LastInvocation { fs.LastInvocation = functionInstanceStatsData.LastInvocation } } } // calculate average from sum if nonNullInstances > 0 { fs.AvgProcessLatency /= float64(nonNullInstances) } else { fs.AvgProcessLatency = 0 } // calculate 1min average from sum if nonNullInstancesOneMin > 0 { fs.OneMin.AvgProcessLatency /= float64(nonNullInstancesOneMin) } else { fs.AvgProcessLatency = 0 } return fs }