def merge()

in cc_net/jsonql.py [0:0]


def merge(
    lines: tp.Iterable[str],
    columns: tp.Sequence[str],
    separator: str = "\t",
    newline: str = NEWLINE,
) -> tp.Iterator[str]:
    """Read separator-delimited lines and yield one JSON string per line.

    Headers are of form {key}[%{type}]
    {type} can be one of {"f": float, "i": int, "b": bool, "s": string}.
    Default type is string; a header without a "%" suffix is always a string,
    even when the key itself is named "f", "i", "b" or "s".
    For "b", a blank value, "0" and "false" (case-insensitive) are False and
    any other value is True.
    In string columns every occurrence of `newline` is replaced by "\\n".
    A special header "_" means interpret this column as json and merge its keys
    into the output document (overriding same-named keys from earlier columns).
    Must appear only once and on last position.

    A line is split at most `len(columns) - 1` times, so the last column
    receives the remainder of the line, extra separators included. Columns
    missing from a short line are simply absent from the output. A value that
    fails its type conversion is logged and left out of the document.

    Ex:
    `echo '1\thello' | jsonql merge n t` --> `{"n": "1", "t": "hello"}`
    `echo '1\thello' | jsonql merge n%i t` --> `{"n": 1, "t": "hello"}`
    `echo '1\thello\t{"f": "bar"}' | jsonql merge n%i t _` --> `{"n": 1, "t": "hello", "f": "bar"}`
    """

    def handle_newlines(text: str) -> str:
        return text.replace(newline, "\n")

    def parse_bool(text: str) -> bool:
        # bool("false") is True because any non-empty string is truthy, so the
        # explicit false spellings have to be recognised by hand.
        return text.strip().lower() not in ("", "0", "false")

    type_mapping: Dict[str, Callable] = {
        "f": float,
        "i": int,
        "b": parse_bool,
        "s": handle_newlines,
    }

    names = []
    parsers = []
    for header in columns:
        name, *suffixes = header.split("%")
        names.append(name)
        # Only look at the type table when a "%" suffix is present; otherwise a
        # column literally named "i" or "f" would be parsed as a number.
        if suffixes:
            parsers.append(type_mapping.get(suffixes[-1], handle_newlines))
        else:
            parsers.append(handle_newlines)

    read_json = JsonReader()
    # Clamp to 0: a negative maxsplit would mean "split without limit".
    max_splits = max(len(names) - 1, 0)

    def parse(line: str) -> Dict[str, tp.Any]:
        parts = line.split(separator, max_splits)
        doc: Dict[str, tp.Any] = {}
        # zip stops at the shortest input, so short lines and an empty column
        # list cannot cause an out-of-range access.
        for i, (name, parser, value) in enumerate(zip(names, parsers, parts)):
            if name == "_":
                doc.update(read_json(value))
                continue
            try:
                doc[name] = parser(value)
            except ValueError:
                logging.error(
                    f"Error when parsing column {i} of line: {line[:100]}..."
                )
        return doc

    for line in lines:
        yield json.dumps(parse(line))