pkg/storage/objects/host_infos.go (432 lines of code) (raw):
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law orupd agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package objects
import (
"context"
"encoding/json"
"sync"
"time"
hostpb "github.com/uber/peloton/.gen/peloton/api/v0/host"
pelotonpb "github.com/uber/peloton/.gen/peloton/api/v0/peloton"
"github.com/uber/peloton/pkg/hostmgr/common"
"github.com/uber/peloton/pkg/storage/objects/base"
log "github.com/sirupsen/logrus"
"go.uber.org/yarpc/yarpcerrors"
)
// it adds a HostInfoObject instance to the global list of storage objects.
func init() {
Objs = append(Objs, &HostInfoObject{})
}
// HostInfoObject corresponds to a row in host_info table.
type HostInfoObject struct {
// DB specific annotations.
base.Object `cassandra:"name=host_info, primaryKey=((hostname))"`
// Hostname of the host.
Hostname *base.OptionalString `column:"name=hostname"`
// IP address of the host.
IP string `column:"name=ip"`
// HostState of the host.
State string `column:"name=state"`
// GoalState of the host.
GoalState string `column:"name=goal_state"`
// Labels of the host.
Labels string `column:"name=labels"`
// Current host Pool for the host.
// This will indicate which host pool this host belongs to.
CurrentPool string `column:"name=current_pool"`
// Desired host pool for the host
// This will indicate which host pool this host should be.
DesiredPool string `column:"name=desired_pool"`
// Last update time of the host maintenance.
UpdateTime time.Time `column:"name=update_time"`
}
// transform will convert all the value from DB into the corresponding type
// in ORM object to be interpreted by base store client.
func (o *HostInfoObject) transform(row map[string]interface{}) {
o.Hostname = base.NewOptionalString(row["hostname"])
o.IP = row["ip"].(string)
o.State = row["state"].(string)
o.GoalState = row["goal_state"].(string)
o.Labels = row["labels"].(string)
o.CurrentPool = row["current_pool"].(string)
o.DesiredPool = row["desired_pool"].(string)
o.UpdateTime = row["update_time"].(time.Time)
}
// HostInfoOps provides methods for manipulating host_maintenance table.
type HostInfoOps interface {
// Create inserts a row in the table.
Create(
ctx context.Context,
hostname string,
ip string,
state hostpb.HostState,
goalState hostpb.HostState,
labels map[string]string,
currentPool string,
desiredPool string,
) error
// Get retrieves the row based on the primary key from the table.
Get(
ctx context.Context,
hostname string,
) (*hostpb.HostInfo, error)
// GetAll retrieves all rows from the table (with no selection on any key).
GetAll(ctx context.Context) ([]*hostpb.HostInfo, error)
// UpdateState updates the state of an object in the table.
UpdateState(
ctx context.Context,
hostname string,
state hostpb.HostState,
) error
// UpdateGoalState updates the goal state of an object in the table.
UpdateGoalState(
ctx context.Context,
hostname string,
goalState hostpb.HostState,
) error
// UpdateLables updates the labels an object in the table.
UpdateLabels(
ctx context.Context,
hostname string,
labels map[string]string,
) error
// UpdatePool updates the current & desired host pool of an object
// in the table.
UpdatePool(
ctx context.Context,
hostname string,
currentPool string,
desiredPool string,
) error
// UpdateDesiredPool updates the desired host pool of an object in the table.
UpdateDesiredPool(
ctx context.Context,
hostname string,
desiredPool string,
) error
// Delete removes an object from the table based on primary key.
Delete(ctx context.Context, hostname string) error
// CompareAndSet compares and sets the host info fields
CompareAndSet(
ctx context.Context,
hostname string,
hostInfoDiff common.HostInfoDiff,
compareFields common.HostInfoDiff,
) error
}
// hostInfoOps implements HostInfoOps using a particular Store.
// TODO: after merging with host cache, concurrency control should be achieved
// through locking in host cache.
type hostInfoOps struct {
lock sync.RWMutex
store *Store
}
var (
// HostInfoOps singleton object.
// Our approach to prevent concurrent writes to a HostInfo entry, is to
// serialize all HostInfo writes perform all writes to HostInfo table under
// a single lock in HostInfoOps. Hence HostInfoOps is a singleton.
// Do not do this for any other object in orm. This is a temporary fix.
// TODO: Remove singleton once we move to host cache for concurrency control
hInfoOps *hostInfoOps
once sync.Once
curVersion uint32
)
// InitHostInfoOps initializes HostInfoOps singleton
func InitHostInfoOps(s *Store) {
if hInfoOps != nil {
log.Info("HostInfoOps already initialized")
return
}
once.Do(func() {
hInfoOps = &hostInfoOps{store: s}
curVersion = 0
})
}
// GetHostInfoOps returns the HostInfoOps singleton object.
func GetHostInfoOps() HostInfoOps {
if hInfoOps == nil {
log.Fatal("HostInfoOps not initialized")
return nil
}
return hInfoOps
}
// Create creates a host info in db.
func (d *hostInfoOps) Create(
ctx context.Context,
hostname string,
ip string,
state hostpb.HostState,
goalState hostpb.HostState,
labels map[string]string,
currentPool string,
desiredPool string,
) error {
d.lock.Lock()
defer d.lock.Unlock()
bytes, err := json.Marshal(&labels)
if err != nil {
return err
}
hostInfoObject := &HostInfoObject{
Hostname: base.NewOptionalString(hostname),
IP: ip,
State: state.String(),
GoalState: goalState.String(),
Labels: string(bytes),
CurrentPool: currentPool,
DesiredPool: desiredPool,
UpdateTime: time.Now(),
}
if err := d.store.oClient.CreateIfNotExists(ctx, hostInfoObject); err != nil {
d.store.metrics.OrmHostInfoMetrics.HostInfoAddFail.Inc(1)
return err
}
d.store.metrics.OrmHostInfoMetrics.HostInfoAdd.Inc(1)
return nil
}
// Get gets a host info from db by its hostname pk.
func (d *hostInfoOps) Get(
ctx context.Context,
hostname string,
) (*hostpb.HostInfo, error) {
d.lock.RLock()
defer d.lock.RUnlock()
var row map[string]interface{}
hostInfoObject := &HostInfoObject{
Hostname: base.NewOptionalString(hostname),
}
row, err := d.store.oClient.Get(ctx, hostInfoObject)
if err != nil {
d.store.metrics.OrmHostInfoMetrics.HostInfoGetFail.Inc(1)
return nil, err
}
if len(row) == 0 {
return nil, yarpcerrors.NotFoundErrorf(
"host info not found %s", hostname)
}
hostInfoObject.transform(row)
info, err := newHostInfoFromHostInfoObject(hostInfoObject)
if err != nil {
d.store.metrics.OrmHostInfoMetrics.HostInfoGetFail.Inc(1)
return nil, err
}
d.store.metrics.OrmHostInfoMetrics.HostInfoGet.Inc(1)
return info, nil
}
// GetAll gets all host infos from db without any pk specified.
func (d *hostInfoOps) GetAll(ctx context.Context) ([]*hostpb.HostInfo, error) {
d.lock.RLock()
defer d.lock.RUnlock()
rows, err := d.store.oClient.GetAll(ctx, &HostInfoObject{})
if err != nil {
d.store.metrics.OrmHostInfoMetrics.HostInfoGetAllFail.Inc(1)
return nil, err
}
d.store.metrics.OrmHostInfoMetrics.HostInfoGetAll.Inc(1)
var hostInfos []*hostpb.HostInfo
for _, row := range rows {
obj := &HostInfoObject{}
obj.transform(row)
info, err := newHostInfoFromHostInfoObject(obj)
if err != nil {
d.store.metrics.OrmHostInfoMetrics.HostInfoGetAllFail.Inc(1)
return nil, err
}
hostInfos = append(hostInfos, info)
}
return hostInfos, nil
}
// Update the host state of a host info by its hostname pk
func (d *hostInfoOps) UpdateState(
ctx context.Context,
hostname string,
state hostpb.HostState,
) error {
d.lock.Lock()
defer d.lock.Unlock()
hostInfoObject := &HostInfoObject{
Hostname: base.NewOptionalString(hostname),
State: state.String(),
UpdateTime: time.Now(),
}
fieldsToUpdate := []string{"State", "UpdateTime"}
if err := d.store.oClient.Update(
ctx,
hostInfoObject,
fieldsToUpdate...); err != nil {
d.store.metrics.OrmHostInfoMetrics.HostInfoUpdateFail.Inc(1)
return err
}
d.store.metrics.OrmHostInfoMetrics.HostInfoUpdate.Inc(1)
return nil
}
// CompareAndSet updates the fields to the specified value only if the
// version matches.
// This is a very heavy handed operation and should not be called at scale.
// The reason it is implemented this way is that this is a very infrequent call.
func (d *hostInfoOps) CompareAndSet(
ctx context.Context,
hostname string,
hostInfoDiff common.HostInfoDiff,
compareFields common.HostInfoDiff,
) error {
d.lock.Lock()
defer d.lock.Unlock()
hostInfoObject := &HostInfoObject{
Hostname: base.NewOptionalString(hostname),
}
row, err := d.store.oClient.Get(
ctx,
hostInfoObject,
)
if err != nil {
d.store.metrics.OrmHostInfoMetrics.HostInfoCompareAndSetFail.Inc(1)
return err
}
if len(row) == 0 {
return yarpcerrors.NotFoundErrorf("host info not found")
}
hostInfoObject.transform(row)
info, err := newHostInfoFromHostInfoObject(hostInfoObject)
if err != nil {
d.store.metrics.OrmHostInfoMetrics.HostInfoCompareAndSetFail.Inc(1)
return err
}
for key, value := range compareFields {
switch key {
case common.StateField:
if info.GetState() != value.(hostpb.HostState) {
d.store.metrics.OrmHostInfoMetrics.HostInfoCompareAndSetFail.Inc(1)
return yarpcerrors.AbortedErrorf("State field does not match")
}
case common.GoalStateField:
if info.GetGoalState() != value.(hostpb.HostState) {
d.store.metrics.OrmHostInfoMetrics.HostInfoCompareAndSetFail.Inc(1)
return yarpcerrors.AbortedErrorf("GoalState field does not match")
}
case common.CurrentPoolField:
if info.GetCurrentPool() != value.(string) {
d.store.metrics.OrmHostInfoMetrics.HostInfoCompareAndSetFail.Inc(1)
return yarpcerrors.AbortedErrorf("CurrentPool field does not match")
}
case common.DesiredPoolField:
if info.GetDesiredPool() != value.(string) {
d.store.metrics.OrmHostInfoMetrics.HostInfoCompareAndSetFail.Inc(1)
return yarpcerrors.AbortedErrorf("DesiredPool field does not match")
}
}
}
hostInfoObject = &HostInfoObject{
Hostname: base.NewOptionalString(hostname),
UpdateTime: time.Now(),
}
fieldsToUpdate := []string{"UpdateTime"}
for field, value := range hostInfoDiff {
switch field {
case common.StateField:
hostInfoObject.State = value.(hostpb.HostState).String()
case common.GoalStateField:
hostInfoObject.GoalState = value.(hostpb.HostState).String()
case common.CurrentPoolField:
hostInfoObject.CurrentPool = value.(string)
case common.DesiredPoolField:
hostInfoObject.DesiredPool = value.(string)
}
fieldsToUpdate = append(fieldsToUpdate, field)
}
if err := d.store.oClient.Update(
ctx,
hostInfoObject,
fieldsToUpdate...); err != nil {
d.store.metrics.OrmHostInfoMetrics.HostInfoCompareAndSetFail.Inc(1)
return err
}
d.store.metrics.OrmHostInfoMetrics.HostInfoCompareAndSet.Inc(1)
return nil
}
// Update the host goal state of a host info by its hostname pk
func (d *hostInfoOps) UpdateGoalState(
ctx context.Context,
hostname string,
goalState hostpb.HostState,
) error {
d.lock.Lock()
defer d.lock.Unlock()
hostInfoObject := &HostInfoObject{
Hostname: base.NewOptionalString(hostname),
GoalState: goalState.String(),
UpdateTime: time.Now(),
}
fieldsToUpdate := []string{"GoalState", "UpdateTime"}
if err := d.store.oClient.Update(
ctx,
hostInfoObject,
fieldsToUpdate...); err != nil {
d.store.metrics.OrmHostInfoMetrics.HostInfoUpdateFail.Inc(1)
return err
}
d.store.metrics.OrmHostInfoMetrics.HostInfoUpdate.Inc(1)
return nil
}
// Update the labels of a host info by its hostname pk
func (d *hostInfoOps) UpdateLabels(
ctx context.Context,
hostname string,
labels map[string]string,
) error {
d.lock.Lock()
defer d.lock.Unlock()
bytes, err := json.Marshal(&labels)
if err != nil {
return err
}
hostInfoObject := &HostInfoObject{
Hostname: base.NewOptionalString(hostname),
Labels: string(bytes),
UpdateTime: time.Now(),
}
fieldsToUpdate := []string{"Labels", "UpdateTime"}
if err := d.store.oClient.Update(
ctx,
hostInfoObject,
fieldsToUpdate...); err != nil {
d.store.metrics.OrmHostInfoMetrics.HostInfoUpdateFail.Inc(1)
return err
}
d.store.metrics.OrmHostInfoMetrics.HostInfoUpdate.Inc(1)
return nil
}
// Delete deletes a host info from db by its hostname pk.
func (d *hostInfoOps) Delete(ctx context.Context, hostname string) error {
d.lock.Lock()
defer d.lock.Unlock()
hostInfoObject := &HostInfoObject{
Hostname: base.NewOptionalString(hostname),
}
if err := d.store.oClient.Delete(ctx, hostInfoObject); err != nil {
d.store.metrics.OrmHostInfoMetrics.HostInfoDeleteFail.Inc(1)
return err
}
d.store.metrics.OrmHostInfoMetrics.HostInfoDelete.Inc(1)
return nil
}
// UpdateCurrentPool updates current and desired pool on a host.
func (d *hostInfoOps) UpdatePool(
ctx context.Context,
hostname string,
currentPool string,
desiredPool string,
) error {
d.lock.Lock()
defer d.lock.Unlock()
hostInfoObject := &HostInfoObject{
Hostname: base.NewOptionalString(hostname),
CurrentPool: currentPool,
DesiredPool: desiredPool,
UpdateTime: time.Now(),
}
fieldsToUpdate := []string{"CurrentPool", "DesiredPool", "UpdateTime"}
err := d.store.oClient.Update(ctx, hostInfoObject, fieldsToUpdate...)
if err != nil {
d.store.metrics.OrmHostInfoMetrics.HostInfoCurrentPoolUpdateFail.Inc(1)
return err
}
d.store.metrics.OrmHostInfoMetrics.HostInfoCurrentPoolUpdate.Inc(1)
return nil
}
// UpdateDesiredPool updates desired pool on a host.
func (d *hostInfoOps) UpdateDesiredPool(
ctx context.Context,
hostname string,
pool string,
) error {
d.lock.Lock()
defer d.lock.Unlock()
hostInfoObject := &HostInfoObject{
Hostname: base.NewOptionalString(hostname),
DesiredPool: pool,
UpdateTime: time.Now(),
}
fieldsToUpdate := []string{"DesiredPool", "UpdateTime"}
err := d.store.oClient.Update(ctx, hostInfoObject, fieldsToUpdate...)
if err != nil {
d.store.metrics.OrmHostInfoMetrics.HostInfoDesiredPoolUpdateFail.Inc(1)
return err
}
d.store.metrics.OrmHostInfoMetrics.HostInfoDesiredPoolUpdate.Inc(1)
return nil
}
// newHostInfoFromHostInfoObject creates a new *hostpb.HostInfo
// and sets each field from a HostInfoObject object.
func newHostInfoFromHostInfoObject(
hostInfoObject *HostInfoObject) (*hostpb.HostInfo, error) {
hostInfo := &hostpb.HostInfo{}
hostInfo.Hostname = hostInfoObject.Hostname.String()
hostInfo.Ip = hostInfoObject.IP
hostInfo.State = hostpb.HostState(
hostpb.HostState_value[hostInfoObject.State])
hostInfo.GoalState = hostpb.HostState(
hostpb.HostState_value[hostInfoObject.GoalState])
if hostInfoObject.Labels != "" {
labels := make(map[string]string)
err := json.Unmarshal([]byte(hostInfoObject.Labels), &labels)
if err != nil {
return nil, err
}
for l, v := range labels {
hostInfo.Labels = append(
hostInfo.Labels,
&pelotonpb.Label{Key: l, Value: v},
)
}
}
hostInfo.CurrentPool = hostInfoObject.CurrentPool
hostInfo.DesiredPool = hostInfoObject.DesiredPool
return hostInfo, nil
}