generate_commands_from_synopsis.py (210 lines of code) (raw):

"""Generate synthetic (bash command, natural-language query) pairs.

The script combines three inputs:

* the utilities (and which utility is piped into which) observed in an
  nl2bash-style JSON file,
* the positional-argument grammar shipped with ``bashlint``,
* option descriptions taken from a JSON-lines manpage dump,

and samples random command lines together with a rough textual description
built from the manpage synopsis and option texts.  The result is written as a
two-column CSV (``cmd``, ``query``).
"""

import argparse
import re
import sys
import warnings
from collections import Counter, defaultdict
from functools import partial

import numpy as np
import pandas as pd
from tqdm.auto import tqdm

# bashlint is resolved through this relative path, so the path tweak has to
# happen before the import below.
sys.path.append('../clai/utils')

import bashlint
from bashlint import data_tools

# Utility name -> name under which its options are looked up in the manpage
# table.
_MANPAGE_NAME_OVERRIDES = {
    'grep': 'egrep',
    'gcc': 'aarch64-linux-gnu-gcc-8',
    'vim': 'rvim',
    'rename': 'file-rename',
}

# One option flag (-x / --xyz), optionally wrapped in a markup tag, at the very
# start of a paragraph, followed by optional blanks.
_OPTION_PREFIX_RE = re.compile(
    r"^(?:\<[^>]+\>)?(-{1,2}\w+)(?:\<\/[^>]+\>)?[\t ]*")

# A complete leading "<tag>...</tag>" element plus trailing whitespace.
_LEADING_TAG_RE = re.compile(r"^\<[^>]+\>.*?\<\/[^>]+\>\s*")


def update_graph(cmd, graph):
    """Record which utility is piped into which for one command.

    If the first child of the parsed command is a pipeline node, every pair of
    *adjacent* utility children ``a | b`` adds ``b`` to ``graph[a]``.  A
    non-utility child between two utilities breaks the chain.

    Args:
        cmd: Bash command string.
        graph: Mapping ``utility name -> set of successor names``; updated in
            place (a ``defaultdict(set)`` is expected because missing keys are
            indexed directly).
    """
    parsed = data_tools.bash_parser(cmd)
    # NOTE(review): assumes bash_parser reports an unparsable command by
    # returning None -- confirm against bashlint.  Such commands (and empty
    # trees) contribute nothing to the graph.
    if parsed is None or not parsed.children:
        return
    root = parsed.children[0]
    if not isinstance(root, bashlint.nast.PipelineNode):
        return

    prev_name = ""
    for node in root.children:
        cur_name = node.value if node.is_utility() else ""
        if prev_name and cur_name:
            graph[prev_name].add(cur_name)
        prev_name = cur_name


def add_utilities(cmd, counter):
    """Count every utility that occurs anywhere in ``cmd``.

    Args:
        cmd: Bash command string.
        counter: ``collections.Counter`` updated in place with one increment
            per utility occurrence.
    """
    def collect(node):
        # Descend through utilities and other non-argument nodes; argument
        # nodes that are not utilities are not searched.
        found = []
        is_utility = node.is_utility()
        if is_utility:
            found.append(node.value)
        if is_utility or not node.is_argument():
            for child in node.children:
                found.extend(collect(child))
        return found

    parsed = data_tools.bash_parser(cmd)
    # NOTE(review): same None-on-failure assumption as in update_graph.
    if parsed is None:
        return
    counter.update(collect(parsed))


def number_of_required_arguments(cmd):
    """Return how many non-optional positional arguments ``cmd`` takes.

    Only grammar entries that are ``bashlint.grammar.ArgumentState`` instances
    are counted.

    Raises:
        KeyError: if ``cmd`` has no entry in the bashlint grammar (assuming
            the grammar is a plain mapping).
    """
    positional = bashlint.grammar.bg.grammar[cmd].positional_arguments
    return sum(
        not arg.optional
        for arg in positional
        if isinstance(arg, bashlint.grammar.ArgumentState)
    )


def _strip_leading_markup(text):
    """Drop the heading line and leading markup from an option paragraph.

    Everything up to the first newline is removed (when there is one); then,
    while the remainder starts with ``<``, either the next line or a leading
    ``<tag>...</tag>`` element is removed.
    """
    if '\n' in text:
        text = text[text.find('\n') + 1:].strip()
    while text.startswith('<'):
        if '\n' in text:
            text = text[text.find('\n') + 1:].strip()
        else:
            stripped = _LEADING_TAG_RE.sub("", text).strip()
            if stripped == text:
                # Nothing could be removed (e.g. an unclosed tag); stop
                # instead of spinning forever on the same text.
                break
            text = stripped
    return text


