hgext/serverlog/__init__.py (238 lines of code) (raw):
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Log requests to a Mercurial server.
The intent of this extension is to log requests to a Mercurial server so
server operators can have better insight into what a server is doing.
The extension is tailored for Mozilla's needs.
Installing
==========
In your hgrc, add the following:
[extensions]
serverlog = /path/to/version-control-tools/hgext/serverlog
Configuration Options
=====================
serverlog.reporoot
Root path for all repositories. When logging the repository path, this
prefix will be stripped.
serverlog.hgweb
Whether to record requests for hgweb. Defaults to False.
serverlog.ssh
Whether to record requests for ssh server. Defaults to False.
serverlog.datalogsizeinterval
Interval (in bytes) between log events when data is being streamed to
clients. Default value is 10,000,000.
serverlog.blackbox
Enable blackbox logging using Mercurial's ui.log() facility.
serverlog.blackbox.service
Service name to use when logging to ui.log(). Defaults to ``hgweb``.
serverlog.syslog
Enable syslog logging.
serverlog.syslog.ident
String to prefix all syslog entries with. Defaults to "hgweb".
serverlog.syslog.facility
String syslog facility to write to. Corresponds to a LOG_* attribute
in the syslog module. Defaults to LOG_LOCAL2.
Logged Messages
===============
syslog messages conform to a well-defined string format:
[<session id>:]<request id> <action> [<arg0> [<arg1> [...]]]
The first word is a single or colon-delimited pair of UUIDs. This identifier
is used to associate multiple events together.
HTTP requests will have a single UUID. A new UUID will be generated at the
beginning of the request.
SSH sessions will have multiple UUIDs. The first UUID is a session ID. It will
be created when the connection initiates. Subsequent UUIDs will be generated
for each command processed on the SSH server.
The idea is to write "point" events as soon as they happen and to correlate
these point events into higher-level events later. This approach enables
streaming consumers of the log output to identify in-flight requests. If we
buffered log messages until response completion (such as Apache request logs),
we wouldn't haven't a good handle on what the server is actively doing.
The actions are defined in the following sections.
BEGIN_REQUEST
-------------
Written when a new request comes in. This occurs for all HTTP requests.
Arguments:
* repo path
* client ip ("UNKNOWN" if not known)
* URL path and query string
e.g. ``bc286e11-1e44-11e4-8889-b8e85631ff68 BEGIN_REQUEST server2 127.0.0.1 /?cmd=capabilities``
BEGIN_PROTOCOL
--------------
Written when a command from the wire protocol is about to be executed. This
almost certainly derives from a Mercurial client talking to the server (as
opposed to say a browser viewing HTML).
Arguments:
* command name
e.g. ``bc286e11-1e44-11e4-8889-b8e85631ff68 BEGIN_PROTOCOL capabilities``
END_REQUEST
-----------
Written when a request finishes and all data has been sent.
There should be an ``END_REQUEST`` for every ``BEGIN_REQUEST``.
Arguments:
* Integer bytes written to client
* Float wall time to process request
* Float CPU time to process request
e.g. ``bc286e11-1e44-11e4-8889-b8e85631ff68 END 0 0.002 0.002``
BEGIN_SSH_SESSION
-----------------
Written when an SSH session starts.
Arguments:
* repo path
* username creating the session
e.g. ``c9417b51-1e4b-11e4-8adf-b8e85631ff68: BEGIN_SSH_SESSION mozilla-central gps``
Note that there is an empty request id for this event!
END_SSH_SESSION
---------------
Written when an SSH session terminates.
Arguments:
* Float wall time of session
* Float CPU time of session
e.g. ``3f74662b-1e4c-11e4-af00-b8e85631ff68: END_SSH_SESSION 1.716 0.000``
BEGIN_SSH_COMMAND
-----------------
Written when an SSH session starts processing a command.
Arguments:
* command name
e.g. ``9bddcd66-1e4e-11e4-af92-b8e85631ff68:9bdf08ab-1e4e-11e4-836d-b8e85631ff68 BEGIN_SSH_COMMAND between``
END_SSH_COMMAND
---------------
Written when an SSH session finishes processing a command.
Arguments:
* Float wall time to process command
* Float CPU time to process command
e.g. ``9bddcd66-1e4e-11e4-af92-b8e85631ff68:9bdf08ab-1e4e-11e4-836d-b8e85631ff68 END_SSH_COMMAND 0.000 0.000``
Limitations
===========
The extension currently only uses syslog for writing events.
The extension assumes only 1 thread is running per process. If multiple threads
are running, CPU time calculations will not be accurate. Other state may get
mixed up.
"""
from __future__ import absolute_import
import gc
import os
import resource
import syslog
import time
import uuid
import weakref
from mercurial import (
error,
extensions,
pycompat,
registrar,
wireprotoserver,
wireprototypes,
wireprotov1server,
)
from mercurial.hgweb import (
hgweb_mod,
)
OUR_DIR = os.path.normpath(os.path.dirname(__file__))
with open(os.path.join(OUR_DIR, "..", "bootstrap.py")) as f:
exec(f.read())
testedwith = b"4.8 4.9 5.0 5.1 5.2 5.3 5.4 5.5"
minimumhgversion = b"4.8"
configtable = {}
configitem = registrar.configitem(configtable)
configitem(b"serverlog", b"blackbox", default=True)
configitem(b"serverlog", b"blackbox.service", default=b"hgweb")
configitem(b"serverlog", b"reporoot", default="")
configitem(b"serverlog", b"hgweb", default=True)
configitem(b"serverlog", b"ssh", default=True)
configitem(b"serverlog", b"datalogsizeinterval", default=10000000)
configitem(b"serverlog", b"syslog", default=True)
configitem(b"serverlog", b"syslog.ident", default=b"hgweb")
configitem(b"serverlog", b"syslog.facility", b"LOG_LOCAL2")
class fileobjectproxy(object):
"""A proxy around a file object that stores a serverlog reference."""
__slots__ = (
"_fp",
"_serverlog",
)
def __init__(self, fp, serverlog):
object.__setattr__(self, "_fp", fp)
object.__setattr__(self, "_serverlog", serverlog)
def __getattribute__(self, name):
if name in ("_fp", "_serverlog"):
return object.__getattribute__(self, name)
return getattr(object.__getattribute__(self, "_fp"), name)
def __delattr__(self, name):
return delattr(object.__getattribute__(self, "_fp"), name)
def __setattr__(self, name, value):
return setattr(object.__getattribute__(self, "_fp"), name, value)
def wrappeddispatch(orig, repo, proto, command, *args, **kwargs):
"""Wraps wireprotov1server.dispatch() to record command requests."""
# TRACKING hg46
# For historical reasons, SSH and HTTP use different log events. With
# the unification of the dispatch code in 4.6, we could likely unify these.
# Keep in mind this function is only called on 4.6+: 4.5 has a different
# code path completely.
if isinstance(proto, wireprotoserver.httpv1protocolhandler):
logevent(repo.ui, repo._serverlog, "BEGIN_PROTOCOL", command)
elif isinstance(proto, wireprotoserver.sshv1protocolhandler):
logevent(repo.ui, repo._serverlog, "BEGIN_SSH_COMMAND", command)
startusage = resource.getrusage(resource.RUSAGE_SELF)
repo._serverlog.update(
{
"requestid": pycompat.bytestr(uuid.uuid1()),
"startcpu": startusage.ru_utime + startusage.ru_stime,
"starttime": time.time(),
"ui": weakref.ref(repo.ui),
}
)
else:
raise error.ProgrammingError(b"unhandled protocol handler: %r" % proto)
# If the return type is a `pushres`, `_sshv1respondbytes` will be called twice.
# We only want to log a completed SSH event on the second call, so flip the
# `ignorecall` flag here.
res = orig(repo, proto, command, *args, **kwargs)
if isinstance(res, wireprototypes.pushres):
repo._serverlog["ignorecall"] = True
return res
def wrapped_getpayload(orig, self):
"""Wraps `sshv1protocolhandler.getpayload` to mark bytes responses as
non-terminating.
"""
self._fout._serverlog["ignorecall"] = True
return orig(self)
def wrappedsshv1respondbytes(orig, fout, rsp):
# check if this response is non-terminating (ie if `wrapped_getpayload`
# set the flag just before this)
if fout._serverlog.get("ignorecall"):
fout._serverlog["ignorecall"] = False
return orig(fout, rsp)
try:
return orig(fout, rsp)
finally:
record_completed_ssh_command(fout)
def wrappedsshv1respondstream(orig, fout, rsp):
try:
return orig(fout, rsp)
finally:
record_completed_ssh_command(fout)
def wrappedsshv1respondooberror(orig, fout, ferr, message):
try:
return orig(fout, ferr, message)
finally:
record_completed_ssh_command(fout)
def record_completed_ssh_command(fout):
serverlog = fout._serverlog
ui = serverlog["ui"]()
# This should not occur. But weakrefs are weakrefs. Be paranoid.
if not ui:
return
endtime = time.time()
endusage = resource.getrusage(resource.RUSAGE_SELF)
endcpu = endusage.ru_utime + endusage.ru_stime
deltatime = endtime - serverlog["starttime"]
deltacpu = endcpu - serverlog["startcpu"]
logevent(ui, serverlog, "END_SSH_COMMAND", b"%.3f" % deltatime, b"%.3f" % deltacpu)
del serverlog["ui"]
del serverlog["starttime"]
del serverlog["startcpu"]
serverlog["requestid"] = b""
def repopath(repo):
root = repo.ui.config(b"serverlog", b"reporoot")
if root and not root.endswith(b"/"):
root += b"/"
path = repo.path
if root and path.startswith(root):
path = path[len(root) :]
path = path.rstrip(b"/").rstrip(b"/.hg")
return path
def logevent(ui, context, action, *args):
"""Log a server event.
``context`` is a dict containing state of the session/request.
``action`` is the event name and ``args`` are arguments specific to the
``action``.
"""
fmt = b"%s %s %s"
formatters = (context["requestid"], pycompat.bytestr(action), b" ".join(args))
if context.get("sessionid"):
fmt = b"%s:" + fmt
formatters = tuple([context["sessionid"]] + list(formatters))
if ui.configbool(b"serverlog", b"blackbox"):
ui.log(ui.config(b"serverlog", b"blackbox.service"), fmt + b"\n", *formatters)
if ui.configbool(b"serverlog", b"syslog"):
logsyslog(ui, fmt % formatters)
def logsyslog(ui, message):
"""Log a formatted message to syslog."""
ident = pycompat.sysstr(ui.config(b"serverlog", b"syslog.ident"))
facility_config = pycompat.sysstr(ui.config(b"serverlog", b"syslog.facility"))
facility = getattr(syslog, facility_config)
syslog.openlog(ident, 0, facility)
syslog.syslog(syslog.LOG_NOTICE, pycompat.sysstr(message))
syslog.closelog()
def wrapped_runwsgi(orig, self, req, res, repo):
"""Wrap hgweb._runwsgi to capture timing and CPU usage information"""
serverlog = {
"requestid": pycompat.bytestr(uuid.uuid1()),
"writecount": 0,
}
env = req.rawenv
# Resolve the repository path.
# If serving with multiple repos via hgwebdir_mod, REPO_NAME will be
# set to the relative path of the repo (I think).
serverlog["path"] = req.apppath or repopath(repo)
serverlog["ip"] = (
env.get(b"HTTP_X_CLUSTER_CLIENT_IP") or env.get(b"REMOTE_ADDR") or b"UNKNOWN"
)
# Stuff a reference to the state and the bound logging method so we can
# record and log inside request handling.
self._serverlog = serverlog
repo._serverlog = serverlog
# TODO REQUEST_URI may not be defined in all WSGI environments,
# including wsgiref. We /could/ copy code from hgweb_mod here.
uri = env.get(b"REQUEST_URI", b"UNKNOWN")
sl = serverlog
logevent(repo.ui, sl, "BEGIN_REQUEST", sl["path"], sl["ip"], uri)
startusage = resource.getrusage(resource.RUSAGE_SELF)
startcpu = startusage.ru_utime + startusage.ru_stime
starttime = time.time()
datasizeinterval = repo.ui.configint(b"serverlog", b"datalogsizeinterval")
lastlogamount = 0
try:
for what in orig(self, req, res, repo):
sl["writecount"] += len(what)
yield what
if sl["writecount"] - lastlogamount > datasizeinterval:
logevent(repo.ui, sl, "WRITE_PROGRESS", b"%d" % sl["writecount"])
lastlogamount = sl["writecount"]
finally:
# It is easy to introduce cycles in localrepository instances.
# Versions of Mercurial up to and including 4.5 leak repo instances
# in hgwebdir. We force a GC on every request to help mitigate
# these leaks.
gc.collect()
endtime = time.time()
endusage = resource.getrusage(resource.RUSAGE_SELF)
endcpu = endusage.ru_utime + endusage.ru_stime
deltatime = endtime - starttime
deltacpu = endcpu - startcpu
logevent(
repo.ui,
sl,
"END_REQUEST",
b"%d" % sl["writecount"],
b"%.3f" % deltatime,
b"%.3f" % deltacpu,
)
class sshserverwrapped(wireprotoserver.sshserver):
"""Wrap sshserver class to record events."""
def serve_forever(self):
repo = self._repo
serverlog = {
"sessionid": pycompat.bytestr(uuid.uuid1()),
"requestid": b"",
"path": repopath(repo),
}
# Stuff a reference to the state so we can do logging within repo
# methods.
repo._serverlog = serverlog
logevent(
repo.ui,
serverlog,
"BEGIN_SSH_SESSION",
serverlog["path"],
repo.ui.environ[b"USER"],
)
self._serverlog = serverlog
startusage = resource.getrusage(resource.RUSAGE_SELF)
startcpu = startusage.ru_utime + startusage.ru_stime
starttime = time.time()
try:
return super(sshserverwrapped, self).serve_forever()
finally:
endtime = time.time()
endusage = resource.getrusage(resource.RUSAGE_SELF)
endcpu = endusage.ru_utime + endusage.ru_stime
deltatime = endtime - starttime
deltacpu = endcpu - startcpu
logevent(
repo.ui,
serverlog,
"END_SSH_SESSION",
b"%.3f" % deltatime,
b"%.3f" % deltacpu,
)
self._serverlog = None
def wrapped_runsshserver(orig, ui, repo, fin, fout, ev, *args, **kwargs):
fout = fileobjectproxy(fout, repo._serverlog)
return orig(ui, repo, fin, fout, ev, *args, **kwargs)
def extsetup(ui):
if wireprotov1server:
extensions.wrapfunction(wireprotov1server, "dispatch", wrappeddispatch)
if wireprotoserver:
extensions.wrapfunction(
wireprotoserver.sshv1protocolhandler, "getpayload", wrapped_getpayload
)
extensions.wrapfunction(
wireprotoserver, "_sshv1respondbytes", wrappedsshv1respondbytes
)
extensions.wrapfunction(
wireprotoserver, "_sshv1respondstream", wrappedsshv1respondstream
)
extensions.wrapfunction(
wireprotoserver, "_sshv1respondooberror", wrappedsshv1respondooberror
)
extensions.wrapfunction(
wireprotoserver, "_runsshserver", wrapped_runsshserver
)
if ui.configbool(b"serverlog", b"hgweb"):
extensions.wrapfunction(hgweb_mod.hgweb, "_runwsgi", wrapped_runwsgi)
if ui.configbool(b"serverlog", b"ssh"):
wireprotoserver.sshserver = sshserverwrapped