pkg/aurorabridge/config.go (112 lines of code) (raw):
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package aurorabridge
import (
"time"
"github.com/uber/peloton/.gen/peloton/api/v0/respool"
"github.com/uber/peloton/pkg/common/config"
)
// ServiceHandlerConfig defines ServiceHandler configuration. Fields are
// tagged for YAML decoding; normalize fills in defaults for unset fields and
// validate checks the Thermos executor settings.
type ServiceHandlerConfig struct {
	// Worker counts for the corresponding ServiceHandler endpoints.
	// NOTE(review): presumably the concurrency each endpoint uses when
	// fanning out backend calls — confirm against the handler.
	// A zero value is replaced with a default by normalize.
	GetJobUpdateWorkers           int `yaml:"get_job_update_workers"`
	GetJobsWorkers                int `yaml:"get_jobs_workers"`
	GetJobSummaryWorkers          int `yaml:"get_job_summary_workers"`
	StopPodWorkers                int `yaml:"stop_pod_workers"`
	CreateJobSpecForUpdateWorkers int `yaml:"create_job_spec_for_update_workers"`

	// Config for number of workers for getTasksWithoutConfigs endpoint.
	// getTasksWithoutConfigsWorkers picks one of the three worker counts by
	// comparing a size against the thresholds, checking the large tier first:
	//   size >= GetTasksWithoutConfigsLargeThreshold  -> GetTasksWithoutConfigsLargeWorkers
	//   size >= GetTasksWithoutConfigsMediumThreshold -> GetTasksWithoutConfigsMediumWorkers
	//   otherwise                                     -> GetTasksWithoutConfigsWorkers
	// A zero value for any of these fields is replaced with a default by
	// normalize.
	GetTasksWithoutConfigsWorkers         int `yaml:"get_tasks_without_configs_workers"`
	GetTasksWithoutConfigsMediumWorkers   int `yaml:"get_tasks_without_configs_medium_workers"`
	GetTasksWithoutConfigsMediumThreshold int `yaml:"get_tasks_without_configs_medium_threshold"`
	GetTasksWithoutConfigsLargeWorkers    int `yaml:"get_tasks_without_configs_large_workers"`
	GetTasksWithoutConfigsLargeThreshold  int `yaml:"get_tasks_without_configs_large_threshold"`

	// PodRunsDepth is the getTasksWithoutConfigs task querying depth. It
	// limits the number of pods to be included in the return result: return
	// pods from the current run if set to 1, return pods from the current and
	// previous run if set to 2, etc. Values <= 0 are normalized to 1.
	// In Aurora, getTasksWithoutConfigs will return all the current and
	// completed tasks stored in the DB. However, in Peloton, since we
	// keep the complete history of pods, this parameter is used to limit
	// the number of pods returned.
	PodRunsDepth int `yaml:"pod_runs_depth"`

	// GetTasksPodMax is the maximum number of pods that will get returned,
	// while meeting the PodRunsDepth requirement. A zero value is replaced
	// with a default by normalize.
	GetTasksPodMax int `yaml:"get_tasks_pod_max"`

	// QueryJobsLimit specifies the Limit parameter passed to QueryJobs
	// requests. A zero value is replaced with a default by normalize.
	QueryJobsLimit uint32 `yaml:"query_jobs_limit"`

	// InstanceEventsLimit specifies the limit on number of events per
	// instance. A zero value is replaced with a default by normalize.
	InstanceEventsLimit uint32 `yaml:"instance_events_limit"`

	// UpdatesLimit specifies the limit on number of updates to include per
	// job. A zero value is replaced with a default by normalize.
	UpdatesLimit uint32 `yaml:"updates_limit"`

	// ThermosExecutor is config used to generate Mesos CommandInfo /
	// ExecutorInfo for the Thermos executor. It is checked by validate.
	ThermosExecutor config.ThermosExecutorConfig `yaml:"thermos_executor"`

	// EnableInPlace enables Peloton in-place update.
	// NOTE(review): this YAML key uses hyphens ("enable-inplace-update")
	// while every other key in this struct is snake_case. It is left as-is
	// because renaming it would make existing config files silently stop
	// setting this field.
	EnableInPlace bool `yaml:"enable-inplace-update"`
}
// normalize fills in defaults for ServiceHandlerConfig fields that were left
// unset in the config.
//
// Worker counts and PodRunsDepth are treated as unset when they are zero or
// negative. A non-positive worker count or query depth is not a meaningful
// setting, so it is replaced with the default instead of being passed through
// to the handlers. Thresholds and GetTasksPodMax are treated as unset only
// when they are exactly zero; the uint32 limits can only be unset by being
// zero.
func (c *ServiceHandlerConfig) normalize() {
	// Per-endpoint worker counts.
	if c.GetJobUpdateWorkers <= 0 {
		c.GetJobUpdateWorkers = 25
	}
	if c.GetJobsWorkers <= 0 {
		c.GetJobsWorkers = 25
	}
	if c.GetJobSummaryWorkers <= 0 {
		c.GetJobSummaryWorkers = 25
	}
	if c.StopPodWorkers <= 0 {
		c.StopPodWorkers = 25
	}
	if c.CreateJobSpecForUpdateWorkers <= 0 {
		c.CreateJobSpecForUpdateWorkers = 25
	}

	// Tiered worker counts for getTasksWithoutConfigs. The thresholds decide
	// which tier getTasksWithoutConfigsWorkers returns for a given size.
	if c.GetTasksWithoutConfigsWorkers <= 0 {
		c.GetTasksWithoutConfigsWorkers = 100
	}
	if c.GetTasksWithoutConfigsMediumWorkers <= 0 {
		c.GetTasksWithoutConfigsMediumWorkers = 250
	}
	if c.GetTasksWithoutConfigsLargeWorkers <= 0 {
		c.GetTasksWithoutConfigsLargeWorkers = 500
	}
	if c.GetTasksWithoutConfigsMediumThreshold == 0 {
		c.GetTasksWithoutConfigsMediumThreshold = 500
	}
	if c.GetTasksWithoutConfigsLargeThreshold == 0 {
		c.GetTasksWithoutConfigsLargeThreshold = 1000
	}

	// getTasksWithoutConfigs result sizing.
	if c.PodRunsDepth <= 0 {
		c.PodRunsDepth = 1
	}
	if c.GetTasksPodMax == 0 {
		c.GetTasksPodMax = 1000
	}

	// Query limits (uint32, so zero is the only "unset" value).
	if c.QueryJobsLimit == 0 {
		c.QueryJobsLimit = 1000
	}
	if c.InstanceEventsLimit == 0 {
		c.InstanceEventsLimit = 100
	}
	if c.UpdatesLimit == 0 {
		c.UpdatesLimit = 10
	}
}
// getTasksWithoutConfigsWorkers returns the number of workers to use for a
// getTasksWithoutConfigs call of the given size. NOTE(review): size is
// presumably the number of tasks/pods involved — confirm against the caller.
//
// The large tier is checked first, so a size meeting both thresholds gets
// GetTasksWithoutConfigsLargeWorkers. A size below the medium threshold falls
// back to GetTasksWithoutConfigsWorkers.
func (c *ServiceHandlerConfig) getTasksWithoutConfigsWorkers(size int) int {
	switch {
	case size >= c.GetTasksWithoutConfigsLargeThreshold:
		return c.GetTasksWithoutConfigsLargeWorkers
	case size >= c.GetTasksWithoutConfigsMediumThreshold:
		return c.GetTasksWithoutConfigsMediumWorkers
	default:
		return c.GetTasksWithoutConfigsWorkers
	}
}
// validate reports whether the ServiceHandlerConfig is usable. Currently only
// the Thermos executor settings are checked. Any error from
// ThermosExecutor.Validate is returned unchanged, and nil is returned
// otherwise.
func (c *ServiceHandlerConfig) validate() error {
	thermosErr := c.ThermosExecutor.Validate()
	if thermosErr != nil {
		return thermosErr
	}
	return nil
}
// RespoolLoaderConfig defines RespoolLoader configuration.
type RespoolLoaderConfig struct {
	// RetryInterval is presumably the delay between RespoolLoader attempts
	// (confirm against RespoolLoader). A zero value is replaced with 5s by
	// normalize.
	RetryInterval time.Duration `yaml:"retry_interval"`

	// RespoolPath is the path of the resource pool used by the bridge.
	// NOTE(review): exact semantics live in RespoolLoader — confirm there.
	RespoolPath string `yaml:"respool_path"`

	// GPURespoolPath is the GPU counterart of RespoolPath.
	// NOTE(review): presumably used for jobs requesting GPUs — confirm.
	GPURespoolPath string `yaml:"gpu_respool_path"`

	// DefaultRespoolSpec describes the default respool created for the
	// bridge when bootstrapping a new cluster.
	DefaultRespoolSpec DefaultRespoolSpec `yaml:"default_respool_spec"`
}
// DefaultRespoolSpec defines parameters used to create a default respool for
// the bridge when bootstrapping a new cluster.
//
// NOTE(review): the fields presumably map one-to-one onto the respool
// configuration submitted when the default respool is created — confirm
// against RespoolLoader.
type DefaultRespoolSpec struct {
	OwningTeam      string                    `yaml:"owning_team"`
	LDAPGroups      []string                  `yaml:"ldap_groups"`
	Description     string                    `yaml:"description"`
	Resources       []*respool.ResourceConfig `yaml:"resources"`
	Policy          respool.SchedulingPolicy  `yaml:"policy"`
	ControllerLimit *respool.ControllerLimit  `yaml:"controller_limit"`
	SlackLimit      *respool.SlackLimit       `yaml:"slack_limit"`
}
// normalize fills in defaults for RespoolLoaderConfig fields left unset.
//
// A zero or negative RetryInterval is replaced with 5 seconds. A non-positive
// duration is not a usable retry delay: time.Sleep returns immediately for it,
// and time.NewTicker panics on it. Accepting it could therefore turn a retry
// loop into a busy loop or a crash.
func (c *RespoolLoaderConfig) normalize() {
	if c.RetryInterval <= 0 {
		c.RetryInterval = 5 * time.Second
	}
}
// EventPublisherConfig represents config for publishing task state change
// events to Kafka.
type EventPublisherConfig struct {
	// KafkaURL is the URL of the Kafka stream that task state change events
	// are published to.
	KafkaURL string `yaml:"kafka_url"`

	// PublishEvents defines whether to publish task state changes to Kafka.
	PublishEvents bool `yaml:"publish_events"`

	// GRPCMsgSize defines the maximum payload size of gRPC messages that can
	// be sent and received. NOTE(review): presumably in bytes — confirm
	// where it is passed to the gRPC dial/server options.
	GRPCMsgSize int `yaml:"grpc_msg_size"`
}