in pylib/vcsreplicator/vcsreplicator/hgext.py [0:0]
def uisetup(ui):
# We assume that if the extension is loaded that we want replication
# support enabled. Validate required config options are present.
hosts = ui.configlist(b"replicationproducer", b"hosts")
if not hosts:
raise error.Abort(b"replicationproducer.hosts config option not set")
clientid = ui.config(b"replicationproducer", b"clientid")
if not clientid:
raise error.Abort(b"replicationproducer.clientid config option not set")
timeout = ui.configint(b"replicationproducer", b"connecttimeout", 10)
topic = ui.config(b"replicationproducer", b"topic")
if not topic:
raise error.Abort(b"replicationproducer.topic config option not set")
def havepartitionmap():
for k, v in ui.configitems(b"replicationproducer"):
if k.startswith(b"partitionmap."):
return True
return False
if not havepartitionmap():
raise error.Abort(
b"replicationproducer.partitionmap.* config options not set"
)
reqacks = ui.configint(b"replicationproducer", b"reqacks", default=999)
if reqacks not in (-1, 0, 1):
raise error.Abort(b"replicationproducer.reqacks must be set to -1, 0, or 1")
acktimeout = ui.configint(b"replicationproducer", b"acktimeout")
if not acktimeout:
raise error.Abort(b"replicationproducer.acktimeout config option not set")
# TRACKING py3
hosts = list(map(lambda x: pycompat.sysstr(x), hosts))
clientid = pycompat.sysstr(clientid)
topic = pycompat.sysstr(topic)
class replicatingui(ui.__class__):
"""Custom ui class that provides access to replication primitives."""
@property
def replicationproducer(self):
"""Obtain a ``Producer`` instance to write to the replication log."""
if not getattr(self, "_replicationproducer", None):
client = SimpleClient(hosts, client_id=clientid, timeout=timeout)
self._replicationproducer = vcsrproducer.Producer(
client,
topic,
batch_send=False,
req_acks=reqacks,
ack_timeout=acktimeout,
)
return self._replicationproducer
@property
def replicationpartitionmap(self):
pm = {}
replicationproduceritems = (
(
pycompat.sysstr(k),
pycompat.sysstr(v),
)
for k, v in self.configitems(b"replicationproducer")
)
for k, v in replicationproduceritems:
# Ignore unrelated options in this section.
if not k.startswith("partitionmap."):
continue
parts, expr = v.split(":", 1)
parts = [int(x.strip()) for x in parts.split(",")]
pm[k[len("partitionmap.") :]] = (parts, re.compile(expr))
if not pm:
raise error.Abort(_(b"partitions not defined"))
return pm
@property
def replicationpartitions(self):
s = set()
for partitions, expr in self.replicationpartitionmap.values():
s |= set(partitions)
return s
@contextlib.contextmanager
def kafkainteraction(self):
"""Perform interactions with Kafka with error handling.
All interactions with Kafka should occur inside this context
manager. Kafka exceptions will be caught and handled specially.
"""
try:
yield
except kafkacommon.KafkaError as e:
self.producerlog(
"<unknown>",
"KAFKA_EXCEPTION",
"%s: %s" % (e, traceback.format_exc()),
)
raise
def producerlog(self, repo, action, *args):
"""Write to the producer syslog facility."""
ident = self.config(
b"replicationproducer", b"syslogident", b"vcsreplicator"
)
facility = self.config(
b"replicationproducer", b"syslogfacility", b"LOG_LOCAL2"
)
if not ident or not facility:
raise error.Abort(
b"syslog identity or facility missing from "
b"replicationproducer config"
)
ident = pycompat.sysstr(ident)
facility = pycompat.sysstr(facility)
facility = getattr(syslog, facility)
syslog.openlog(ident, 0, facility)
if not isinstance(repo, (bytes, str)):
repo = repo.replicationwireprotopath
pre = "%s %s %s" % (os.environ.get("USER", "<unknown>"), repo, action)
syslog.syslog(syslog.LOG_NOTICE, "%s %s" % (pre, " ".join(args)))
syslog.closelog()
ui.__class__ = replicatingui