// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package engine
import (
"context"
"fmt"
"math/rand"
nethttp "net/http"
"time"
"github.com/uber/peloton/.gen/peloton/api/v0/job"
"github.com/uber/peloton/.gen/peloton/api/v0/peloton"
"github.com/uber/peloton/.gen/peloton/api/v0/query"
"github.com/uber/peloton/.gen/peloton/api/v0/task"
"github.com/uber/peloton/pkg/archiver/config"
auth_impl "github.com/uber/peloton/pkg/auth/impl"
"github.com/uber/peloton/pkg/common"
"github.com/uber/peloton/pkg/common/backoff"
"github.com/uber/peloton/pkg/common/leader"
"github.com/uber/peloton/pkg/common/util"
"github.com/uber/peloton/pkg/middleware/inbound"
"github.com/uber/peloton/pkg/middleware/outbound"
"github.com/golang/protobuf/ptypes"
log "github.com/sirupsen/logrus"
"github.com/uber-go/tally"
"go.uber.org/yarpc"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/transport/grpc"
)
const (
// Delay between consecutive delete API requests; the value is chosen
// arbitrarily.
delayDelete = 100 * time.Millisecond
// It is good practice to add some jitter to archive intervals, in case
// more archiver instances are added in the future.
// Keep the default max jitter at 100ms.
jitterMax = 100
// The string "completed_job" tags log lines that contain a job summary.
// These lines are picked up by logstash and streamed via a heatpipe
// kafka topic into a Hive table.
completedJobTag = "completed_job"
// The key "filebeat_topic" is used by filebeat to stream completed
// jobs to the specified kafka topic.
filebeatTopic = "filebeat_topic"
// archiver summary map keys
archiverSuccessKey = "SUCCESS"
archiverFailureKey = "FAILURE"
// Number of most recent runs per task instance whose pod events are
// retained in the DB; pod events of older runs are deleted.
_defaultPodEventsToConstraint = uint64(100)
)
// Engine defines the interface used to query a peloton component
// for data and then archive that data to secondary storage using
// a message queue
type Engine interface {
// Start starts the archiver goroutines
Start() error
// Cleanup cleans the archiver engine before
// restart
Cleanup()
}
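
// A minimal usage sketch of this interface, assuming the surrounding daemon
// wiring (parsed config, tally scope, HTTP mux, leader discovery and YARPC
// inbounds) is already in place; the names below are illustrative only:
//
//    archiver, err := engine.New(cfg, scope, mux, discovery, inbounds)
//    if err != nil {
//        log.WithError(err).Fatal("unable to create archiver engine")
//    }
//    defer archiver.Cleanup()
//    // Start blocks, looping until an error occurs.
//    if err := archiver.Start(); err != nil {
//        log.WithError(err).Error("archiver engine exited")
//    }
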
type engine struct {
// Jobmgr client to make Job Query/Delete API requests
jobClient job.JobManagerYARPCClient
// Task Manager client to query and delete pod events.
taskClient task.TaskManagerYARPCClient
// Yarpc dispatcher
dispatcher *yarpc.Dispatcher
// Archiver config
config config.Config
// Archiver metrics
metrics *Metrics
// Archiver backoff/retry policy
retryPolicy backoff.RetryPolicy
}
// New creates a new Archiver Engine.
func New(
cfg config.Config,
scope tally.Scope,
mux *nethttp.ServeMux,
discovery leader.Discovery,
inbounds []transport.Inbound) (Engine, error) {
cfg.Archiver.Normalize()
jobmgrURL, err := discovery.GetAppURL(common.JobManagerRole)
if err != nil {
return nil, err
}
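// Set up authentication: the security manager backs the inbound middleware
// that authenticates requests arriving at the archiver, while the security
// client backs the outbound middleware used on calls made to jobmgr.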
securityManager, err := auth_impl.CreateNewSecurityManager(&cfg.Auth)
if err != nil {
return nil, err
}
authInboundMiddleware := inbound.NewAuthInboundMiddleware(securityManager)
securityClient, err := auth_impl.CreateNewSecurityClient(&cfg.Auth)
if err != nil {
return nil, err
}
authOutboundMiddleware := outbound.NewAuthOutboundMiddleware(securityClient)
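// Build the YARPC dispatcher with a single gRPC outbound to the current
// jobmgr leader; both the job and task clients created below share this
// outbound, since the task manager API is served by jobmgr.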
t := grpc.NewTransport()
dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: config.PelotonArchiver,
Inbounds: inbounds,
Outbounds: yarpc.Outbounds{
common.PelotonJobManager: transport.Outbounds{
Unary: t.NewSingleOutbound(jobmgrURL.Host),
},
},
Metrics: yarpc.MetricsConfig{
Tally: scope,
},
InboundMiddleware: yarpc.InboundMiddleware{
Oneway: authInboundMiddleware,
Unary: authInboundMiddleware,
Stream: authInboundMiddleware,
},
OutboundMiddleware: yarpc.OutboundMiddleware{
Oneway: authOutboundMiddleware,
Unary: authOutboundMiddleware,
Stream: authOutboundMiddleware,
},
})
if err := dispatcher.Start(); err != nil {
return nil, fmt.Errorf("Unable to start dispatcher: %v", err)
}
return &engine{
jobClient: job.NewJobManagerYARPCClient(
dispatcher.ClientConfig(common.PelotonJobManager),
),
taskClient: task.NewTaskManagerYARPCClient(
dispatcher.ClientConfig(common.PelotonJobManager),
),
dispatcher: dispatcher,
config: cfg,
metrics: NewMetrics(scope),
retryPolicy: backoff.NewRetryPolicy(
cfg.Archiver.MaxRetryAttemptsJobQuery,
cfg.Archiver.RetryIntervalJobQuery),
}, nil
}
// Start starts the archiver with the following actions:
// 1) archive terminal batch jobs
// 2) constrain pod events for RUNNING stateless (service) jobs.
// Actions run sequentially to minimize the load on the Cassandra
// cluster and avoid impacting real-time workloads.
func (e *engine) Start() error {
// Account for the time taken by jobmgr to start and finish recovery.
// This is more of a precaution so that the archiver does not interfere
// with core jobmgr functionality.
// TODO: remove this delay once we move to API server
e.metrics.ArchiverStart.Inc(1)
jitter := time.Duration(rand.Intn(jitterMax)) * time.Millisecond
time.Sleep(e.config.Archiver.BootstrapDelay + jitter)
// At first, the time range will be
// [(t - ArchiveAge - ArchiveStepSize), (t - ArchiveAge)),
// e.g. [(t-30d-1d), (t-30d)) with the default config.
maxTime := time.Now().UTC().Add(-e.config.Archiver.ArchiveAge)
minTime := maxTime.Add(-e.config.Archiver.ArchiveStepSize)
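// The window then slides backward by ArchiveStepSize on every pass (see the
// end of the loop body), so with the 30d/1d defaults mentioned above the
// passes cover [(t-31d), (t-30d)), [(t-32d), (t-31d)), and so on.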
for {
if e.config.Archiver.Enable {
startTime := time.Now()
max, err := ptypes.TimestampProto(maxTime)
if err != nil {
return err
}
min, err := ptypes.TimestampProto(minTime)
if err != nil {
return err
}
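// Query summaries of jobs in a terminal state (SUCCEEDED/FAILED/KILLED)
// whose completion time falls within the current window, capped at
// MaxArchiveEntries results per query; archiveJobs then processes only
// the batch jobs.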
spec := job.QuerySpec{
JobStates: []job.JobState{
job.JobState_SUCCEEDED,
job.JobState_FAILED,
job.JobState_KILLED,
},
CompletionTimeRange: &peloton.TimeRange{Min: min, Max: max},
Pagination: &query.PaginationSpec{
Offset: 0,
Limit: uint32(e.config.Archiver.MaxArchiveEntries),
MaxLimit: uint32(e.config.Archiver.MaxArchiveEntries),
},
}
if err := e.runArchiver(
&job.QueryRequest{
Spec: &spec,
SummaryOnly: true,
},
e.archiveJobs); err != nil {
return err
}
e.metrics.ArchiverRun.Inc(1)
e.metrics.ArchiverRunDuration.Record(time.Since(startTime))
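// Slide the archive window one ArchiveStepSize further back in time
// for the next pass.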
maxTime = minTime
minTime = minTime.Add(-e.config.Archiver.ArchiveStepSize)
}
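// Pod events cleanup pass: query RUNNING jobs and trim old pod event
// runs for service jobs (see deletePodEvents).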
if e.config.Archiver.PodEventsCleanup {
startTime := time.Now()
spec := job.QuerySpec{
JobStates: []job.JobState{
job.JobState_RUNNING,
},
Pagination: &query.PaginationSpec{
Offset: 0,
Limit: uint32(e.config.Archiver.MaxArchiveEntries),
MaxLimit: uint32(e.config.Archiver.MaxArchiveEntries),
},
}
if err := e.runArchiver(
&job.QueryRequest{
Spec: &spec,
SummaryOnly: true,
},
e.deletePodEvents); err != nil {
return err
}
e.metrics.PodDeleteEventsRun.Inc(1)
e.metrics.PodDeleteEventsRunDuration.Record(time.Since(startTime))
}
jitter := time.Duration(rand.Intn(jitterMax)) * time.Millisecond
time.Sleep(e.config.Archiver.ArchiveInterval + jitter)
}
}
// Cleanup cleans the archiver engine before restarting
func (e *engine) Cleanup() {
if err := e.dispatcher.Stop(); err != nil {
log.WithError(err).Error("failed to stop dispatcher")
}
}
// runArchiver queries jobmgr with the given request, retrying per the
// engine's retry policy, and invokes action on the returned job summaries.
func (e *engine) runArchiver(
queryReq *job.QueryRequest,
action func(
ctx context.Context,
results []*job.JobSummary)) error {
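// Only query failures are surfaced here; the action logs and counts its
// own per-job failures and does not return an error.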
p := backoff.NewRetrier(e.retryPolicy)
queryResp, err := e.queryJobs(
context.Background(),
queryReq,
p)
if err != nil {
return err
}
results := queryResp.GetResults()
action(
context.Background(),
results)
return nil
}
// archiveJobs archives only batch jobs.
func (e *engine) archiveJobs(
ctx context.Context,
results []*job.JobSummary) {
if len(results) > 0 {
archiveSummary := map[string]int{archiverFailureKey: 0, archiverSuccessKey: 0}
for _, summary := range results {
if summary.GetType() != job.JobType_BATCH {
continue
}
// Sleep between consecutive Job Delete requests
time.Sleep(delayDelete)
// The log event for completedJobTag will be logged to Archiver stdout.
// Filebeat configured on the Peloton host will ship this log out to
// logstash. Logstash will be configured to stream this specific log
// event to Hive via a heatpipe topic.
log.WithFields(log.Fields{
filebeatTopic: e.config.Archiver.KafkaTopic,
completedJobTag: summary,
}).Info("completed job")
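// In stream-only mode the job summary is only streamed out via the log
// line above; the job is not deleted from Peloton.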
if e.config.Archiver.StreamOnlyMode {
continue
}
log.WithFields(log.Fields{
"job_id": summary.GetId().GetValue(),
"state": summary.GetRuntime().GetState()}).
Info("Deleting job")
deleteReq := &job.DeleteRequest{
Id: summary.GetId(),
}
reqCtx, cancel := context.WithTimeout(
ctx,
e.config.Archiver.PelotonClientTimeout,
)
_, err := e.jobClient.Delete(reqCtx, deleteReq)
cancel()
if err != nil {
// TODO: have a reasonable threshold for tolerating such failures
// For now, just continue processing the next job in the list
log.WithError(err).
WithField("job_id", summary.GetId().GetValue()).
Error("job delete failed")
e.metrics.ArchiverJobDeleteFail.Inc(1)
archiveSummary[archiverFailureKey]++
} else {
e.metrics.ArchiverJobDeleteSuccess.Inc(1)
archiveSummary[archiverSuccessKey]++
}
}
succeededCount := archiveSummary[archiverSuccessKey]
failedCount := archiveSummary[archiverFailureKey]
e.metrics.ArchiverJobQuerySuccess.Inc(1)
log.WithFields(log.Fields{
"total": len(results),
"succeeded": succeededCount,
"failed": failedCount,
}).Info("Archive summary")
} else {
log.Debug("No jobs in timerange")
// TODO (adityacb)
// Reset here if there are no more jobs left to be archived.
// This means that for n consecutive attempts if we get no
// results, we should move the archive window back to now - 30days
e.metrics.ArchiverNoJobsInTimerange.Inc(1)
}
}
// deletePodEvents reads RUNNING service jobs and deletes pod events for
// older runs (run_id is a monotonically increasing counter) once an
// instance has accumulated more than 100 runs.
// This action constrains the number of runs kept in the DB, to prevent
// large partitions:
// 1) Get the most recent run_id from the DB.
// 2) If more than 100 runs exist, delete the pod events of the older runs.
func (e *engine) deletePodEvents(
ctx context.Context,
results []*job.JobSummary) {
var i uint32
for _, jobSummary := range results {
if jobSummary.GetType() != job.JobType_SERVICE {
continue
}
log.WithFields(log.Fields{
"job_id": jobSummary.GetId().GetValue(),
"state": jobSummary.GetRuntime().GetState()}).
Debug("Deleting pod events")
for i = 0; i < jobSummary.GetInstanceCount(); i++ {
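// Fetch only the most recent pod event for this instance (Limit: 1);
// its task ID carries the latest run_id.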
reqCtx, cancel := context.WithTimeout(
ctx,
e.config.Archiver.PelotonClientTimeout,
)
response, err := e.taskClient.GetPodEvents(
reqCtx,
&task.GetPodEventsRequest{
JobId:      jobSummary.GetId(),
InstanceId: i,
Limit:      1})
cancel()
if err != nil {
log.WithFields(log.Fields{
"job_id": jobSummary.GetId(),
"instance_id": i,
}).WithError(err).
Error("unable to fetch pod events")
e.metrics.PodDeleteEventsFail.Inc(1)
continue
}
if len(response.GetResult()) == 0 {
continue
}
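// Parse the run ID of the most recent run from its task ID.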
runID, err := util.ParseRunID(response.GetResult()[0].GetTaskId().GetValue())
if err != nil {
log.WithFields(log.Fields{
"job_id": jobSummary.GetId(),
"instance_id": i,
"run_id": response.GetResult()[0].GetTaskId().GetValue(),
}).WithError(err).
Error("error parsing runID")
e.metrics.PodDeleteEventsFail.Inc(1)
continue
}
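// Keep the pod events of the most recent _defaultPodEventsToConstraint
// runs; delete the pod events of runs older than that.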
if runID > _defaultPodEventsToConstraint {
log.WithFields(log.Fields{
"job_id": jobSummary.GetId(),
"instance_id": i,
"run_id": runID - _defaultPodEventsToConstraint,
"task_id": response.GetResult()[0].GetTaskId().GetValue(),
}).Info("Delete runs")
reqCtx, cancel := context.WithTimeout(
ctx,
e.config.Archiver.PelotonClientTimeout,
)
_, err = e.taskClient.DeletePodEvents(
reqCtx,
&task.DeletePodEventsRequest{
JobId:      jobSummary.GetId(),
InstanceId: i,
RunId:      runID - _defaultPodEventsToConstraint})
cancel()
if err != nil {
log.WithFields(log.Fields{
"job_id": jobSummary.GetId(),
"instance_id": i,
"run_id": response.GetResult()[0].GetTaskId().GetValue(),
}).WithError(err).
Error("unable to delete pod events")
e.metrics.PodDeleteEventsFail.Inc(1)
continue
}
}
e.metrics.PodDeleteEventsSuccess.Inc(1)
}
}
}
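// queryJobs issues the job query to jobmgr, retrying per the engine's
// backoff policy; every attempt gets a fresh Peloton client timeout.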
func (e *engine) queryJobs(
ctx context.Context,
req *job.QueryRequest,
p backoff.Retrier) (*job.QueryResponse, error) {
for {
ctx, cancel := context.WithTimeout(
ctx,
e.config.Archiver.PelotonClientTimeout,
)
resp, err := e.jobClient.Query(ctx, req)
cancel()
if err == nil {
return resp, nil
}
if !backoff.CheckRetry(p) {
return nil, err
}
}
}