query/device_manager.go (192 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package query
import (
"github.com/uber/aresdb/cgoutils"
"sync"
"math"
"strconv"
"time"
"github.com/uber/aresdb/common"
queryCom "github.com/uber/aresdb/query/common"
"github.com/uber/aresdb/utils"
)
const (
	// mb2bytes converts a size in MiB into bytes.
	mb2bytes = 1024 * 1024
	// defaultDeviceUtilization is the fraction of device memory used when the configured
	// utilization is out of range.
	defaultDeviceUtilization = 1
	// defaultTimeout is the fallback device choosing timeout, in seconds.
	defaultTimeout = 10
)
// DeviceInfo stores the memory bookkeeping for a single device. All sizes are in bytes.
// The mutable fields (QueryCount, FreeMemory, QueryMemoryUsageMap) are updated by
// DeviceManager while it holds its write lock.
type DeviceInfo struct {
	// DeviceID is the index of the device as passed to cgoutils.
	DeviceID int `json:"deviceID"`
	// QueryCount is the number of queries currently holding a memory reservation on this device.
	QueryCount int `json:"queryCount"`
	// TotalMemory is the device's global memory capacity.
	TotalMemory int `json:"totalMemory"`
	// TotalAvailableMemory is the part of TotalMemory that queries may reserve
	// (TotalMemory scaled by the configured memory utilization).
	TotalAvailableMemory int `json:"totalAvailableMemory"`
	// FreeMemory is TotalAvailableMemory minus the memory currently reserved by queries.
	// Note that it is serialized under the JSON key "totalFreeMemory".
	FreeMemory int `json:"totalFreeMemory"`
	// QueryMemoryUsageMap records the bytes reserved by each in-flight query; not serialized.
	QueryMemoryUsageMap map[*queryCom.AQLQuery]int `json:"-"`
}
// DeviceManager has the following functionalities:
// 1. Keep track of the number of queries being served by each device and their memory usage.
// 2. Determine whether a device has enough free memory to serve a query's memory requirement.
// 3. Assign queries to a chosen device according to the configured routing strategy.
type DeviceManager struct {
	// Embedded lock guarding the per-device bookkeeping. Lock/Unlock (the write lock) are what
	// FindDevice and ReleaseReservedMemory use, and the manager itself is the Locker of
	// deviceAvailable.
	*sync.RWMutex `json:"-"`
	// DeviceInfos holds one entry per device, indexed by device id.
	DeviceInfos []*DeviceInfo `json:"deviceInfos"`
	// Timeout is the default time, in seconds, FindDevice waits for a device when the request
	// does not specify one.
	Timeout int `json:"timeout"`
	// MaxAvailableMemory is the largest TotalAvailableMemory across all devices; it is used to
	// reject queries early that can never fit on any device.
	MaxAvailableMemory int `json:"maxAvailableMemory"`
	// deviceAvailable is broadcast whenever reserved memory is released, waking FindDevice waiters.
	deviceAvailable *sync.Cond
	// strategy chooses a device when the preferred device cannot serve a query.
	strategy deviceChooseStrategy
}
// NewDeviceManager creates a DeviceManager for every device reported by cgoutils.GetDeviceCount.
//
// cfg.DeviceMemoryUtilization is the fraction of each device's global memory that queries may
// reserve and must lie in (0, 1]. cfg.DeviceChoosingTimeout is the default number of seconds
// FindDevice waits for a device and must be positive. Invalid values are logged and replaced
// with defaultDeviceUtilization and defaultTimeout respectively. The device is bootstrapped
// before the manager is returned.
func NewDeviceManager(cfg common.QueryConfig) *DeviceManager {
	deviceMemoryUtilization := cfg.DeviceMemoryUtilization
	// Negated range check: NaN fails every comparison, so the form "u <= 0 || u > 1" would
	// let it through and poison the per-device memory computation.
	if !(deviceMemoryUtilization > 0 && deviceMemoryUtilization <= 1) {
		utils.GetLogger().With("deviceMemoryUtilization", deviceMemoryUtilization).
			Error("Invalid deviceMemoryUtilization config, setting to default")
		deviceMemoryUtilization = defaultDeviceUtilization
	}

	timeout := cfg.DeviceChoosingTimeout
	if timeout <= 0 {
		utils.GetLogger().With("timeout", timeout).
			Error("Invalid timeout config, setting to default")
		timeout = defaultTimeout
	}

	// Collect per-device memory info and remember the largest budget, so FindDevice can reject
	// queries that cannot fit on any device without waiting.
	deviceCount := cgoutils.GetDeviceCount()
	deviceInfos := make([]*DeviceInfo, deviceCount)
	maxAvailableMem := 0
	for device := 0; device < deviceCount; device++ {
		deviceInfos[device] = getDeviceInfo(device, deviceMemoryUtilization)
		if deviceInfos[device].TotalAvailableMemory >= maxAvailableMem {
			maxAvailableMem = deviceInfos[device].TotalAvailableMemory
		}
	}
	// Logged only once the devices have actually been enumerated.
	utils.GetLogger().With(
		"utilization", deviceMemoryUtilization,
		"timeout", timeout,
		"deviceCount", deviceCount).Info("Initialized device manager")

	deviceManager := &DeviceManager{
		RWMutex:            &sync.RWMutex{},
		DeviceInfos:        deviceInfos,
		MaxAvailableMemory: maxAvailableMem,
		Timeout:            timeout,
	}
	deviceManager.strategy = leastQueryCountAndMemoryStrategy{
		deviceManager: deviceManager,
	}
	// The manager's embedded write lock is the condition variable's Locker.
	deviceManager.deviceAvailable = sync.NewCond(deviceManager)

	// Bootstrap device.
	utils.GetLogger().Info("Bootstrapping device")
	bootstrapDevice()
	utils.GetLogger().Info("Finish bootstrapping device")
	return deviceManager
}
// getDeviceInfo builds the initial DeviceInfo for the given device id. The device's global
// memory (reported by cgoutils in MiB) is converted to bytes, and the usable budget is that
// amount scaled by deviceMemoryUtilization. No memory is reserved yet, so FreeMemory starts
// equal to the usable budget and QueryCount is zero.
func getDeviceInfo(device int, deviceMemoryUtilization float32) *DeviceInfo {
	globalBytes := cgoutils.GetDeviceGlobalMemoryInMB(device) * mb2bytes
	usableBytes := int(float32(globalBytes) * deviceMemoryUtilization)

	info := &DeviceInfo{
		DeviceID:             device,
		TotalMemory:          globalBytes,
		TotalAvailableMemory: usableBytes,
		FreeMemory:           usableBytes,
		QueryMemoryUsageMap:  make(map[*queryCom.AQLQuery]int),
	}
	// Log the struct value (not the pointer) so the output shows the fields directly.
	utils.GetLogger().Infof("DeviceInfo[%d]=%+v\n", device, *info)
	return info
}
// FindDevice finds a device to run the given query and reserves requiredMem bytes on it.
//
// preferredDevice is used when it is a valid device id with enough free memory; otherwise the
// configured strategy chooses a device. When no device currently has enough free memory,
// FindDevice blocks until memory is released via ReleaseReservedMemory or until timeout
// seconds have elapsed (d.Timeout is used when timeout <= 0).
//
// It returns the chosen device id, or -1 if requiredMem exceeds MaxAvailableMemory or the
// wait timed out.
func (d *DeviceManager) FindDevice(query *queryCom.AQLQuery, requiredMem int, preferredDevice int, timeout int) int {
	if requiredMem > d.MaxAvailableMemory {
		utils.GetQueryLogger().With(
			"query", query,
			"requiredMem", requiredMem,
			"preferredDevice", preferredDevice,
			"maxAvailableMem", d.MaxAvailableMemory,
		).Warn("exceeds max memory")
		return -1
	}

	// No DeviceChoosingTimeout passed by the request; use the configured default.
	if timeout <= 0 {
		timeout = d.Timeout
	}
	timeoutDuration := time.Duration(timeout) * time.Second

	start := utils.Now()
	d.Lock()
	// sync.Cond.Wait has no deadline. Without an explicit wake-up, a waiter would stay blocked
	// until some other query releases memory, however long that takes, and the timeout would
	// never be enforced. The timer below marks the wait as expired and wakes all waiters once
	// the deadline passes. timedOut is only read and written while holding d's lock.
	var timer *time.Timer
	timedOut := false
	device := -1
	for {
		if timedOut || utils.Now().Sub(start) >= timeoutDuration {
			utils.GetQueryLogger().With(
				"query", query,
				"requiredMem", requiredMem,
				"preferredDevice", preferredDevice,
				"timeout", timeout,
			).Error("DeviceChoosingTimeout when choosing the device for the query")
			break
		}

		device = d.findDevice(query, requiredMem, preferredDevice)
		if device >= 0 {
			break
		}

		// Arm the deadline lazily, so the common case (a device is free right away) needs no timer.
		if timer == nil {
			timer = time.AfterFunc(timeoutDuration-utils.Now().Sub(start), func() {
				d.Lock()
				timedOut = true
				d.deviceAvailable.Broadcast()
				d.Unlock()
			})
		}
		d.deviceAvailable.Wait()
	}
	d.Unlock()
	if timer != nil {
		// If the timer already fired, its callback only touches the local flag and issues a
		// spurious Broadcast, which other waiters tolerate by re-checking their condition.
		timer.Stop()
	}

	utils.GetRootReporter().GetTimer(utils.QueryWaitForMemoryDuration).Record(utils.Now().Sub(start))
	return device
}
// findDevice picks a device for the query and reserves requiredMem bytes on it.
// preferredDevice is used when it is a valid device id with enough free memory; otherwise
// d.strategy decides. If no device qualifies, the strategy's negative result is returned and
// nothing is reserved. The caller must hold the write lock.
func (d *DeviceManager) findDevice(query *queryCom.AQLQuery, requiredMem int, preferredDevice int) int {
	utils.GetQueryLogger().With(
		"query", query,
		"requiredMem", requiredMem,
		"preferredDevice", preferredDevice,
	).Debug("trying to find device for query")

	var chosen int
	switch {
	case preferredDevice >= 0 && preferredDevice < len(d.DeviceInfos) &&
		d.DeviceInfos[preferredDevice].FreeMemory >= requiredMem:
		chosen = preferredDevice
	default:
		// The preferred device cannot serve the query; fall back to the strategy.
		chosen = d.strategy.chooseDevice(requiredMem)
	}
	if chosen < 0 {
		return chosen
	}

	// Reserve the memory so later lookups see the reduced free capacity.
	info := d.DeviceInfos[chosen]
	info.QueryCount++
	info.QueryMemoryUsageMap[query] = requiredMem
	info.FreeMemory -= requiredMem
	info.reportMemoryUsage()

	utils.GetLogger().Debugf("Assign device '%d' for query", chosen)
	utils.GetLogger().Debugf("DeviceInfo=%+v", info)
	return chosen
}
// ReleaseReservedMemory returns the memory reserved for the query on the given device to that
// device's free pool once the query is complete. Out-of-range device ids and queries with no
// reservation on the device are ignored. When memory is released, all FindDevice waiters are
// woken.
func (d *DeviceManager) ReleaseReservedMemory(device int, query *queryCom.AQLQuery) {
	// The range check is done without the lock: nothing in this file reassigns the
	// DeviceInfos slice after construction, only the elements' fields.
	if device < 0 || device >= len(d.DeviceInfos) {
		return
	}

	d.Lock()
	defer d.Unlock()

	info := d.DeviceInfos[device]
	reserved, found := info.QueryMemoryUsageMap[query]
	if !found {
		return
	}

	utils.GetLogger().Debugf("Freed %d bytes memory on device %d", reserved, device)
	info.FreeMemory += reserved
	info.reportMemoryUsage()
	delete(info.QueryMemoryUsageMap, query)
	info.QueryCount--
	d.deviceAvailable.Broadcast()
}
// reportMemoryUsage publishes the device's currently reserved memory
// (TotalAvailableMemory - FreeMemory) to the EstimatedDeviceMemory gauge, tagged with the
// device id. Caller needs to hold the lock.
func (di *DeviceInfo) reportMemoryUsage() {
	reserved := di.TotalAvailableMemory - di.FreeMemory
	tags := map[string]string{
		"device": strconv.Itoa(di.DeviceID),
	}
	gauge := utils.GetRootReporter().GetChildGauge(tags, utils.EstimatedDeviceMemory)
	gauge.Update(float64(reserved))
}
// deviceChooseStrategy defines the interface for choosing an available device for a
// specific query.
type deviceChooseStrategy interface {
	// chooseDevice returns the id of a device with at least requiredMem bytes of free memory,
	// or a negative value (the existing implementation uses -1) when no device qualifies.
	// findDevice calls it while holding the DeviceManager write lock.
	chooseDevice(requiredMem int) int
}
// leastQueryCountAndMemoryStrategy picks, among the devices whose free memory is at least the
// required memory, the one serving the fewest queries. Ties are broken in favor of the device
// with the least free memory (the tightest fit).
type leastQueryCountAndMemoryStrategy struct {
	// deviceManager provides the DeviceInfos to choose from; its lock is expected to be held
	// by the caller.
	deviceManager *DeviceManager
}
// chooseDevice returns the index of the device with enough free memory for requiredMem that
// serves the fewest queries. Among devices tied on query count, it picks the one with the
// least free memory; on a complete tie, the later device in DeviceInfos wins. It returns -1
// if no device has enough free memory.
func (s leastQueryCountAndMemoryStrategy) chooseDevice(requiredMem int) int {
	best := -1
	bestQueryCount := int(math.MaxInt32)
	bestFreeMemory := int(math.MaxInt64)
	for idx := range s.deviceManager.DeviceInfos {
		info := s.deviceManager.DeviceInfos[idx]
		if info.FreeMemory < requiredMem {
			continue
		}
		fewerQueries := info.QueryCount < bestQueryCount
		tighterFit := info.QueryCount == bestQueryCount && info.FreeMemory <= bestFreeMemory
		if !fewerQueries && !tighterFit {
			continue
		}
		best = idx
		bestQueryCount = info.QueryCount
		bestFreeMemory = info.FreeMemory
	}
	return best
}