in metron-sensors/pycapa/pycapa/consumer.py [0:0]
def consumer(args, poll_timeout=3.0):
""" Consumes packets from a Kafka topic. """
# setup the signal handler
signal.signal(signal.SIGINT, signal_handler)
# where to start consuming messages from
kafka_offset_options = {
"begin": seek_to_begin,
"end": seek_to_end,
"stored": seek_to_stored
}
on_assign_cb = kafka_offset_options[args.kafka_offset]
# connect to kafka
logging.debug("Connecting to Kafka; %s", args.kafka_configs)
kafka_consumer = Consumer(args.kafka_configs)
kafka_consumer.subscribe([args.kafka_topic], on_assign=on_assign_cb)
# if 'pretty-print' not set, write libpcap global header
if args.pretty_print == 0:
sys.stdout.write(global_header(args))
sys.stdout.flush()
try:
pkts_in = 0
while not finished.is_set() and (args.max_packets <= 0 or pkts_in < args.max_packets):
# consume a message from kafka
msg = kafka_consumer.poll(timeout=poll_timeout)
if msg is None:
# no message received
continue;
elif msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
if args.pretty_print > 0:
print "Reached end of topar: topic=%s, partition=%d, offset=%s" % (
msg.topic(), msg.partition(), msg.offset())
else:
raise KafkaException(msg.error())
else:
pkts_in += 1
logging.debug("Packet received: pkts_in=%d", pkts_in)
if args.pretty_print == 0:
# write the packet header and packet
sys.stdout.write(packet_header(msg))
sys.stdout.write(msg.value())
sys.stdout.flush()
elif pkts_in % args.pretty_print == 0:
# pretty print
print 'Packet[%s]: date=%s topic=%s partition=%s offset=%s len=%s' % (
pkts_in, to_date(unpack_ts(msg.key())), args.kafka_topic,
msg.partition(), msg.offset(), len(msg.value()))
finally:
sys.stdout.close()
kafka_consumer.close()