in pylib/vcsreplicator/vcsreplicator/pushnotifications.py [0:0]
def consume_one(config, consumer, cb, timeout=0.1, alive=None, cbkwargs=None):
    """Consume at most a single message and notify the callback if necessary.

    The callback will receive arguments describing the push along with any
    other arguments from ``cbkwargs`` that are specified.

    Arguments:

    ``config`` must provide ``get_public_url_from_wire_path()``,
    ``parse_wire_repo_path()`` and a ``c`` attribute exposing
    ``has_section()`` / ``items()`` (used to read the optional
    ``ignore_paths`` section).

    ``consumer`` must provide ``get_message(timeout=...)``, returning either
    a false value or a ``(partition, message, payload)`` triple, and
    ``commit(partitions=[...])``.

    ``cb`` is called with the keyword arguments ``message_type``,
    ``partition``, ``message``, ``created`` and ``data``, plus everything in
    ``cbkwargs``.

    ``timeout`` is forwarded to ``consumer.get_message()``.

    ``alive`` is accepted for interface compatibility; this function does not
    read it.

    Returns ``None`` in all cases. Whenever a message was obtained, its
    partition is committed before returning, whether or not the callback was
    invoked. The callback runs *before* the commit, so an exception raised by
    the callback leaves the message uncommitted.
    """
    r = consumer.get_message(timeout=timeout)
    if not r:
        return

    partition, message, payload = r
    name = payload["name"]

    if name == "heartbeat-1":
        logger.warning("%s message not relevant; ignoring", name)
        consumer.commit(partitions=[partition])
        return

    # All other messages should be associated with a repo and have a "path"
    # key.
    path = payload["path"]

    public_url = config.get_public_url_from_wire_path(path)
    if not public_url:
        logger.warning(
            "no public URL could be resolved for %s; not sending notification", path
        )
        consumer.commit(partitions=[partition])
        return

    if config.c.has_section("ignore_paths"):
        # Only the option names of the section are used, as path prefixes.
        for prefix, _ in config.c.items("ignore_paths"):
            if path.startswith(prefix):
                logger.warning("ignoring repo because path in ignore list: %s", path)
                consumer.commit(partitions=[partition])
                return

    local_path = config.parse_wire_repo_path(path)

    # FUTURE if we ever write a "repo deleted" message, this should be updated to
    # send the message through.
    if not os.path.exists(local_path):
        logger.warning(
            "repository %s does not exist; ignoring notification", local_path
        )
        consumer.commit(partitions=[partition])
        return

    # Copy so the caller's dict is never mutated by the "data" assignment.
    cbargs = dict(cbkwargs or {})

    # ``message_type`` stays None when there is nothing to notify about; the
    # callback is only fired when a branch below assigns it.
    message_type = None

    if name in ("hg-changegroup-1", "hg-changegroup-2"):
        message_type = "changegroup.1"
        cbargs["data"] = _get_changegroup_payload(
            local_path, public_url, payload["heads"], payload["source"]
        )
    elif name in ("hg-repo-init-1", "hg-repo-init-2"):
        message_type = "newrepo.1"
        cbargs["data"] = {
            "repo_url": public_url,
        }
    elif name == "hg-pushkey-1":
        res = _get_pushkey_payload(
            local_path,
            public_url,
            payload["namespace"],
            payload["key"],
            payload["old"],
            payload["new"],
            payload["ret"],
        )
        # A false result means this pushkey does not warrant a notification.
        if res:
            message_type, cbargs["data"] = res
    else:
        # Ack unsupported messages.
        logger.warning("%s message not relevant to push notifier; ignoring", name)

    if message_type is not None:
        cb(
            message_type=message_type,
            partition=partition,
            message=message,
            created=payload["_original_created"],
            **cbargs
        )

    consumer.commit(partitions=[partition])