scripts/cronjobs/pubsubber.py (232 lines of code) (raw):
#################################################
# Pubsubber.py: a minimalistic svnwcsub program #
#################################################
from threading import Thread
from datetime import datetime
import os
import sys
import logging
import atexit
import signal
import json
import re
import time
import subprocess
import base64
version = 2
if sys.hexversion < 0x03000000:
print("Using Python 2...")
import httplib, urllib, urllib2, ConfigParser as configparser, socket
socket._fileobject.default_bufsize = 0
else:
print("Using Python 3")
version = 3
import http.client, urllib.request, urllib.parse, configparser
debug = False
logger = None
watching={} # dict: key = path to watch, value = set() of paths to update on match
############################################################
# Daemon class, courtesy of an anonymous good-hearted soul #
############################################################
class daemon:
"""A generic daemon class.
Usage: subclass the daemon class and override the run() method."""
def __init__(self, pidfile, logfile = None):
self.pidfile = pidfile
if logfile == None:
self.logfile = os.devnull
else:
self.logfile = logfile
def daemonize(self):
"""Daemonize class. UNIX double fork mechanism."""
try:
pid = os.fork()
if pid > 0:
# exit first parent
sys.exit(0)
except OSError as err:
sys.stderr.write('fork #1 failed: {0}\n'.format(err))
sys.exit(1)
# decouple from parent environment
os.chdir('/')
os.setsid()
os.umask(0)
# do second fork
try:
pid = os.fork()
if pid > 0:
# exit from second parent
sys.exit(0)
except OSError as err:
sys.stderr.write('fork #2 failed: {0}\n'.format(err))
sys.exit(1)
# redirect standard file descriptors
sys.stdout.flush()
sys.stderr.flush()
si = open(os.devnull, 'r')
so = open(self.logfile, 'a+')
se = open(self.logfile, 'a+')
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
# write pidfile
atexit.register(self.delpid)
pid = str(os.getpid())
with open(self.pidfile,'w+') as f:
f.write(pid + '\n')
logger.info("Created %s", self.pidfile)
def delpid(self):
logger.info("Removing %s", self.pidfile)
os.remove(self.pidfile)
def start(self):
"""Start the daemon."""
# Check for a pidfile to see if the daemon already runs
try:
with open(self.pidfile,'r') as pf:
pid = int(pf.read().strip())
except IOError:
pid = None
if pid:
message = "pidfile {0} already exist. " + \
"Daemon already running?\n"
sys.stderr.write(message.format(self.pidfile))
sys.exit(1)
# Start the daemon
self.daemonize()
self.run()
def stop(self):
"""Stop the daemon."""
# Get the pid from the pidfile
try:
with open(self.pidfile,'r') as pf:
pid = int(pf.read().strip())
except IOError:
pid = None
if not pid:
message = "pidfile {0} does not exist. " + \
"Daemon not running?\n"
sys.stderr.write(message.format(self.pidfile))
return # not an error in a restart
# Try killing the daemon process
try:
# Try gentle stop first
os.kill(pid, signal.SIGINT)
time.sleep(0.2)
while 1:
os.kill(pid, signal.SIGTERM)
time.sleep(0.1)
except OSError as err:
e = str(err.args)
if e.find("No such process") > 0:
if os.path.exists(self.pidfile):
os.remove(self.pidfile)
else:
print (str(err.args))
sys.exit(1)
def restart(self):
"""Restart the daemon."""
self.stop()
self.start()
def run(self):
"""You should override this method when you subclass Daemon.
It will be called after the process has been daemonized by
start() or restart()."""
####################
# Helper functions #
####################
# read_chunk: iterator for reading chunks from the stream
# since this is all handled via urllib now, this is quite rudimentary
def read_chunk(req):
while True:
try:
line = req.readline().strip()
if line:
yield line
else:
print("No more lines?")
break
except Exception as info:
break
return
#########################
# Main listener program #
#########################
# PubSub class: handles connecting to a pubsub service and checking commits
class PubSubClient(Thread):
def __init__(self):
Thread.__init__(self)
self.setDaemon(True)
def run(self):
logger.info("Watching %s", watching)
while True:
self.req = None
while not self.req:
try:
if version == 3:
self.req = urllib.request.urlopen(self.url, None, 30)
else:
self.req = urllib2.urlopen(self.url, None, 30)
logger.info("Connected to %s", self.url)
except:
logger.warn("Failed to connect to %s", self.url)
time.sleep(30)
continue
for line in read_chunk(self.req):
if version == 3:
line = str( line, encoding='ascii' ).rstrip('\r\n,').replace('\x00','') # strip away any old pre-0.9 commas from gitpubsub chunks and \0 in svnpubsub chunks
else:
line = str( line ).rstrip('\r\n,').replace('\x00','') # strip away any old pre-0.9 commas from gitpubsub chunks and \0 in svnpubsub chunks
try:
obj = json.loads(line)
if "commit" in obj and "repository" in obj['commit']:
# If it's our public svn repo, then...
if obj['commit']['repository'] == "13f79535-47bb-0310-9956-ffa450edef68":
#Grab some vars
commit = obj['commit']
# e.g. {"committer": "sebb", "log": "Ensure we exit on control+C", "repository": "13f79535-47bb-0310-9956-ffa450edef68", "format": 1,
# "changed": {"comdev/reporter.apache.org/trunk/scandist.py": {"flags": "U "}},
# "date": "2015-07-13 13:38:33 +0000 (Mon, 13 Jul 2015)", "type": "svn", "id": 1690668}
svnuser = commit['committer']
revision = commit['id']
filePaths = set()
# e.g. {"comdev/reporter.apache.org/trunk/scandist.py": {"flags": "U "}}
for path in commit['changed']:
for watchPath in watching:
# Check if the commit is for our part of the repo
match = re.match("^%s" % watchPath, path)
if match:
filePath = str(watching[watchPath])
if debug:
print("Matched '" + path + "' against '" + watchPath + "'; would run 'svn up " + filePath + "'")
filePaths.update(watching[watchPath])
if filePaths:
for filePath in filePaths:
if debug:
print("Matched 'r" + str(revision) + "'; would run 'svn up " + filePath + "'")
else:
time.sleep(3)
logger.info("svn up %s", filePath)
subprocess.call(['svn','up', filePath])
else:
logger.debug("Did not match 'r" + str(revision) + "' against ' " + str(watching.keys()) + "'")
if debug:
print("Did not match 'r" + str(revision) + "' against ' " + str(watching.keys()) + "'")
except ValueError as detail:
continue
################
# Main program #
################
"""
According to https://svn.apache.org/repos/asf/subversion/trunk/tools/server-side/svnpubsub/svnpubsub/server.py
# URLs are constructed from 3 parts:
# /${notification}/${optional_type}/${optional_repository}
#
# Notifications can be sent for commits or metadata (e.g., revprop) changes.
# If the type is included in the URL, you will only get notifications of that type.
# The type can be * and then you will receive notifications of any type.
#
# If the repository is included in the URL, you will only receive
# messages about that repository. The repository can be * and then you
# will receive messages about all repositories.
"""
def main():
if debug:
print("Foreground test mode enabled, no updates will be made")
for watchPath in watching:
print("Watching: '" + watchPath + "' for updates to '" + str(watching[watchPath]) + "'")
# Start the svn thread
svn_thread = PubSubClient()
svn_thread.url = "http://svn-master.apache.org:2069/commits/*"
svn_thread.start()
while True:
try:
time.sleep(60)
except KeyboardInterrupt:
logger.info("Detected shutdown interrupt")
pass
##############
# Daemonizer #
##############
class MyDaemon(daemon):
def run(self):
main()
def usage():
print("usage: %s start|stop|restart|foreground ([repo-path] [file-path])*" % sys.argv[0])
print("for example: %s start comdev/projects.apache.org /var/www/projects.apache.org" % sys.argv[0])
def handle_args(args):
if len(args) % 2 != 0:
usage()
sys.exit("Need an even number of repo/file paths, found "+str(len(args)))
else:
for i in range(0, len(args), 2):
try:
watching[args[i]].update(args[i+1])
except KeyError:
watching[args[i]] = set([args[i+1]])
if __name__ == "__main__":
logger = logging.getLogger('pubsubber')
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger.addHandler(ch)
logfile = os.environ.get('LOGFILE')
logger.info("LOGFILE=%s", logfile)
daemon = MyDaemon('/tmp/pubsubber.pid', logfile)
if len(sys.argv) >= 2:
if 'start' == sys.argv[1]:
handle_args(sys.argv[2:])
daemon.start()
elif 'stop' == sys.argv[1]:
daemon.stop()
elif 'restart' == sys.argv[1]:
daemon.restart()
elif 'foreground' == sys.argv[1]:
debug = True
handle_args(sys.argv[2:])
main()
else:
usage()
sys.exit(2)
sys.exit(0)
else:
usage()
sys.exit(2)