def get_options(name, manpage, alias_to_idx):
    """Collect the option descriptions of one utility from the manpage table.

    Args:
        name: Utility name; a few names are first mapped to the manpage name
            they are documented under.
        manpage: DataFrame with at least the columns ``name`` and
            ``paragraphs``.  Each paragraph is a dict with ``text`` and
            ``is_option`` keys; paragraphs flagged as options are presumed to
            already carry ``short``/``long``/``expectsarg``/``argument``
            (TODO confirm against the dump's schema).
        alias_to_idx: Mapping ``alias -> list of manpage index labels`` used
            when no row has the requested name.

    Returns:
        ``None`` when the utility is not found, otherwise a list of dicts
        with the keys ``short``, ``long``, ``expectsarg``, ``argument`` and
        ``text``.

    Note:
        Paragraphs not flagged as options but starting with option flags are
        annotated **in place** (``short``/``long``/... keys are added), so
        ``manpage`` is modified as a side effect.
    """
    name = _MANPAGE_NAME_OVERRIDES.get(name, name)
    rows = manpage[manpage['name'] == name]
    if len(rows) < 1:
        alias_rows = alias_to_idx.get(name)
        if not alias_rows:
            return None
        rows = manpage.loc[alias_rows]

    options = []
    for p in rows.iloc[0]['paragraphs']:
        text = p['text'].strip()
        if not p['is_option']:
            match = _OPTION_PREFIX_RE.match(text)
            if not match:
                continue
            p['short'] = []
            p['long'] = []
            p['expectsarg'] = None
            p['argument'] = None
            # Peel off every flag found at the start of the paragraph.
            while match:
                flag = match.group(1)
                if flag.startswith("--"):
                    p['long'].append(flag)
                else:
                    p['short'].append(flag)
                # The pattern is anchored at the start, so slicing at the end
                # of the match removes exactly the matched prefix.
                text = text[match.end():]
                match = _OPTION_PREFIX_RE.match(text)
        if 'short' not in p:
            continue
        options.append({
            'short': p['short'],
            'long': p['long'],
            'expectsarg': p['expectsarg'],
            'argument': p['argument'],
            'text': _strip_leading_markup(text),
        })
    return options


def _option_sort_key(option):
    """Sort short flags before long ones, each group by the flag's name."""
    flag = option[0]
    if flag.startswith('--'):
        return "2" + flag[2:]
    return "1" + flag[1:]


def generate_single_command(results, avg_options=3, idx=None):
    """Sample one command line and its description for a single utility.

    Args:
        results: DataFrame with the columns ``cmd``, ``synopsis``, ``options``
            and ``required``.
        avg_options: Expected number of options kept when the utility has
            more than this many; otherwise each option is kept with
            probability 0.3.
        idx: Positional row index of the utility; drawn uniformly at random
            when ``None``.

    Returns:
        Tuple ``(command, text)``.  The command is the utility name, the
        chosen options (short before long, with an ``ARG`` placeholder after
        options that expect an argument) and one ``ARG`` per required
        positional argument.  The text is the synopsis followed by the first
        five lower-cased words of each chosen option's description.
    """
    if idx is None:
        idx = np.random.randint(len(results))
    row = results.iloc[idx]
    options = row['options']
    text = [row['synopsis']]
    chosen = []

    if options:
        draws = np.random.rand(len(options))
        if len(options) > avg_options:
            keep = draws < avg_options / len(options)
        else:
            keep = draws < 0.3
        picked = [opt for opt, kept in zip(options, keep) if kept]

        for opt in picked:
            is_meta = ('--help' in opt['long']
                       or '--version' in opt['long']
                       or '-help' in opt['short'])
            # Help/version flags are only emitted on their own.
            if is_meta and len(picked) > 1:
                continue
            variants = opt['short'] + opt['long']
            if not variants:
                # No spelling to choose from; np.random.choice would raise.
                continue
            current = [np.random.choice(variants)]
            if opt['expectsarg']:
                current.append('ARG')
            text.append(" ".join(opt['text'].lower().split()[:5]))
            chosen.append(tuple(current))

    cmd = [row['cmd']]
    for option in sorted(chosen, key=_option_sort_key):
        cmd.extend(option)
    cmd.extend(["ARG"] * int(row['required']))
    return " ".join(cmd), " ".join(text)


def get_cmd_name(cmd):
    """Return the part of ``cmd`` before the first space (all of it if none)."""
    return cmd.partition(' ')[0]


