def uisetup()

in pylib/vcsreplicator/vcsreplicator/hgext.py [0:0]


def uisetup(ui):
    """Validate replication config and install a replication-aware ui class.

    Reads the ``replicationproducer`` config section and then replaces
    ``ui.__class__`` with a subclass that exposes the replication producer,
    the partition map and a syslog helper.

    Raises ``error.Abort`` when ``hosts``, ``clientid``, ``topic`` or
    ``acktimeout`` is unset (an ``acktimeout`` of 0 counts as unset), when no
    ``partitionmap.*`` option exists, or when ``reqacks`` is not -1, 0 or 1.
    """
    # We assume that if the extension is loaded that we want replication
    # support enabled. Validate required config options are present.
    hosts = ui.configlist(b"replicationproducer", b"hosts")
    if not hosts:
        raise error.Abort(b"replicationproducer.hosts config option not set")

    clientid = ui.config(b"replicationproducer", b"clientid")
    if not clientid:
        raise error.Abort(b"replicationproducer.clientid config option not set")

    timeout = ui.configint(b"replicationproducer", b"connecttimeout", 10)

    topic = ui.config(b"replicationproducer", b"topic")
    if not topic:
        raise error.Abort(b"replicationproducer.topic config option not set")

    # At least one partitionmap.* entry must exist in the section.
    if not any(
        key.startswith(b"partitionmap.")
        for key, _value in ui.configitems(b"replicationproducer")
    ):
        raise error.Abort(
            b"replicationproducer.partitionmap.* config options not set"
        )

    # 999 is outside the accepted set, so an unset option fails validation.
    reqacks = ui.configint(b"replicationproducer", b"reqacks", default=999)
    if reqacks not in (-1, 0, 1):
        raise error.Abort(b"replicationproducer.reqacks must be set to -1, 0, or 1")

    acktimeout = ui.configint(b"replicationproducer", b"acktimeout")
    if not acktimeout:
        raise error.Abort(b"replicationproducer.acktimeout config option not set")

    # TRACKING py3
    # Config values are bytes; the values handed to the client/producer below
    # are converted to native strings once, here.
    hosts = [pycompat.sysstr(host) for host in hosts]
    clientid = pycompat.sysstr(clientid)
    topic = pycompat.sysstr(topic)

    class replicatingui(ui.__class__):
        """Custom ui class that provides access to replication primitives."""

        @property
        def replicationproducer(self):
            """Obtain a ``Producer`` instance to write to the replication log.

            The producer is created on first access and cached on the
            instance as ``_replicationproducer``.
            """
            if not getattr(self, "_replicationproducer", None):
                client = SimpleClient(hosts, client_id=clientid, timeout=timeout)
                self._replicationproducer = vcsrproducer.Producer(
                    client,
                    topic,
                    batch_send=False,
                    req_acks=reqacks,
                    ack_timeout=acktimeout,
                )

            return self._replicationproducer

        @property
        def replicationpartitionmap(self):
            """Parse the ``partitionmap.*`` options into a dict.

            Each value has the form ``<int>[,<int>...]:<regex>``. The result
            maps the option name (without the ``partitionmap.`` prefix) to a
            ``(partitions, compiled_regex)`` tuple.

            Raises ``error.Abort`` if no partition map entry is defined.
            """
            prefix = "partitionmap."
            pm = {}
            for rawkey, rawvalue in self.configitems(b"replicationproducer"):
                key = pycompat.sysstr(rawkey)
                # Ignore unrelated options in this section.
                if not key.startswith(prefix):
                    continue

                # Split on the first ":" only; the regex may contain colons.
                parts, expr = pycompat.sysstr(rawvalue).split(":", 1)
                partitions = [int(part.strip()) for part in parts.split(",")]
                pm[key[len(prefix) :]] = (partitions, re.compile(expr))

            if not pm:
                raise error.Abort(_(b"partitions not defined"))

            return pm

        @property
        def replicationpartitions(self):
            """Return the set of all partition numbers in the partition map."""
            s = set()
            for partitions, _expr in self.replicationpartitionmap.values():
                s.update(partitions)
            return s

        @contextlib.contextmanager
        def kafkainteraction(self):
            """Perform interactions with Kafka with error handling.

            All interactions with Kafka should occur inside this context
            manager. Kafka exceptions are written to the producer log and
            then re-raised.
            """
            try:
                yield
            except kafkacommon.KafkaError as e:
                self.producerlog(
                    "<unknown>",
                    "KAFKA_EXCEPTION",
                    "%s: %s" % (e, traceback.format_exc()),
                )
                raise

        def producerlog(self, repo, action, *args):
            """Write to the producer syslog facility.

            ``repo`` is either a repository path (bytes or str) or an object
            with a ``replicationwireprotopath`` attribute. ``action`` and the
            remaining ``args`` are joined with spaces to form the message, so
            ``args`` must be native strings.

            Raises ``error.Abort`` if the syslog identity or facility config
            value is empty.
            """
            ident = self.config(
                b"replicationproducer", b"syslogident", b"vcsreplicator"
            )
            facility = self.config(
                b"replicationproducer", b"syslogfacility", b"LOG_LOCAL2"
            )

            if not ident or not facility:
                raise error.Abort(
                    b"syslog identity or facility missing from "
                    b"replicationproducer config"
                )

            ident = pycompat.sysstr(ident)
            # The facility option names a constant in the syslog module.
            facility = getattr(syslog, pycompat.sysstr(facility))

            if not isinstance(repo, (bytes, str)):
                repo = repo.replicationwireprotopath
            # Formatting bytes with a str "%s" yields "b'...'" on Python 3,
            # so normalize to a native string first.
            if isinstance(repo, bytes):
                repo = pycompat.sysstr(repo)

            # Build the whole message before touching syslog so a formatting
            # error cannot leave the log opened under our identity.
            pre = "%s %s %s" % (os.environ.get("USER", "<unknown>"), repo, action)
            message = "%s %s" % (pre, " ".join(args))

            syslog.openlog(ident, 0, facility)
            try:
                syslog.syslog(syslog.LOG_NOTICE, message)
            finally:
                # Always restore the default syslog configuration.
                syslog.closelog()

    ui.__class__ = replicatingui