in cc_net/jsonql.py [0:0]
def merge(lines, columns, separator="\t", newline=NEWLINE):
    """Reads tab separated columns and output a json using the given headers.

    Headers are of form {key}[%{type}]
    {type} can be one of {"f": float, "i": int, "b": bool, "s": string}.
    Default type is string. An unknown {type} also falls back to string.
    For "b", the values "", "0" and "false" (case-insensitive, surrounding
    whitespace ignored) are false; any other value is true.
    In string columns, every occurrence of `newline` is turned back into a
    real line break.
    A special header "_" means interpret this column as json, and append all other
    columns to it. Must appear only once and on last position.

    `lines` is an iterable of strings, `columns` the list of headers and
    `separator` the column delimiter. Yields one json string per input line.
    A value that can't be converted to its declared type is logged and the
    corresponding key is left out of the output document.

    Ex:
    `echo '1\thello' | jsonql merge n t` --> `{"n": "1", "t": "hello"}`
    `echo '1\thello' | jsonql merge n%i t` --> `{"n": 1, "t": "hello"}`
    `echo '1\thello\t{"f": "bar"}' | jsonql merge n%i t _` --> `{"n": 1, "t": "hello", "f": "bar"}`
    """

    def handle_newlines(s):
        return s.replace(newline, "\n")

    def parse_bool(s):
        # Plain `bool(s)` is True for every non-empty string, so "false" and
        # "0" would be read as True. Anything not listed here stays truthy.
        return s.strip().lower() not in ("", "0", "false")

    type_mapping: Dict[str, Callable] = {
        "f": float,
        "i": int,
        "b": parse_bool,
        "s": handle_newlines,
    }

    names = []
    type_parsing = []
    for header in columns:
        name, *type_codes = header.split("%")
        # Only look for a type when the header actually contains a "%":
        # an untyped column that happens to be named "i", "f", "b" or "s"
        # must stay a string column.
        type_code = type_codes[-1] if type_codes else "s"
        names.append(name)
        type_parsing.append(type_mapping.get(type_code, handle_newlines))
    columns = names

    read_json = JsonReader()

    def parse(line):
        # Split at most len(columns) - 1 times so that the last column keeps
        # any separator it contains (useful for the trailing json column).
        parts = line.split(separator, len(columns) - 1)
        doc: Dict[str, tp.Any] = {}
        for i, value in enumerate(parts):
            if columns[i] == "_":
                # NOTE(review): assumes read_json returns a mapping here;
                # confirm what JsonReader does on malformed json.
                doc.update(read_json(value))
            else:
                try:
                    doc[columns[i]] = type_parsing[i](value)
                except ValueError:
                    logging.error(
                        f"Error when parsing column {i} of line: {line[:100]}..."
                    )
        return doc

    for line in lines:
        yield json.dumps(parse(line))