metrics/metrics.go (182 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package metrics
import "time"
// counter is the backend-independent counter abstraction; instances are
// obtained from scope.InitCounter.
type counter interface {
	// Update hands value to the underlying counter.
	// NOTE(review): whether value is a delta or an absolute reading is decided
	// by the implementation, which is not visible in this file.
	Update(value int64)
}
// timer is the backend-independent timer abstraction; instances are obtained
// from scope.InitTimer.
type timer interface {
	// Start and Stop take no arguments; presumably they bracket a measured
	// interval — confirm against the implementations.
	Start()
	Stop()
	// Record accepts an explicitly supplied duration.
	Record(time.Duration)
}
// scope is the abstraction over a metrics backend namespace. It creates named
// counters and timers and derives further scopes.
type scope interface {
	// InitCounter returns a counter registered under name.
	InitCounter(name string) counter
	// InitTimer returns a timer registered under name.
	InitTimer(name string) timer
	// SubScope returns a scope derived from this one using name.
	SubScope(name string) scope
	// Tagged returns a scope derived from this one using tags.
	Tagged(tags map[string]string) scope
}
// metricsConstructor creates the root scope of a metrics backend.
type metricsConstructor func() (scope, error)

// metricsInit is the constructor invoked by Init; it defaults to
// noopMetricsInit.
var metricsInit metricsConstructor = noopMetricsInit

// Compile-time check that tallyMetricsInit has the metricsConstructor
// signature.
var _ metricsConstructor = tallyMetricsInit
// IdleWorkers counts idle workers. Within this file it is assigned only by
// Init, which creates it in the global scope under the metric name "idle".
var IdleWorkers *ProcessCounter
//TODO: ERRORS
// State contains metrics related to state. NewStateMetrics creates every field
// in the global scope; the trailing comments give the metric names it uses.
type State struct {
	NumTablesRegistered *Counter // "num_tables_registered"
	Sync *ProcessCounter // "state_sync"
	SyncDuration *Timer // "state_sync_duration"
	TableReg *ProcessCounter // "state_sync_table_reg"
	TableRegDuration *Timer // "state_sync_table_reg_duration"
	TableDereg *ProcessCounter // "state_sync_table_dereg"
	TableDeregDuration *Timer // "state_sync_table_dereg_duration"
	SyncErrors *Counter // "state_sync_errors"
}
// Events represents the common structure for event based metrics. It is
// embedded in Snapshot, Streamer and ChangelogReader and is built by
// getEventsMetrics, which derives each metric name from a process name
// (shown below as <process>).
type Events struct {
	NumWorkers *ProcessCounter // "num_<process>_workers"
	EventsRead *Counter // "<process>_events_read"
	EventsWritten *Counter // "<process>_events_written"
	BytesRead *Counter // "<process>_bytes_read"
	BytesWritten *Counter // "<process>_bytes_written"
	BatchSize *Timer // "<process>_batch_size"
	ReadLatency *Timer // "<process>_read_latency"
	ProduceLatency *Timer // "<process>_produce_latency"
}
// Snapshot contains metrics related to snapshot reader. NewSnapshotMetrics
// creates every field in a tagged scope; each metric name starts with the
// caller-supplied prefix (shown below as <prefix>).
type Snapshot struct {
	Events // built with process name "<prefix>snapshot"
	Errors *Counter // "<prefix>snapshot_error"
	Duration *Timer // "<prefix>snapshot_duration"
	SizeRead *Counter // "<prefix>snapshot_size_read"
	SizeWritten *Counter // "<prefix>snapshot_size_written"
	ThrottledUs *Counter // "<prefix>snapshot_throttled_us"
}
// Streamer contains metrics related to event streamer. NewStreamerMetrics
// creates every field in a tagged scope; the trailing comments give the metric
// names it uses.
type Streamer struct {
	Events // built with process name "streamer"
	TimeInBuffer *Timer // "time_in_buffer"
	Errors *Counter // "streamer_error"
	LockLost *Counter // "streamer_lock_lost"
}
// ChangelogReader contains metrics related to changelog reader.
// NewChangelogReaderMetrics creates every field in a tagged scope; the trailing
// comments give the metric names it uses.
type ChangelogReader struct {
	Events // built with process name "changelog"
	ChangelogRowEventsWritten *Counter // "changelog_row_events_written"
	ChangelogAlterTableEvents *Counter // "changelog_alter_table_events"
	ChangelogQueryEventsWritten *Counter // "changelog_query_events_written"
	ChangelogUnhandledEvents *Counter // "changelog_unhandled_events"
	TimeToEncounter *Timer // "time_to_encounter"
	NumTablesIngesting *Counter // "num_tables_ingesting"
	Errors *Counter // "changelog_error"
	LockLost *Counter // "changelog_lock_lost"
}
// Validation contains metrics related to validation. Instances are built by
// NewValidationMetrics in a tagged scope.
type Validation struct {
	NumValidations *ProcessCounter // "num_validations"
	ValidationRowsProcessed *Counter // "validation_rows_processed"
	ValidatedInsertEvents *Counter // "validated_insert_events"
	ValidatedDeleteEvents *Counter // "validated_delete_events"
	ValidatedSchemaEvents *Counter // "validated_schema_events"
	ValidationFailed *Counter // "validation_failed"
	ValidationSuccess *Counter // "validation_success"
	ValidationSuccessBatch *Counter // "validation_success_batch"
	ValidationSchemaErrors *Counter
	ValidationEventTypeError *Counter // "validation_event_type_errors"
	// SnapshotBase and SnapshotRef are both created through NewSnapshotMetrics.
	SnapshotBase *Snapshot
	SnapshotRef *Snapshot
}
// FilePipeMetrics groups byte and file counters; instances are built by
// NewFilePipeMetrics, which prepends a caller-supplied prefix to each metric
// name.
type FilePipeMetrics struct {
	BytesWritten *Counter
	BytesRead *Counter
	FilesCreated *Counter
	FilesOpened *Counter
	FilesClosed *Counter // == FilesCreated
}
// getEventsMetrics builds the Events metrics shared by the event-processing
// components. process (for example "changelog", "streamer" or
// "<prefix>snapshot") is embedded in every metric name, and all metrics are
// created in scope s.
func getEventsMetrics(s scope, process string) Events {
	var ev Events

	// Fields are populated in declaration order so metrics are created in s
	// in the same sequence as a keyed literal would create them.
	ev.NumWorkers = ProcessCounterInit(s, "num_"+process+"_workers")
	ev.EventsRead = CounterInit(s, process+"_events_read")
	ev.EventsWritten = CounterInit(s, process+"_events_written")
	ev.BytesRead = CounterInit(s, process+"_bytes_read")
	ev.BytesWritten = CounterInit(s, process+"_bytes_written")
	ev.BatchSize = TimerInit(s, process+"_batch_size")
	ev.ReadLatency = TimerInit(s, process+"_read_latency")
	ev.ProduceLatency = TimerInit(s, process+"_produce_latency")

	return ev
}
// NewStateMetrics creates the State metrics in the global (untagged) scope and
// returns them. The global scope is whatever Init stored, so Init is expected
// to have been called first.
func NewStateMetrics() *State {
	root := getGlobal()

	st := new(State)
	st.NumTablesRegistered = CounterInit(root, "num_tables_registered")
	st.Sync = ProcessCounterInit(root, "state_sync")
	st.SyncDuration = TimerInit(root, "state_sync_duration")
	st.TableReg = ProcessCounterInit(root, "state_sync_table_reg")
	st.TableRegDuration = TimerInit(root, "state_sync_table_reg_duration")
	st.TableDereg = ProcessCounterInit(root, "state_sync_table_dereg")
	st.TableDeregDuration = TimerInit(root, "state_sync_table_dereg_duration")
	st.SyncErrors = CounterInit(root, "state_sync_errors")

	return st
}
// NewChangelogReaderMetrics creates the ChangelogReader metrics in the global
// scope tagged with tags and returns them. The global scope is set by Init;
// calling this before Init invokes Tagged on a nil scope.
func NewChangelogReaderMetrics(tags map[string]string) *ChangelogReader {
	tagged := getGlobal().Tagged(tags)

	// The creation order below intentionally differs from the struct's field
	// order: it mirrors the sequence in which the metrics have always been
	// created in the scope.
	r := &ChangelogReader{Events: getEventsMetrics(tagged, "changelog")}
	r.ChangelogRowEventsWritten = CounterInit(tagged, "changelog_row_events_written")
	r.ChangelogQueryEventsWritten = CounterInit(tagged, "changelog_query_events_written")
	r.ChangelogUnhandledEvents = CounterInit(tagged, "changelog_unhandled_events")
	r.TimeToEncounter = TimerInit(tagged, "time_to_encounter")
	r.NumTablesIngesting = CounterInit(tagged, "num_tables_ingesting")
	r.Errors = CounterInit(tagged, "changelog_error")
	r.ChangelogAlterTableEvents = CounterInit(tagged, "changelog_alter_table_events")
	r.LockLost = CounterInit(tagged, "changelog_lock_lost")

	return r
}
// NewStreamerMetrics creates the Streamer metrics in the global scope tagged
// with tags and returns them. The global scope is set by Init; calling this
// before Init invokes Tagged on a nil scope.
func NewStreamerMetrics(tags map[string]string) *Streamer {
	tagged := getGlobal().Tagged(tags)

	st := &Streamer{Events: getEventsMetrics(tagged, "streamer")}
	st.TimeInBuffer = TimerInit(tagged, "time_in_buffer")
	st.Errors = CounterInit(tagged, "streamer_error")
	st.LockLost = CounterInit(tagged, "streamer_lock_lost")

	return st
}
// NewSnapshotMetrics creates the Snapshot metrics in the global scope tagged
// with tags and returns them. prefix is prepended verbatim to every metric
// name (no separator is inserted), so callers pass e.g. "" or "validation_".
// The global scope is set by Init; calling this before Init invokes Tagged on
// a nil scope.
func NewSnapshotMetrics(prefix string, tags map[string]string) *Snapshot {
	tagged := getGlobal().Tagged(tags)

	snap := &Snapshot{Events: getEventsMetrics(tagged, prefix+"snapshot")}
	snap.Errors = CounterInit(tagged, prefix+"snapshot_error")
	snap.Duration = TimerInit(tagged, prefix+"snapshot_duration")
	snap.SizeRead = CounterInit(tagged, prefix+"snapshot_size_read")
	snap.SizeWritten = CounterInit(tagged, prefix+"snapshot_size_written")
	snap.ThrottledUs = CounterInit(tagged, prefix+"snapshot_throttled_us")

	return snap
}
// NewValidationMetrics creates the Validation metrics in the global scope
// tagged with tags and returns them. Every field of the returned struct is
// non-nil as far as this constructor is concerned, including
// ValidationSchemaErrors, which was previously left unset.
// The global scope is set by Init; calling this before Init invokes Tagged on
// a nil scope.
func NewValidationMetrics(tags map[string]string) *Validation {
	s := getGlobal().Tagged(tags)
	return &Validation{
		NumValidations:          ProcessCounterInit(s, "num_validations"),
		ValidationRowsProcessed: CounterInit(s, "validation_rows_processed"),
		ValidatedInsertEvents:   CounterInit(s, "validated_insert_events"),
		ValidatedDeleteEvents:   CounterInit(s, "validated_delete_events"),
		ValidatedSchemaEvents:   CounterInit(s, "validated_schema_events"),
		ValidationFailed:        CounterInit(s, "validation_failed"),
		ValidationSuccess:       CounterInit(s, "validation_success"),
		ValidationSuccessBatch:  CounterInit(s, "validation_success_batch"),
		// Named after the sibling "validation_event_type_errors" counter so
		// that users of the struct never dereference a nil *Counter.
		ValidationSchemaErrors:   CounterInit(s, "validation_schema_errors"),
		ValidationEventTypeError: CounterInit(s, "validation_event_type_errors"),
		// NOTE(review): both snapshots are created with the same prefix and
		// the same tags, so they use identical metric names. Left unchanged to
		// keep the emitted names stable — confirm whether they should be
		// distinguished.
		SnapshotBase: NewSnapshotMetrics("validation_", tags),
		SnapshotRef:  NewSnapshotMetrics("validation_", tags),
	}
}
// NewFilePipeMetrics creates the FilePipeMetrics counters in the global scope
// tagged with tags and returns them. Each metric name is prefix followed by a
// fixed suffix starting with "_". All five counters are initialized, including
// FilesCreated, which was previously left unset.
// The global scope is set by Init; calling this before Init invokes Tagged on
// a nil scope.
func NewFilePipeMetrics(prefix string, tags map[string]string) *FilePipeMetrics {
	s := getGlobal().Tagged(tags)
	return &FilePipeMetrics{
		BytesWritten: CounterInit(s, prefix+"_bytes_written"),
		BytesRead:    CounterInit(s, prefix+"_bytes_read"),
		// Initialized so that users of the struct never dereference a nil
		// *Counter.
		FilesCreated: CounterInit(s, prefix+"_files_created"),
		FilesOpened:  CounterInit(s, prefix+"_files_opened"),
		FilesClosed:  CounterInit(s, prefix+"_files_closed"),
	}
}
// m is the global metrics scope. It is assigned by Init and read through
// getGlobal.
var m scope

// Init builds the global metrics scope using metricsInit and, on success,
// creates the IdleWorkers counter (metric name "idle") in it. The error
// returned by metricsInit is passed through unchanged; in that case
// IdleWorkers is left untouched.
func Init() error {
	var err error

	// The result is stored in m even when err is non-nil.
	m, err = metricsInit()
	if err != nil {
		return err
	}

	IdleWorkers = ProcessCounterInit(m, "idle")
	return nil
}
// getGlobal returns the package-wide metrics scope stored by Init. The result
// is nil if Init has not assigned a scope yet.
func getGlobal() scope {
	return m
}