lib/torrent/scheduler/torrentlog/logger.go (171 lines of code) (raw):
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package torrentlog
import (
"errors"
"fmt"
"os"
"time"
"github.com/uber/kraken/core"
"github.com/uber/kraken/utils/log"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
var (
errEmptyReceivedPieces = errors.New("empty received piece counts")
errNegativeReceivedPieces = errors.New("negative value in received piece counts")
)
// Logger wraps structured log entries for important torrent events. These events
// are intended to be consumed at the cluster level via ELK, and are distinct from
// the verbose stdout logs of the agent. In particular, Logger bridges host-agnostic
// metrics to individual hostnames.
//
// For example, if there is a spike in download times, an engineer can cross-reference
// the spike with the torrent logs in ELK and zero-in on a single host. From there,
// the engineer can inspect the stdout logs of the host for more detailed information
// as to why the download took so long.
type Logger struct {
zap *zap.Logger
}
// New creates a new Logger.
func New(config log.Config, pctx core.PeerContext) (*Logger, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, fmt.Errorf("hostname: %s", err)
}
logger, err := log.New(config, map[string]interface{}{
"hostname": hostname,
"zone": pctx.Zone,
"cluster": pctx.Cluster,
"peer_id": pctx.PeerID.String(),
})
if err != nil {
return nil, fmt.Errorf("config: %s", err)
}
return &Logger{logger}, nil
}
// NewNopLogger returns a Logger containing a no-op zap logger for testing purposes.
func NewNopLogger() *Logger {
return &Logger{zap.NewNop()}
}
// OutgoingConnectionAccept logs an accepted outgoing connection.
func (l *Logger) OutgoingConnectionAccept(
d core.Digest,
infoHash core.InfoHash,
remotePeerID core.PeerID) {
l.zap.Debug(
"Outgoing connection accept",
zap.String("name", d.Hex()),
zap.String("info_hash", infoHash.String()),
zap.String("remote_peer_id", remotePeerID.String()))
}
// OutgoingConnectionReject logs a rejected outgoing connection.
func (l *Logger) OutgoingConnectionReject(d core.Digest,
infoHash core.InfoHash,
remotePeerID core.PeerID,
err error) {
l.zap.Debug(
"Outgoing connection reject",
zap.String("name", d.Hex()),
zap.String("info_hash", infoHash.String()),
zap.String("remote_peer_id", remotePeerID.String()),
zap.Error(err))
}
// IncomingConnectionAccept logs an accepted incoming connection.
func (l *Logger) IncomingConnectionAccept(
d core.Digest,
infoHash core.InfoHash,
remotePeerID core.PeerID) {
l.zap.Debug(
"Incoming connection accept",
zap.String("name", d.Hex()),
zap.String("info_hash", infoHash.String()),
zap.String("remote_peer_id", remotePeerID.String()))
}
// IncomingConnectionReject logs a rejected incoming connection.
func (l *Logger) IncomingConnectionReject(
d core.Digest,
infoHash core.InfoHash,
remotePeerID core.PeerID,
err error) {
l.zap.Debug(
"Incoming connection reject",
zap.String("name", d.Hex()),
zap.String("info_hash", infoHash.String()),
zap.String("remote_peer_id", remotePeerID.String()),
zap.Error(err))
}
// SeedTimeout logs a seeding torrent being torn down due to timeout.
func (l *Logger) SeedTimeout(d core.Digest, infoHash core.InfoHash) {
l.zap.Debug(
"Seed timeout",
zap.String("name", d.Hex()),
zap.String("info_hash", infoHash.String()))
}
// LeechTimeout logs a leeching torrent being torn down due to timeout.
func (l *Logger) LeechTimeout(d core.Digest, infoHash core.InfoHash) {
l.zap.Debug(
"Leech timeout",
zap.String("name", d.Hex()),
zap.String("info_hash", infoHash.String()))
}
// DownloadSuccess logs a successful download.
func (l *Logger) DownloadSuccess(namespace string, d core.Digest, size int64, downloadTime time.Duration) {
l.zap.Info(
"Download success",
zap.String("namespace", namespace),
zap.String("name", d.Hex()),
zap.Int64("size", size),
zap.Duration("download_time", downloadTime))
}
// DownloadFailure logs a failed download.
func (l *Logger) DownloadFailure(namespace string, d core.Digest, size int64, err error) {
l.zap.Error(
"Download failure",
zap.String("namespace", namespace),
zap.String("name", d.Hex()),
zap.Int64("size", size),
zap.Error(err))
}
// SeederSummaries logs a summary of the pieces requested and received from peers for a torrent.
func (l *Logger) SeederSummaries(
d core.Digest,
infoHash core.InfoHash,
summaries SeederSummaries) error {
l.zap.Debug(
"Seeder summaries",
zap.String("name", d.Hex()),
zap.String("info_hash", infoHash.String()),
zap.Array("seeder_summaries", summaries))
return nil
}
// LeecherSummaries logs a summary of the pieces requested by and sent to peers for a torrent.
func (l *Logger) LeecherSummaries(
d core.Digest,
infoHash core.InfoHash,
summaries LeecherSummaries) error {
l.zap.Debug(
"Leecher summaries",
zap.String("name", d.Hex()),
zap.String("info_hash", infoHash.String()),
zap.Array("leecher_summaries", summaries))
return nil
}
// Sync flushes the log.
func (l *Logger) Sync() {
l.zap.Sync()
}
// SeederSummary contains information about piece requests to and pieces received from a peer.
type SeederSummary struct {
PeerID core.PeerID
RequestsSent int
GoodPiecesReceived int
DuplicatePiecesReceived int
}
// MarshalLogObject marshals a SeederSummary for logging.
func (s SeederSummary) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddString("peer_id", s.PeerID.String())
enc.AddInt("requests_sent", s.RequestsSent)
enc.AddInt("good_pieces_received", s.GoodPiecesReceived)
enc.AddInt("duplicate_pieces_received", s.DuplicatePiecesReceived)
return nil
}
// SeederSummaries represents a slice of type SeederSummary
// that can be marshalled for logging.
type SeederSummaries []SeederSummary
// MarshalLogArray marshals a SeederSummaries slice for logging.
func (ss SeederSummaries) MarshalLogArray(enc zapcore.ArrayEncoder) error {
for _, summary := range ss {
enc.AppendObject(summary)
}
return nil
}
// LeecherSummary contains information about piece requests from and pieces sent to a peer.
type LeecherSummary struct {
PeerID core.PeerID
RequestsReceived int
PiecesSent int
}
// MarshalLogObject marshals a LeecherSummary for logging.
func (s LeecherSummary) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddString("peer_id", s.PeerID.String())
enc.AddInt("requests_received", s.RequestsReceived)
enc.AddInt("pieces_sent", s.PiecesSent)
return nil
}
// LeecherSummaries represents a slice of type LeecherSummary
// that can be marshalled for logging.
type LeecherSummaries []LeecherSummary
// MarshalLogArray marshals a LeecherSummaries slice for logging.
func (ls LeecherSummaries) MarshalLogArray(enc zapcore.ArrayEncoder) error {
for _, summary := range ls {
enc.AppendObject(summary)
}
return nil
}