in pylib/vcsreplicator/vcsreplicator/consumer.py [0:0]
def run_cli(message_handler):
    """Command line interface to consumer.

    ``message_handler`` is the message processing callable to be used when
    messages are acted upon; it is passed through to ``consume()``.

    Arguments are parsed from ``sys.argv``. The positional ``config``
    argument is the path of the config file to load. The following flags
    select one-shot modes, checked in this order; each one terminates the
    process via ``sys.exit()`` instead of entering the main consume loop:

    ``--wait-for-no-lag``
        Poll the consumer group's offsets every 0.1s until no partition has
        more available messages than consumed ones, then exit 0.
    ``--wait-for-n N``
        Fetch messages until N have been received, printing a line for
        each, then exit 0.
    ``--dump``
        Fetch messages until none is returned, print their payloads as
        YAML, then exit 0.
    ``--skip``
        Fetch one message and commit its partition without processing it,
        then exit 0. Exits 1 if no message was available.

    ``--partition`` restricts the consumer to a single partition and
    ``--start-from`` seeks the consumer before any message is fetched.

    When none of the one-shot modes is requested, logging to stdout is
    configured and ``consume()`` is run. ``--onetime`` is forwarded to
    ``consume()`` and suppresses the start/exit log lines. Any exception
    escaping ``consume()`` is logged and re-raised.
    """
    import argparse
    import yaml

    # Unbuffer stdout.
    sys.stdout = os.fdopen(sys.stdout.fileno(), "w", 1)

    parser = argparse.ArgumentParser()
    parser.add_argument("config", help="Path to config file to load")
    parser.add_argument(
        "--dump", action="store_true", help="Dump available messages and exit"
    )
    parser.add_argument(
        "--onetime", action="store_true", help="Process a single message and exit"
    )
    parser.add_argument(
        "--start-from", type=int, help="Start N records from the beginning"
    )
    parser.add_argument(
        "--partition",
        type=int,
        help="Partition to fetch from. Defaults to all partitions.",
    )
    parser.add_argument(
        "--skip",
        action="store_true",
        help="Skip the consuming of the next message then exit",
    )
    parser.add_argument(
        "--wait-for-no-lag",
        action="store_true",
        help="Wait for consumer lag to be 0 messages and exit",
    )
    parser.add_argument(
        "--wait-for-n",
        type=int,
        help="Wait for N messages to become available then exit",
    )

    args = parser.parse_args()

    config = Config(filename=args.config)

    # hglib will use 'hg' which relies on PATH being correct. Since we're
    # running from a virtualenv, PATH may not be set unless the virtualenv
    # is activated. Overwrite the hglib defaults with a value from the config.
    hglib.HGPATH = config.hg_path

    client = config.get_client_from_section("consumer", timeout=30)
    topic = config.get("consumer", "topic")
    group = config.get("consumer", "group")
    poll_timeout = config.c.getfloat("consumer", "poll_timeout")
    wait_for_topic(client, topic, 30)

    if args.wait_for_no_lag:
        while True:
            offsets = consumer_offsets_and_lag(client, topic, [group])
            # A partition lags when more messages are available than the
            # group has consumed.
            lagging = any(
                available - offset > 0
                for offset, available, _lag_time in offsets[group].values()
            )
            if not lagging:
                sys.exit(0)
            time.sleep(0.1)

    partitions = None
    if args.partition is not None:
        partitions = [args.partition]

    consumer = Consumer(client, group, topic, partitions)

    # Compare against None rather than relying on truthiness so that an
    # explicit ``--start-from 0`` is honored instead of silently ignored.
    if args.start_from is not None:
        consumer.seek(args.start_from, 0)

    if args.wait_for_n:
        left = args.wait_for_n
        while left > 0:
            m = consumer.get_message()
            if not m:
                continue

            print("got a %s message" % payload_log_display(m[2]))
            left -= 1

        sys.exit(0)

    if args.dump:
        messages = []
        while True:
            m = consumer.get_message()
            if not m:
                break
            messages.append(m[2])

        print(yaml.safe_dump(messages, default_flow_style=False).rstrip())
        sys.exit(0)

    if args.skip:
        r = consumer.get_message()
        if not r:
            print("no message available; nothing to skip")
            sys.exit(1)

        # Only the partition is needed; committing it moves past the message.
        partition, _message, _payload = r
        consumer.commit(partitions=[partition])
        print("skipped message in partition %d for group %s" % (partition, group))
        sys.exit(0)

    # Send log records from the root logger to stdout.
    root = logging.getLogger()
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter("%(name)s %(message)s")
    formatter.converter = time.gmtime
    handler.setFormatter(formatter)
    root.addHandler(handler)

    if not args.onetime:
        # ``Logger.warn`` is a deprecated alias; ``warning`` emits the same
        # record.
        logger.warning(
            "starting consumer for topic=%s group=%s partitions=%s",
            topic,
            group,
            partitions or "all",
        )

    try:
        consume(
            config,
            consumer,
            message_handler,
            onetime=args.onetime,
            timeout=poll_timeout,
        )
        if not args.onetime:
            logger.warning("process exiting gracefully")
    except BaseException:
        # BaseException also covers SystemExit/KeyboardInterrupt, so every
        # abnormal exit from the loop is logged before propagating.
        logger.error("exiting main consume loop with error")
        raise