ncrx/netcons-gen.py (105 lines of code) (raw):
#!/usr/bin/env python3
#
# Copyright (c) 2016-present, Facebook, Inc.
# All rights reserved.
#
# This source code is licensed under the license found in the LICENSE file in
# the root directory of this source tree.
#
"""
This tool produces netcons messages for testing (mostly of {lib,}ncrx).
Usual usage:
1. Run `ncrx [port]` listening in one shell
2. In another shell, run `netcons-gen [...] | nc -u 127.0.0.1 [port]`
"""
import argparse
import random
import sys
import time
from enum import Enum
class Level(Enum):
LOG_EMERG = 0
LOG_ALERT = 1
LOG_CRIT = 2
LOG_ERR = 3
LOG_WARNING = 4
LOG_NOTICE = 5
LOG_INFO = 6
LOG_DEBUG = 7
class Facility(Enum):
LOG_KERN = 0
LOG_USER = 1
LOG_MAIL = 2
LOG_DAEMON = 3
LOG_AUTH = 4
LOG_SYSLOG = 5
LOG_LPR = 6
LOG_NEWS = 7
LOG_UUCP = 8
LOG_CRON = 9
LOG_AUTHPRIV = 10
LOG_LOCAL0 = 16
LOG_LOCAL1 = 17
LOG_LOCAL2 = 18
LOG_LOCAL3 = 19
LOG_LOCAL4 = 20
LOG_LOCAL5 = 21
LOG_LOCAL6 = 22
LOG_LOCAL7 = 23
class Mode(Enum):
NORMAL = 0
SKIP = 1
RESET = 2
ARG_TO_MODE_MAP = {"reset": Mode.RESET, "skip": Mode.SKIP}
def make_dictionary_string(msg):
"""Format X=Y\0X=Y, no trailing \0"""
return "\0".join("{}={}".format(k, v) for k, v in msg.items())
def make_ext_header(seq, facility, level, cont):
"""
See printk.c's msg_print_ext_header for format spec.
"""
faclev = (facility.value << 3) | level.value
ts_usec = int(time.monotonic() * (10 ** 6))
return "{},{},{},{};".format(faclev, seq, ts_usec, "c" if cont else "-")
def _body_escape(text):
return text.replace("\0", "\n")
def make_ext_body(text, dict_str):
"""
See printk.c's msg_print_ext_body for format spec.
Escaping of unprintables is currently unimplemented.
"""
return "{}\n{}".format(_body_escape(text), _body_escape(dict_str))
def make_netcons_msg(
seq=0,
facility=Facility.LOG_KERN,
level=Level.LOG_ERR,
cont=False,
text="text",
meta_dict=None,
):
if meta_dict is None:
meta_dict = {"DICT": "test"}
dict_str = make_dictionary_string(meta_dict)
header = make_ext_header(seq=seq, facility=facility, level=level, cont=cont)
body = make_ext_body(text=text, dict_str=dict_str)
return "{}{}".format(header, body)
def parse_args():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--skip", action="store_true", help="Randomly skip sequence numbers"
)
parser.add_argument(
"--reset", action="store_true", help="Randomly reset the sequence to 0 again"
)
parser.add_argument(
"--cont", action="store_true", help="Randomly insert LOG_CONT messages"
)
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
enabled_modes = [Mode.NORMAL]
for arg_name, mode in ARG_TO_MODE_MAP.items():
if getattr(args, arg_name):
enabled_modes.append(mode)
seq = 0
cont = False
while True:
print(
make_netcons_msg(
seq=seq, text="hi", meta_dict={"UNAME": "it's minix i swear"}, cont=cont
),
flush=True,
)
chosen_mode = random.choice(enabled_modes)
if chosen_mode == Mode.NORMAL:
new_seq = seq + 1
elif chosen_mode == Mode.SKIP:
new_seq = seq + random.randint(1, 5)
elif chosen_mode == Mode.RESET:
new_seq = 0
if args.cont:
cont = random.choice([True, False])
print(
"seq: {} -> {}, mode: {}, cont: {}".format(seq, new_seq, chosen_mode, cont),
file=sys.stderr,
)
seq = new_seq
time.sleep(0.5)