profiling.go (118 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package apm // import "go.elastic.co/apm/v2"
import (
"bytes"
"context"
"io"
"runtime/pprof"
"time"
"github.com/pkg/errors"
)
type profilingState struct {
profileType string
profileStart func(io.Writer) error
profileStop func()
sender profileSender
interval time.Duration
duration time.Duration // not relevant to all profiles
timer *time.Timer
timerStart time.Time
buf bytes.Buffer
finished chan struct{}
}
// newCPUProfilingState calls newProfilingState with the
// profiler type set to "cpu", and using pprof.StartCPUProfile
// and pprof.StopCPUProfile.
func newCPUProfilingState(sender profileSender) *profilingState {
return newProfilingState("cpu", pprof.StartCPUProfile, pprof.StopCPUProfile, sender)
}
// newHeapProfilingState calls newProfilingState with the
// profiler type set to "heap", and using pprof.Lookup("heap").WriteTo(writer, 0).
func newHeapProfilingState(sender profileSender) *profilingState {
return newLookupProfilingState("heap", sender)
}
func newLookupProfilingState(name string, sender profileSender) *profilingState {
profileStart := func(w io.Writer) error {
profile := pprof.Lookup(name)
if profile == nil {
return errors.Errorf("no profile called %q", name)
}
return profile.WriteTo(w, 0)
}
return newProfilingState("heap", profileStart, func() {}, sender)
}
// newProfilingState returns a new profilingState,
// with its timer stopped. The timer may be started
// by calling profilingState.updateConfig.
func newProfilingState(
profileType string,
profileStart func(io.Writer) error,
profileStop func(),
sender profileSender,
) *profilingState {
state := &profilingState{
profileType: profileType,
profileStart: profileStart,
profileStop: profileStop,
sender: sender,
timer: time.NewTimer(0),
finished: make(chan struct{}, 1),
}
if !state.timer.Stop() {
<-state.timer.C
}
return state
}
func (state *profilingState) updateConfig(interval, duration time.Duration) {
if state.sender == nil {
// No profile sender, no point in starting a timer.
return
}
state.duration = duration
if state.interval == interval {
return
}
if state.timerStart.IsZero() {
state.interval = interval
state.resetTimer()
}
// TODO(axw) handle extending/cutting short running timers once
// it is possible to dynamically control profiling configuration.
}
func (state *profilingState) resetTimer() {
if state.interval != 0 {
state.timer.Reset(state.interval)
state.timerStart = time.Now()
} else {
state.timerStart = time.Time{}
}
}
// start spawns a goroutine that will capture a profile, send it using state.sender,
// and finally signal state.finished.
//
// start will return immediately after spawning the goroutine.
func (state *profilingState) start(ctx context.Context, logger Logger, metadata io.Reader) {
// The state.duration field may be updated after the goroutine starts,
// by the caller, so it must be read outside the goroutine.
duration := state.duration
go func() {
defer func() { state.finished <- struct{}{} }()
if err := state.profile(ctx, duration); err != nil {
if logger != nil && ctx.Err() == nil {
logger.Errorf("%s", err)
}
return
}
// TODO(axw) backoff like SendStream requests
if err := state.sender.SendProfile(ctx, metadata, &state.buf); err != nil {
if logger != nil && ctx.Err() == nil {
logger.Errorf("failed to send %s profile: %s", state.profileType, err)
}
return
}
if logger != nil {
logger.Debugf("sent %s profile", state.profileType)
}
}()
}
func (state *profilingState) profile(ctx context.Context, duration time.Duration) error {
state.buf.Reset()
if err := state.profileStart(&state.buf); err != nil {
return errors.Wrapf(err, "failed to start %s profile", state.profileType)
}
defer state.profileStop()
if duration > 0 {
timer := time.NewTimer(duration)
defer timer.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
}
}
return nil
}
type profileSender interface {
SendProfile(ctx context.Context, metadata io.Reader, profile ...io.Reader) error
}