pkg/webservice/handlers.go (802 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package webservice
import (
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"runtime"
"sort"
"strconv"
"strings"
"github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
"gopkg.in/yaml.v3"
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/events"
"github.com/apache/yunikorn-core/pkg/log"
metrics2 "github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/metrics/history"
"github.com/apache/yunikorn-core/pkg/plugins"
"github.com/apache/yunikorn-core/pkg/scheduler"
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
)
// Messages returned to REST clients when a required parameter is missing or a
// requested object cannot be found.
const (
	PartitionDoesNotExists   = "Partition not found"
	MissingParamsName        = "Missing parameters"
	QueueDoesNotExists       = "Queue not found"
	UserDoesNotExists        = "User not found"
	GroupDoesNotExists       = "Group not found"
	UserNameMissing          = "User name is missing"
	GroupNameMissing         = "Group name is missing"
	ApplicationDoesNotExists = "Application not found"
	NodeDoesNotExists        = "Node not found"
)
// getStackInfo writes the stack traces of all goroutines to the response body.
// The capture buffer starts at 1 KiB and doubles until runtime.Stack reports a
// dump shorter than the buffer, i.e. the full dump fits.
func getStackInfo(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	size := 1024
	var dump []byte
	for {
		buf := make([]byte, size)
		n := runtime.Stack(buf, true)
		if n < size {
			dump = buf[:n]
			break
		}
		size *= 2
	}
	if _, err := w.Write(dump); err != nil {
		log.Log(log.REST).Error("GetStackInfo error", zap.Error(err))
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// getClusterInfo responds with one JSON cluster entry per known partition.
func getClusterInfo(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	info := getClusterDAO(schedulerContext.GetPartitionMapClone())
	err := json.NewEncoder(w).Encode(info)
	if err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// validateQueue checks that every dot-separated segment of queuePath matches
// configs.QueueNameRegExp. An empty path is accepted without checks.
// It returns an error naming the first offending segment.
func validateQueue(queuePath string) error {
	if queuePath == "" {
		return nil
	}
	for _, part := range strings.Split(queuePath, ".") {
		if configs.QueueNameRegExp.MatchString(part) {
			continue
		}
		return fmt.Errorf("problem in queue query parameter parsing as queue param "+
			"%s contains invalid queue name %s. Queue name must only have "+
			"alphanumeric characters, - or _, and be no longer than 64 characters", queuePath, part)
	}
	return nil
}
// validateConf parses the request body as a scheduler configuration.
// It always responds with a JSON ValidateConfResponse: Allowed is true when the
// body was read and loaded without error; otherwise Reason carries the error.
func validateConf(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	var result dao.ValidateConfResponse
	body, err := io.ReadAll(r.Body)
	if err == nil {
		_, err = configs.LoadSchedulerConfigFromByteArray(body)
	}
	result.Allowed = err == nil
	if err != nil {
		result.Reason = err.Error()
	}
	if err = json.NewEncoder(w).Encode(result); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// writeHeaders sets the JSON content type and the permissive CORS headers that
// every REST response carries. Existing values for these keys are replaced.
func writeHeaders(w http.ResponseWriter) {
	h := w.Header()
	h.Set("Content-Type", "application/json; charset=UTF-8")
	h.Set("Access-Control-Allow-Origin", "*")
	h.Set("Access-Control-Allow-Credentials", "true")
	h.Set("Access-Control-Allow-Methods", "GET,POST,HEAD,OPTIONS")
	h.Set("Access-Control-Allow-Headers", "X-Requested-With,Content-Type,Accept,Origin")
}
// buildJSONErrorResponse writes the HTTP status code and then a JSON-encoded
// YAPI error that carries the status code and detail message.
// net/http ignores the status if body bytes were already written, so callers
// should use this before writing anything else.
// When encoding fails, the failure is logged together with the encode error.
func buildJSONErrorResponse(w http.ResponseWriter, detail string, code int) {
	w.WriteHeader(code)
	errorInfo := dao.NewYAPIError(nil, code, detail)
	if jsonErr := json.NewEncoder(w).Encode(errorInfo); jsonErr != nil {
		// include the encode error: without it the cause of the failure is lost
		log.Log(log.REST).Error(fmt.Sprintf("Problem in sending error response in JSON format. Error response: %s", detail),
			zap.Error(jsonErr))
	}
}
// getClusterJSON builds the cluster summary for one partition. The summary holds:
//   - the scheduler start time (nanoseconds),
//   - the build info of all registered RMs,
//   - the partition name with the cluster ID stripped,
//   - the fixed cluster name "kubernetes".
func getClusterJSON(partition *scheduler.PartitionContext) *dao.ClusterDAOInfo {
	return &dao.ClusterDAOInfo{
		StartTime:          schedulerContext.GetStartTime().UnixNano(),
		RMBuildInformation: getRMBuildInformation(schedulerContext.GetRMInfoMapClone()),
		PartitionName:      common.GetPartitionNameWithoutClusterID(partition.Name),
		ClusterName:        "kubernetes",
	}
}
// getClusterUtilJSON reports the utilization of a partition per resource type.
// Each entry holds the partition total, the allocated amount and the absolute
// used percentage, formatted as "<n>%".
// When the partition has no total resource, or nothing is allocated, a single
// placeholder entry is returned. That entry has ResourceType/Usage "N/A" and
// Total/Used -1.
func getClusterUtilJSON(partition *scheduler.PartitionContext) []*dao.ClusterUtilDAOInfo {
	total := partition.GetTotalPartitionResource()
	used := partition.GetAllocatedResource()
	if resources.IsZero(total) || len(used.Resources) == 0 {
		return []*dao.ClusterUtilDAOInfo{{
			ResourceType: "N/A",
			Total:        -1,
			Used:         -1,
			Usage:        "N/A",
		}}
	}
	// nil when the percentage map is empty, matching the previous JSON (null) output
	var utils []*dao.ClusterUtilDAOInfo
	percent := resources.CalculateAbsUsedCapacity(total, used)
	for name, value := range percent.Resources {
		utils = append(utils, &dao.ClusterUtilDAOInfo{
			ResourceType: name,
			Total:        int64(total.Resources[name]),
			Used:         int64(used.Resources[name]),
			Usage:        fmt.Sprintf("%d%%", int64(value)),
		})
	}
	return utils
}
// getAllocationDAO converts an allocation into its REST representation.
// The request time depends on how the allocation was satisfied:
//   - it is the placeholder's creation time when a placeholder was used,
//   - otherwise it is the ask's creation time.
//
// AllocationDelay is the allocation creation time minus that request time.
func getAllocationDAO(alloc *objects.Allocation) *dao.AllocationDAOInfo {
	var requestTime int64
	switch {
	case alloc.IsPlaceholderUsed():
		requestTime = alloc.GetPlaceholderCreateTime().UnixNano()
	default:
		requestTime = alloc.GetAsk().GetCreateTime().UnixNano()
	}
	allocTime := alloc.GetCreateTime().UnixNano()
	info := &dao.AllocationDAOInfo{}
	info.AllocationKey = alloc.GetAllocationKey()
	info.AllocationTags = alloc.GetTagsClone()
	info.RequestTime = requestTime
	info.AllocationTime = allocTime
	info.AllocationDelay = allocTime - requestTime
	info.UUID = alloc.GetUUID()
	info.ResourcePerAlloc = alloc.GetAllocatedResource().DAOMap()
	info.PlaceholderUsed = alloc.IsPlaceholderUsed()
	info.Placeholder = alloc.IsPlaceholder()
	info.TaskGroupName = alloc.GetTaskGroup()
	info.Priority = strconv.Itoa(int(alloc.GetPriority()))
	info.NodeID = alloc.GetNodeID()
	info.ApplicationID = alloc.GetApplicationID()
	info.Partition = alloc.GetPartitionName()
	info.Preempted = alloc.IsPreempted()
	return info
}
// getAllocationsDAO converts allocations one-to-one, preserving order.
// The result is never nil, so an empty input encodes as [] in JSON.
func getAllocationsDAO(allocations []*objects.Allocation) []*dao.AllocationDAOInfo {
	result := make([]*dao.AllocationDAOInfo, len(allocations))
	for i := range allocations {
		result[i] = getAllocationDAO(allocations[i])
	}
	return result
}
// getPlaceholderDAO converts one task group's placeholder bookkeeping into its
// REST form. MinResource is flattened to a DAO map.
func getPlaceholderDAO(ph *objects.PlaceholderData) *dao.PlaceholderDAOInfo {
	return &dao.PlaceholderDAOInfo{
		TaskGroupName: ph.TaskGroupName,
		Count:         ph.Count,
		MinResource:   ph.MinResource.DAOMap(),
		Replaced:      ph.Replaced,
		TimedOut:      ph.TimedOut,
	}
}
// getPlaceholdersDAO converts placeholder entries one-to-one, preserving order.
// The result is never nil.
func getPlaceholdersDAO(entries []*objects.PlaceholderData) []*dao.PlaceholderDAOInfo {
	result := make([]*dao.PlaceholderDAOInfo, len(entries))
	for i := range entries {
		result[i] = getPlaceholderDAO(entries[i])
	}
	return result
}
// getStateDAO converts one application state-log entry. The timestamp is
// expressed in Unix nanoseconds.
func getStateDAO(entry *objects.StateLogEntry) *dao.StateDAOInfo {
	return &dao.StateDAOInfo{
		Time:             entry.Time.UnixNano(),
		ApplicationState: entry.ApplicationState,
	}
}
// getStatesDAO converts state-log entries one-to-one, preserving order.
// The result is never nil.
func getStatesDAO(entries []*objects.StateLogEntry) []*dao.StateDAOInfo {
	result := make([]*dao.StateDAOInfo, len(entries))
	for i := range entries {
		result[i] = getStateDAO(entries[i])
	}
	return result
}
// getApplicationDAO converts an application into its REST representation.
// This includes pending requests, allocations, placeholder data and the state
// log. A nil application yields an empty, non-nil DAO.
func getApplicationDAO(app *objects.Application) *dao.ApplicationDAOInfo {
	info := &dao.ApplicationDAOInfo{}
	if app == nil {
		return info
	}
	info.ApplicationID = app.ApplicationID
	info.UsedResource = app.GetAllocatedResource().DAOMap()
	info.MaxUsedResource = app.GetMaxAllocatedResource().DAOMap()
	info.PendingResource = app.GetPendingResource().DAOMap()
	info.Partition = common.GetPartitionNameWithoutClusterID(app.Partition)
	info.QueueName = app.GetQueuePath()
	info.SubmissionTime = app.SubmissionTime.UnixNano()
	info.FinishedTime = common.ZeroTimeInUnixNano(app.FinishedTime())
	info.Requests = getAllocationAsksDAO(app.GetAllRequests())
	info.Allocations = getAllocationsDAO(app.GetAllAllocations())
	info.State = app.CurrentState()
	info.User = app.GetUser().User
	info.Groups = app.GetUser().Groups
	info.RejectedMessage = app.GetRejectedMessage()
	info.PlaceholderData = getPlaceholdersDAO(app.GetAllPlaceholderData())
	info.StateLog = getStatesDAO(app.GetStateLog())
	info.HasReserved = app.HasReserved()
	info.Reservations = app.GetReservations()
	info.MaxRequestPriority = app.GetAskMaxPriority()
	return info
}
// getAllocationLogsDAO converts allocation log entries into DAO objects,
// ordered by LastOccurrence with the oldest first. Entries with equal times
// keep their input order (stable sort).
// The input is copied before sorting, so the caller's slice is never reordered.
// The caller's slice may be shared with the ask it was read from.
// The result is never nil.
func getAllocationLogsDAO(logEntries []*objects.AllocationLogEntry) []*dao.AllocationAskLogDAOInfo {
	sorted := make([]*objects.AllocationLogEntry, len(logEntries))
	copy(sorted, logEntries)
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].LastOccurrence.Before(sorted[j].LastOccurrence)
	})
	logsDAO := make([]*dao.AllocationAskLogDAOInfo, len(sorted))
	for i, entry := range sorted {
		logsDAO[i] = &dao.AllocationAskLogDAOInfo{
			Message:        entry.Message,
			LastOccurrence: entry.LastOccurrence.UnixNano(),
			Count:          entry.Count,
		}
	}
	return logsDAO
}
// getAllocationAskDAO converts an allocation ask into its REST form. This
// includes its allocation log and the placeholder timeout in nanoseconds.
func getAllocationAskDAO(ask *objects.AllocationAsk) *dao.AllocationAskDAOInfo {
	info := &dao.AllocationAskDAOInfo{}
	info.AllocationKey = ask.GetAllocationKey()
	info.AllocationTags = ask.GetTagsClone()
	info.RequestTime = ask.GetCreateTime().UnixNano()
	info.ResourcePerAlloc = ask.GetAllocatedResource().DAOMap()
	info.PendingCount = ask.GetPendingAskRepeat()
	info.Priority = strconv.Itoa(int(ask.GetPriority()))
	info.RequiredNodeID = ask.GetRequiredNode()
	info.ApplicationID = ask.GetApplicationID()
	info.Partition = common.GetPartitionNameWithoutClusterID(ask.GetPartitionName())
	info.Placeholder = ask.IsPlaceholder()
	info.PlaceholderTimeout = ask.GetTimeout().Nanoseconds()
	info.TaskGroupName = ask.GetTaskGroup()
	info.AllocationLog = getAllocationLogsDAO(ask.GetAllocationLog())
	info.TriggeredPreemption = ask.HasTriggeredPreemption()
	info.Originator = ask.IsOriginator()
	return info
}
// getAllocationAsksDAO converts only asks that still have a positive pending
// repeat count, preserving their order. The result is never nil.
func getAllocationAsksDAO(asks []*objects.AllocationAsk) []*dao.AllocationAskDAOInfo {
	pending := make([]*dao.AllocationAskDAOInfo, 0, len(asks))
	for _, ask := range asks {
		if ask.GetPendingAskRepeat() <= 0 {
			continue
		}
		pending = append(pending, getAllocationAskDAO(ask))
	}
	return pending
}
// getNodeDAO converts a node into its REST representation. The resource views
// (capacity, occupied, allocated, available, utilized) are flattened to DAO
// maps. The node's allocations and reservation keys are included.
func getNodeDAO(node *objects.Node) *dao.NodeDAOInfo {
	info := &dao.NodeDAOInfo{}
	info.NodeID = node.NodeID
	info.HostName = node.Hostname
	info.RackName = node.Rackname
	info.Attributes = node.GetAttributes()
	info.Capacity = node.GetCapacity().DAOMap()
	info.Occupied = node.GetOccupiedResource().DAOMap()
	info.Allocated = node.GetAllocatedResource().DAOMap()
	info.Available = node.GetAvailableResource().DAOMap()
	info.Utilized = node.GetUtilizedResource().DAOMap()
	info.Allocations = getAllocationsDAO(node.GetAllAllocations())
	info.Schedulable = node.IsSchedulable()
	info.IsReserved = node.IsReserved()
	info.Reservations = node.GetReservationKeys()
	return info
}
// getNodesDAO converts nodes one-to-one, preserving order. The result is never nil.
func getNodesDAO(entries []*objects.Node) []*dao.NodeDAOInfo {
	result := make([]*dao.NodeDAOInfo, len(entries))
	for i := range entries {
		result[i] = getNodeDAO(entries[i])
	}
	return result
}
// getNodesUtilJSON groups the partition's nodes by their utilization of the
// named resource. There are ten buckets, each 10% wide: "0-10%" through
// "90-100%". A utilization v lands in bucket max(ceil(v/10)-1, 0), so 0% and
// (0,10]% share the first bucket.
// A node is skipped when its capacity for the resource is not positive, or
// when its allocated resource has no entry for it.
// Defensive: a computed utilization above 100% would produce an index past the
// last bucket. Such nodes are counted in the last bucket instead of panicking.
func getNodesUtilJSON(partition *scheduler.PartitionContext, name string) *dao.NodesUtilDAOInfo {
	const numBuckets = 10
	counts := make([]int, numBuckets)
	names := make([][]string, numBuckets)
	for _, node := range partition.GetNodes() {
		total := node.GetCapacity()
		allocated := node.GetAllocatedResource()
		if total.Resources[name] <= 0 {
			continue
		}
		if _, ok := allocated.Resources[name]; !ok {
			continue
		}
		v := float64(resources.CalculateAbsUsedCapacity(total, allocated).Resources[name])
		idx := int(math.Dim(math.Ceil(v/10), 1))
		if idx >= numBuckets {
			idx = numBuckets - 1
		}
		counts[idx]++
		names[idx] = append(names[idx], node.NodeID)
	}
	nodeUtil := make([]*dao.NodeUtilDAOInfo, 0, numBuckets)
	for k := 0; k < numBuckets; k++ {
		nodeUtil = append(nodeUtil, &dao.NodeUtilDAOInfo{
			BucketName: fmt.Sprintf("%d-%d%%", k*10, (k+1)*10),
			NumOfNodes: int64(counts[k]),
			NodeNames:  names[k],
		})
	}
	return &dao.NodesUtilDAOInfo{
		ResourceType: name,
		NodesUtil:    nodeUtil,
	}
}
// getApplicationHistory responds with the recorded total-application counts
// from the internal metrics history as JSON.
// It returns 501 when history collection is not enabled. That is not an error
// of the request; there is simply nothing to return.
func getApplicationHistory(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	if imHistory == nil {
		buildJSONErrorResponse(w, "Internal metrics collection is not enabled.", http.StatusNotImplemented)
		return
	}
	// Work on a copy of the records. Nil slots can appear anywhere at the start
	// of that copy, so the DAO conversion must scan all of it and skip them.
	if err := json.NewEncoder(w).Encode(getAppHistoryDAO(imHistory.GetRecords())); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// getContainerHistory responds with the recorded total-container counts from
// the internal metrics history as JSON.
// It returns 501 when history collection is not enabled. That is not an error
// of the request; there is simply nothing to return.
func getContainerHistory(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	if imHistory == nil {
		buildJSONErrorResponse(w, "Internal metrics collection is not enabled.", http.StatusNotImplemented)
		return
	}
	// Work on a copy of the records. Nil slots can appear anywhere at the start
	// of that copy, so the DAO conversion must scan all of it and skip them.
	if err := json.NewEncoder(w).Encode(getContainerHistoryDAO(imHistory.GetRecords())); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// getClusterConfig writes the scheduler configuration of the current policy group.
// The format depends on the request's Accept header:
//   - JSON when the header is exactly "application/json",
//   - YAML (with a YAML Content-Type) otherwise.
//
// A marshalling failure produces a JSON 500 error. The handler then stops, so
// no (empty) configuration body follows the error. The Content-Type is reset to
// JSON so the error body is labelled correctly.
func getClusterConfig(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	conf := configs.ConfigContext.Get(schedulerContext.GetPolicyGroup())
	var marshalledConf []byte
	var err error
	// check if we have a request for json output
	if r.Header.Get("Accept") == "application/json" {
		marshalledConf, err = json.Marshal(&conf)
	} else {
		w.Header().Set("Content-Type", "application/x-yaml; charset=UTF-8")
		marshalledConf, err = yaml.Marshal(&conf)
	}
	if err != nil {
		// the YAML branch may have changed the content type: the error body is JSON
		writeHeaders(w)
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if _, err = w.Write(marshalledConf); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// checkHealthStatus reports the last scheduler health-check result.
// The response depends on that result:
//   - 404 when no result exists yet,
//   - 503 when the scheduler is unhealthy,
//   - otherwise the result encoded as JSON.
func checkHealthStatus(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	result := schedulerContext.GetLastHealthCheckResult()
	if result == nil {
		log.Log(log.SchedHealth).Info("The healthy status of scheduler is not found", zap.Any("health check info", ""))
		buildJSONErrorResponse(w, "The healthy status of scheduler is not found", http.StatusNotFound)
		return
	}
	if !result.Healthy {
		log.Log(log.SchedHealth).Error("Scheduler is not healthy", zap.Any("health check info", *result))
		buildJSONErrorResponse(w, "Scheduler is not healthy", http.StatusServiceUnavailable)
		return
	}
	log.Log(log.SchedHealth).Info("Scheduler is healthy", zap.Any("health check info", *result))
	if err := json.NewEncoder(w).Encode(result); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// buildUpdateResponse reports the outcome of a configuration update.
// A nil err yields 200 with a plain success message. Otherwise the error is
// logged and returned as a JSON 409 response.
func buildUpdateResponse(err error, w http.ResponseWriter) {
	if err != nil {
		log.Log(log.REST).Info("Configuration update failed with errors",
			zap.Error(err))
		buildJSONErrorResponse(w, err.Error(), http.StatusConflict)
		return
	}
	w.WriteHeader(http.StatusOK)
	if _, writeErr := w.Write([]byte("Configuration updated successfully")); writeErr != nil {
		buildJSONErrorResponse(w, writeErr.Error(), http.StatusInternalServerError)
	}
}
// getPartitions responds with summary information for every partition as JSON.
func getPartitions(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	info := getPartitionInfoDAO(schedulerContext.GetPartitionMapClone())
	err := json.NewEncoder(w).Encode(info)
	if err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
func getPartitionQueues(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
vars := httprouter.ParamsFromContext(r.Context())
if vars == nil {
buildJSONErrorResponse(w, MissingParamsName, http.StatusBadRequest)
return
}
partitionName := vars.ByName("partition")
var partitionQueuesDAOInfo dao.PartitionQueueDAOInfo
var partition = schedulerContext.GetPartitionWithoutClusterID(partitionName)
if partition != nil {
partitionQueuesDAOInfo = partition.GetPartitionQueues()
} else {
buildJSONErrorResponse(w, PartitionDoesNotExists, http.StatusBadRequest)
return
}
if err := json.NewEncoder(w).Encode(partitionQueuesDAOInfo); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
// getPartitionNodes responds with all nodes of the partition named by the
// "partition" route parameter. It returns 400 when parameters are missing or
// the partition is unknown.
func getPartitionNodes(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	vars := httprouter.ParamsFromContext(r.Context())
	if vars == nil {
		buildJSONErrorResponse(w, MissingParamsName, http.StatusBadRequest)
		return
	}
	partitionContext := schedulerContext.GetPartitionWithoutClusterID(vars.ByName("partition"))
	if partitionContext == nil {
		buildJSONErrorResponse(w, PartitionDoesNotExists, http.StatusBadRequest)
		return
	}
	if err := json.NewEncoder(w).Encode(getNodesDAO(partitionContext.GetNodes())); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// getPartitionNode responds with a single node, identified by the "partition"
// and "node" route parameters. It returns 400 when parameters are missing, or
// when the partition or node is unknown.
func getPartitionNode(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	vars := httprouter.ParamsFromContext(r.Context())
	if vars == nil {
		buildJSONErrorResponse(w, MissingParamsName, http.StatusBadRequest)
		return
	}
	partitionContext := schedulerContext.GetPartitionWithoutClusterID(vars.ByName("partition"))
	if partitionContext == nil {
		buildJSONErrorResponse(w, PartitionDoesNotExists, http.StatusBadRequest)
		return
	}
	node := partitionContext.GetNode(vars.ByName("node"))
	if node == nil {
		buildJSONErrorResponse(w, NodeDoesNotExists, http.StatusBadRequest)
		return
	}
	if err := json.NewEncoder(w).Encode(getNodeDAO(node)); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// getQueueApplications responds with every application in the queue named by
// the "queue" route parameter, within the "partition" route parameter.
// It returns 400 for missing parameters, an invalid queue path, or an unknown
// partition or queue. An empty queue yields [].
func getQueueApplications(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	vars := httprouter.ParamsFromContext(r.Context())
	if vars == nil {
		buildJSONErrorResponse(w, MissingParamsName, http.StatusBadRequest)
		return
	}
	partitionName := vars.ByName("partition")
	queuePath := vars.ByName("queue")
	if err := validateQueue(queuePath); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusBadRequest)
		return
	}
	partitionContext := schedulerContext.GetPartitionWithoutClusterID(partitionName)
	if partitionContext == nil {
		buildJSONErrorResponse(w, PartitionDoesNotExists, http.StatusBadRequest)
		return
	}
	queue := partitionContext.GetQueue(queuePath)
	if queue == nil {
		buildJSONErrorResponse(w, QueueDoesNotExists, http.StatusBadRequest)
		return
	}
	apps := queue.GetCopyOfApps()
	appsDao := make([]*dao.ApplicationDAOInfo, 0, len(apps))
	for _, app := range apps {
		appsDao = append(appsDao, getApplicationDAO(app))
	}
	if err := json.NewEncoder(w).Encode(appsDao); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// getApplication responds with a single application, identified by the
// "partition", "queue" and "application" route parameters. It returns 400 for
// missing parameters, an invalid queue path, or an unknown partition, queue or
// application.
func getApplication(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	vars := httprouter.ParamsFromContext(r.Context())
	if vars == nil {
		buildJSONErrorResponse(w, MissingParamsName, http.StatusBadRequest)
		return
	}
	partitionName := vars.ByName("partition")
	queuePath := vars.ByName("queue")
	appID := vars.ByName("application")
	if err := validateQueue(queuePath); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusBadRequest)
		return
	}
	partitionContext := schedulerContext.GetPartitionWithoutClusterID(partitionName)
	if partitionContext == nil {
		buildJSONErrorResponse(w, PartitionDoesNotExists, http.StatusBadRequest)
		return
	}
	queue := partitionContext.GetQueue(queuePath)
	if queue == nil {
		buildJSONErrorResponse(w, QueueDoesNotExists, http.StatusBadRequest)
		return
	}
	app := queue.GetApplication(appID)
	if app == nil {
		buildJSONErrorResponse(w, ApplicationDoesNotExists, http.StatusBadRequest)
		return
	}
	if err := json.NewEncoder(w).Encode(getApplicationDAO(app)); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
func setLogLevel(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
log.Log(log.Deprecation).Warn("Setting log levels via the REST API is deprecated. The /ws/v1/loglevel endpoint will be removed in a future release.")
}
func getLogLevel(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
log.Log(log.Deprecation).Warn("Getting log levels via the REST API is deprecated. The /ws/v1/loglevel endpoint will be removed in a future release.")
if _, err := w.Write([]byte("info")); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
// getPartitionInfoDAO builds one summary per partition. Each summary includes:
//   - the partition's state,
//   - its capacity, usage and utilization,
//   - its node sorting policy,
//   - node and container totals,
//   - application counts per state, plus a "total" key covering active,
//     completed and rejected applications.
//
// Returns nil when lists is empty.
func getPartitionInfoDAO(lists map[string]*scheduler.PartitionContext) []*dao.PartitionInfo {
	var result []*dao.PartitionInfo
	for _, partitionContext := range lists {
		partitionInfo := &dao.PartitionInfo{}
		partitionInfo.ClusterID = partitionContext.RmID
		partitionInfo.Name = common.GetPartitionNameWithoutClusterID(partitionContext.Name)
		partitionInfo.State = partitionContext.GetCurrentState()
		partitionInfo.LastStateTransitionTime = partitionContext.GetStateTime().UnixNano()
		capacity := partitionContext.GetTotalPartitionResource()
		usedCapacity := partitionContext.GetAllocatedResource()
		partitionInfo.Capacity = dao.PartitionCapacity{
			Capacity:     capacity.DAOMap(),
			UsedCapacity: usedCapacity.DAOMap(),
			Utilization:  resources.CalculateAbsUsedCapacity(capacity, usedCapacity).DAOMap(),
		}
		partitionInfo.NodeSortingPolicy = dao.NodeSortingPolicy{
			Type:            partitionContext.GetNodeSortingPolicyType().String(),
			ResourceWeights: partitionContext.GetNodeSortingResourceWeights(),
		}
		partitionInfo.TotalNodes = partitionContext.GetTotalNodeCount()
		// Build a fresh slice instead of appending to the one GetApplications()
		// returned. If that slice had spare capacity, appending would write into a
		// backing array the partition might still reference.
		var appList []*objects.Application
		appList = append(appList, partitionContext.GetApplications()...)
		appList = append(appList, partitionContext.GetCompletedApplications()...)
		appList = append(appList, partitionContext.GetRejectedApplications()...)
		applicationsState := make(map[string]int)
		for _, app := range appList {
			applicationsState[app.CurrentState()]++
		}
		applicationsState["total"] = len(appList)
		partitionInfo.Applications = applicationsState
		partitionInfo.TotalContainers = partitionContext.GetTotalAllocationCount()
		result = append(result, partitionInfo)
	}
	return result
}
// getAppHistoryDAO converts the non-nil metrics records into application-count
// history points. Nil records are skipped. Returns nil if no record remains.
func getAppHistoryDAO(records []*history.MetricsRecord) []*dao.ApplicationHistoryDAOInfo {
	var points []*dao.ApplicationHistoryDAOInfo
	for _, rec := range records {
		if rec != nil {
			points = append(points, &dao.ApplicationHistoryDAOInfo{
				Timestamp:         rec.Timestamp.UnixNano(),
				TotalApplications: strconv.Itoa(rec.TotalApplications),
			})
		}
	}
	return points
}
// getPartitionNodesDAO groups node DAOs by partition, using partition names
// with the cluster ID stripped. Returns nil when lists is empty.
func getPartitionNodesDAO(lists map[string]*scheduler.PartitionContext) []*dao.NodesDAOInfo {
	var grouped []*dao.NodesDAOInfo
	for _, p := range lists {
		entry := &dao.NodesDAOInfo{}
		entry.Nodes = getNodesDAO(p.GetNodes())
		entry.PartitionName = common.GetPartitionNameWithoutClusterID(p.Name)
		grouped = append(grouped, entry)
	}
	return grouped
}
// getContainerHistoryDAO converts the non-nil metrics records into
// container-count history points. Nil records are skipped. Returns nil if no
// record remains.
func getContainerHistoryDAO(records []*history.MetricsRecord) []*dao.ContainerHistoryDAOInfo {
	var points []*dao.ContainerHistoryDAOInfo
	for _, rec := range records {
		if rec != nil {
			points = append(points, &dao.ContainerHistoryDAOInfo{
				Timestamp:       rec.Timestamp.UnixNano(),
				TotalContainers: strconv.Itoa(rec.TotalContainers),
			})
		}
	}
	return points
}
// getApplicationsDAO converts the applications of every partition. For each
// partition, active applications come first, then completed, then rejected.
// Returns nil when there are no applications at all.
func getApplicationsDAO(lists map[string]*scheduler.PartitionContext) []*dao.ApplicationDAOInfo {
	var result []*dao.ApplicationDAOInfo
	for _, partition := range lists {
		groups := [][]*objects.Application{
			partition.GetApplications(),
			partition.GetCompletedApplications(),
			partition.GetRejectedApplications(),
		}
		for _, group := range groups {
			for _, app := range group {
				result = append(result, getApplicationDAO(app))
			}
		}
	}
	return result
}
// getPartitionQueuesDAO collects the queue hierarchy of every partition.
// Returns nil when lists is empty.
func getPartitionQueuesDAO(lists map[string]*scheduler.PartitionContext) []dao.PartitionQueueDAOInfo {
	var queues []dao.PartitionQueueDAOInfo
	for _, p := range lists {
		queues = append(queues, p.GetPartitionQueues())
	}
	return queues
}
// getClusterDAO builds one cluster summary per partition. Returns nil when
// lists is empty.
func getClusterDAO(lists map[string]*scheduler.PartitionContext) []*dao.ClusterDAOInfo {
	var clusters []*dao.ClusterDAOInfo
	for _, p := range lists {
		clusters = append(clusters, getClusterJSON(p))
	}
	return clusters
}
// getRMBuildInformation collects the build-information map of every registered
// resource manager. Returns nil when lists is empty.
func getRMBuildInformation(lists map[string]*scheduler.RMInformation) []map[string]string {
	var builds []map[string]string
	for _, info := range lists {
		builds = append(builds, info.RMBuildInformation)
	}
	return builds
}
// getResourceManagerDiagnostics asks the registered state-dump plugin for the
// RM's state dump and decodes it as a JSON object.
// When the plugin returns an error, or the dump cannot be parsed, the error
// text is stored under the "Error" key.
func getResourceManagerDiagnostics() map[string]interface{} {
	result := map[string]interface{}{}
	dumpStr, err := plugins.GetStateDumpPlugin().GetStateDump()
	if err != nil {
		// might be not implemented by the RM
		log.Log(log.REST).Debug("Unable to get RM state dump", zap.Error(err))
		result["Error"] = err.Error()
		return result
	}
	if err = json.Unmarshal([]byte(dumpStr), &result); err != nil {
		log.Log(log.REST).Warn("Unable to parse RM state dump", zap.Error(err))
		result["Error"] = err.Error()
	}
	return result
}
// getMetrics refreshes the runtime metrics and then delegates to promhttp's
// default handler, which serves the default Prometheus gatherer.
func getMetrics(w http.ResponseWriter, r *http.Request) {
	metrics2.GetRuntimeMetrics().Collect()
	handler := promhttp.Handler()
	handler.ServeHTTP(w, r)
}
// getUsersResourceUsage responds with the resource usage of every tracked user.
// It encodes null when no user is tracked.
func getUsersResourceUsage(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	trackers := ugm.GetUserManager().GetUsersResources()
	var usage []*dao.UserResourceUsageDAOInfo
	for _, ut := range trackers {
		usage = append(usage, ut.GetUserResourceUsageDAOInfo())
	}
	err := json.NewEncoder(w).Encode(usage)
	if err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
func getUserResourceUsage(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
vars := httprouter.ParamsFromContext(r.Context())
if vars == nil {
buildJSONErrorResponse(w, MissingParamsName, http.StatusBadRequest)
return
}
user := vars.ByName("user")
if user == "" {
buildJSONErrorResponse(w, UserNameMissing, http.StatusBadRequest)
return
}
userTracker := ugm.GetUserManager().GetUserTracker(user)
if userTracker == nil {
buildJSONErrorResponse(w, UserDoesNotExists, http.StatusBadRequest)
return
}
var result = userTracker.GetUserResourceUsageDAOInfo()
if err := json.NewEncoder(w).Encode(result); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
// getGroupsResourceUsage responds with the resource usage of every tracked group.
// It encodes null when no group is tracked.
func getGroupsResourceUsage(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	trackers := ugm.GetUserManager().GetGroupsResources()
	var usage []*dao.GroupResourceUsageDAOInfo
	for _, gt := range trackers {
		usage = append(usage, gt.GetGroupResourceUsageDAOInfo())
	}
	err := json.NewEncoder(w).Encode(usage)
	if err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
func getGroupResourceUsage(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
vars := httprouter.ParamsFromContext(r.Context())
if vars == nil {
buildJSONErrorResponse(w, MissingParamsName, http.StatusBadRequest)
return
}
group := vars.ByName("group")
if group == "" {
buildJSONErrorResponse(w, GroupNameMissing, http.StatusBadRequest)
return
}
groupTracker := ugm.GetUserManager().GetGroupTracker(group)
if groupTracker == nil {
buildJSONErrorResponse(w, GroupDoesNotExists, http.StatusBadRequest)
return
}
var result = groupTracker.GetGroupResourceUsageDAOInfo()
if err := json.NewEncoder(w).Encode(result); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
// getEvents responds with event records starting at a given event ID.
// The optional "count" parameter must be a positive integer and defaults to
// 10000. The optional "start" parameter is the first event ID; it must be a
// non-negative integer and defaults to 0.
// The response also carries the lowest and highest IDs the event system holds.
// It returns 400 when the event system is disabled or a parameter is invalid.
func getEvents(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	eventSystem := events.GetEventSystem()
	if eventSystem == nil {
		buildJSONErrorResponse(w, "Event system is disabled", http.StatusBadRequest)
		return
	}
	var start uint64
	count := uint64(10000)
	if vars := httprouter.ParamsFromContext(r.Context()); vars != nil {
		if raw := vars.ByName("count"); raw != "" {
			parsed, err := strconv.ParseInt(raw, 10, 64)
			switch {
			case err != nil:
				buildJSONErrorResponse(w, err.Error(), http.StatusBadRequest)
				return
			case parsed <= 0:
				buildJSONErrorResponse(w, fmt.Sprintf("Illegal number of events: %d", parsed), http.StatusBadRequest)
				return
			}
			count = uint64(parsed)
		}
		if raw := vars.ByName("start"); raw != "" {
			parsed, err := strconv.ParseInt(raw, 10, 64)
			switch {
			case err != nil:
				buildJSONErrorResponse(w, err.Error(), http.StatusBadRequest)
				return
			case parsed < 0:
				buildJSONErrorResponse(w, fmt.Sprintf("Illegal id: %d", parsed), http.StatusBadRequest)
				return
			}
			start = uint64(parsed)
		}
	}
	records, lowestID, highestID := eventSystem.GetEventsFromID(start, count)
	response := dao.EventRecordDAO{
		LowestID:     lowestID,
		HighestID:    highestID,
		EventRecords: records,
	}
	if err := json.NewEncoder(w).Encode(response); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}