def consume_one()

in pylib/vcsreplicator/vcsreplicator/pushnotifications.py [0:0]


def consume_one(config, consumer, cb, timeout=0.1, alive=None, cbkwargs=None):
    """Consume at most a single message and notify the callback if necessary.

    The callback will receive arguments describing the push along with any
    other arguments from ``cbkwargs`` that are specified.
    """
    r = consumer.get_message(timeout=timeout)
    if not r:
        return

    partition, message, payload = r
    name = payload["name"]

    if name == "heartbeat-1":
        logger.warn("%s message not relevant; ignoring" % name)
        consumer.commit(partitions=[partition])
        return

    # All other messages should be associated with a repo and have a "path"
    # key.
    path = payload["path"]
    public_url = config.get_public_url_from_wire_path(path)

    if not public_url:
        logger.warn(
            "no public URL could be resolved for %s; not sending notification" % path
        )
        consumer.commit(partitions=[partition])
        return

    if config.c.has_section("ignore_paths"):
        for prefix, _ in config.c.items("ignore_paths"):
            if path.startswith(prefix):
                logger.warn("ignoring repo because path in ignore list: %s" % path)
                consumer.commit(partitions=[partition])
                return

    local_path = config.parse_wire_repo_path(path)

    # FUTURE if we ever write a "repo deleted" message, this should be updated to
    # send the message through.
    if not os.path.exists(local_path):
        logger.warn("repository %s does not exist; ignoring notification" % local_path)
        consumer.commit(partitions=[partition])
        return

    cbargs = dict(cbkwargs or {})
    firecb = True

    if name in ("hg-changegroup-1", "hg-changegroup-2"):
        message_type = "changegroup.1"
        cbargs["data"] = _get_changegroup_payload(
            local_path, public_url, payload["heads"], payload["source"]
        )
    elif name in ("hg-repo-init-1", "hg-repo-init-2"):
        message_type = "newrepo.1"
        cbargs["data"] = {
            "repo_url": public_url,
        }
    elif name == "hg-pushkey-1":
        res = _get_pushkey_payload(
            local_path,
            public_url,
            payload["namespace"],
            payload["key"],
            payload["old"],
            payload["new"],
            payload["ret"],
        )
        if res:
            message_type, cbargs["data"] = res
        else:
            firecb = False

    else:
        # Ack unsupported messages.
        logger.warn("%s message not relevant to push notifier; ignoring" % name)
        firecb = False

    if firecb:
        cb(
            message_type=message_type,
            partition=partition,
            message=message,
            created=payload["_original_created"],
            **cbargs
        )

    consumer.commit(partitions=[partition])