def generate_commands(results, graph, avg_options=3, pipe_prob=0.3):
    """Sample a (possibly piped) command and its description.

    After every generated command another one is piped onto it with
    probability ``pipe_prob``, provided ``graph`` lists a successor for the
    current utility.  With probability 0.01 the final command is additionally
    wrapped in one of a few fixed templates (``| wc -l``, ``| grep ARG``,
    variable assignment).

    Args:
        results: DataFrame accepted by :func:`generate_single_command`.
        graph: Mapping ``utility name -> collection of successor names``.
        avg_options: Forwarded to :func:`generate_single_command`.
        pipe_prob: Probability of continuing the pipeline after each command.

    Returns:
        Tuple ``(command, text)``; piped commands are joined with ``" | "``
        and their descriptions with ``" and "``.
    """
    idx = None
    cmds = []
    texts = []
    while True:
        cmd, text = generate_single_command(
            results, avg_options=avg_options, idx=idx)
        cmds.append(cmd)
        texts.append(text)

        if np.random.rand() > pipe_prob:
            break
        successors = graph.get(get_cmd_name(cmd))
        if not successors:
            break
        next_cmd = np.random.choice(list(successors))
        positions = np.where(results['cmd'] == next_cmd)[0]
        if len(positions) == 0:
            # The successor has no row in ``results``; end the pipeline here
            # rather than failing with an IndexError.
            break
        idx = positions[0]

    cmd = " | ".join(cmds)
    text = " and ".join(texts)

    extra_cmds = ["{} | wc -l", "{} | grep ARG", "VAR=$({})", "VAR=`{}`"]
    extra_texts = [
        ["How many {}", "Count lines of {}", "Get number of {}"],
        ["Find all {} with ARG", "Which {} has ARG"],
        ["Set variable VAR to the {}", "Set variable VAR to {}"],
        ["Set variable VAR to the {}", "Set variable VAR to {}"]
    ]
    if np.random.rand() < 0.01:
        cmd_idx = np.random.randint(len(extra_cmds))
        cmd = extra_cmds[cmd_idx].format(cmd)
        text = np.random.choice(extra_texts[cmd_idx]).format(text)
    return cmd, text


def main(args):
    """Build the utility table and write ``args.size`` generated examples.

    Args:
        args: Namespace with the attributes ``nl2bash`` (path to a JSON file
            whose transposed frame has a ``cmd`` column), ``manpage`` (path
            to a JSON-lines file with ``name``, ``synopsis``, ``aliases`` and
            ``paragraphs`` fields), ``size`` (number of examples) and
            ``output`` (CSV path to write).
    """
    nl2bash = pd.read_json(args.nl2bash).T

    graph = defaultdict(set)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", FutureWarning)
        tqdm.pandas(desc="Extracting utilities graph")
        nl2bash['cmd'].progress_apply(partial(update_graph, graph=graph))

    count_utilities = Counter()
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", FutureWarning)
        tqdm.pandas(desc="Extracting utilities from examples")
        nl2bash['cmd'].progress_apply(
            partial(add_utilities, counter=count_utilities))

    # Every utility known to the grammar is included, even if it never shows
    # up in the examples.
    all_commands = list(bashlint.grammar.bg.grammar.keys())
    count_utilities.update(all_commands)

    commands = pd.DataFrame.from_dict(
        count_utilities, orient='index', columns=["count"]).reset_index() \
        .rename(columns={'index': 'cmd'}).sort_values('count') \
        .reset_index(drop=True)
    commands['required'] = commands['cmd'].apply(number_of_required_arguments)
    print(f"Found {len(commands)} total utilities")

    manpage = pd.read_json(args.manpage, lines=True)
    commands = commands.merge(
        manpage[['name', 'synopsis']],
        left_on='cmd', right_on='name', how='left')
    commands.loc[commands['synopsis'].isna(), 'synopsis'] = ''

    # alias -> index labels of the manpage rows that declare it; only the
    # first element of each alias entry is used as the alias name.
    alias_to_idx = defaultdict(list)
    for row_label, aliases in manpage['aliases'].items():
        for alias in aliases:
            alias_to_idx[alias[0]].append(row_label)

    commands.drop_duplicates(inplace=True)
    print(f"Now {len(commands)} utilities")
    commands['options'] = commands['cmd'].apply(
        partial(get_options, manpage=manpage, alias_to_idx=alias_to_idx))
    del manpage

    results = []
    for _ in tqdm(range(args.size), desc="Generating examples"):
        results.append(list(generate_commands(commands, graph)))
    results = pd.DataFrame(results, columns=["cmd", "query"])
    results.to_csv(args.output, index=False)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Crawl examples from manpage-data.json")
    parser.add_argument("nl2bash", type=str, help="path to nl2bash-data.json")
    parser.add_argument("manpage", type=str, help="path to manpage-data.json")
    parser.add_argument("--size", type=int, default=10000, help="number of generated examples")
    parser.add_argument("-o", "--output", type=str, default="generated-examples.csv", help="path to output file")
    args = parser.parse_args()
    main(args)