# esrally/tracker/corpus.py
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import bz2
import json
import logging
import os
from esrally.utils import console
# Compressor used for the corpus dump and the matching file extension.
DOCS_COMPRESSOR = bz2.BZ2Compressor
COMP_EXT = ".bz2"


def template_vars(index_name, out_path, doc_count):
    """
    Build the properties describing a dumped corpus, for use in track templates.

    Both the uncompressed dump at ``out_path`` and the compressed dump at
    ``out_path + COMP_EXT`` must already exist, as their sizes are read here.

    :param index_name: Name of the index the corpus was extracted from.
    :param out_path: Path of the uncompressed document dump.
    :param doc_count: Number of documents contained in the dump.
    :return: dict with corpus metadata (names, paths, sizes, doc count).
    """
    compressed_path = out_path + COMP_EXT
    return {
        "index_name": index_name,
        "filename": os.path.basename(compressed_path),
        "path": compressed_path,
        "doc_count": doc_count,
        "uncompressed_bytes": os.path.getsize(out_path),
        "compressed_bytes": os.path.getsize(compressed_path),
    }
def get_doc_outpath(outdir, name, suffix=""):
    """Return the path of the (uncompressed) document dump for ``name`` inside ``outdir``."""
    filename = f"{name}-documents{suffix}.json"
    return os.path.join(outdir, filename)
def extract(client, output_path, index, batch_size=1000):
    """
    Scroll an index with a match-all query, dumping document source to ``outdir/documents.json``.

    :param client: Elasticsearch client used to extract data
    :param output_path: Destination directory for corpus dump
    :param index: Name of index to dump
    :param batch_size: Number of documents to fetch per scroll request
    :return: dict of properties describing the corpus for templates, or ``None`` if the index is empty
    """
    logger = logging.getLogger(__name__)
    total_docs = client.count(index=index)["count"]
    if total_docs > 0:
        logger.info("[%d] total docs in index [%s].", total_docs, index)
        docs_path = get_doc_outpath(output_path, index)
        # Dump a capped 1k-document corpus first (used for test mode), then the full corpus.
        dump_documents(client, index, get_doc_outpath(output_path, index, "-1k"), min(total_docs, 1000), batch_size, " for test mode")
        dump_documents(client, index, docs_path, total_docs, batch_size)
        return template_vars(index, docs_path, total_docs)
    else:
        # Bug fix: log message previously read "fo index".
        logger.info("Skipping corpus extraction for index [%s] as it contains no documents.", index)
        return None
def dump_documents(client, index, out_path, total_docs, batch_size=1000, progress_message_suffix=""):
    """
    Dump up to ``total_docs`` documents from ``index`` to ``out_path``, writing an
    uncompressed JSON-lines file and a compressed (``COMP_EXT``) copy side by side.

    :param client: Elasticsearch client used to scan the index.
    :param index: Name of the index to dump.
    :param out_path: Destination path of the uncompressed dump.
    :param total_docs: Maximum number of documents to dump.
    :param batch_size: Scroll request size; also controls progress-report frequency.
    :param progress_message_suffix: Extra text appended to the progress message.
    """
    # pylint: disable=import-outside-toplevel
    from elasticsearch import helpers

    logger = logging.getLogger(__name__)
    compressed_path = out_path + COMP_EXT
    compressor = DOCS_COMPRESSOR()
    progress = console.progress()
    # Report progress roughly once per batch, but at least every document.
    report_every = max(1, total_docs // batch_size)
    with open(out_path, "wb") as raw_file, open(compressed_path, "wb") as compressed_file:
        logger.info("Dumping corpus for index [%s] to [%s].", index, out_path)
        match_all = {"query": {"match_all": {}}}
        for doc_number, hit in enumerate(helpers.scan(client, query=match_all, index=index, size=batch_size)):
            # The scan may yield more hits than requested; stop once the cap is reached.
            if doc_number >= total_docs:
                break
            line = (json.dumps(hit["_source"], separators=(",", ":")) + "\n").encode("utf-8")
            raw_file.write(line)
            compressed_file.write(compressor.compress(line))
            render_progress(progress, progress_message_suffix, index, doc_number + 1, total_docs, report_every)
        # Flush any data still buffered in the compressor.
        compressed_file.write(compressor.flush())
    progress.finish()
def render_progress(progress, progress_message_suffix, index, cur, total, freq):
    """
    Print a progress line for the corpus dump, throttled to every ``freq`` documents
    (and always within the last ``freq`` documents so the final state is shown).
    """
    on_interval = cur % freq == 0
    in_final_window = total - cur < freq
    if not (on_interval or in_final_window):
        return
    completed_pct = cur * 100 / total
    progress.print(
        f"Extracting documents for index [{index}]{progress_message_suffix}...",
        f"{cur}/{total} docs [{completed_pct:.1f}% done]",
    )