main.py (123 lines of code) (raw):
from utils.es import *
import argparse
program_defaults = {
    # ---- Connection to a self-managed Elasticsearch cluster ----
    "elasticsearch_host": "https://localhost:9200",
    # NOTE: this certificate path lives under one specific user's home directory;
    # point it at your own CA certificate.
    "elasticsearch_ca_path": "/home/c/.elastic-package/profiles/default/certs/elasticsearch/ca-cert.pem",
    "elasticsearch_user": "elastic",
    "elasticsearch_pwd": "changeme",

    # ---- Connection to Elastic Cloud ----
    # Fill in both values when running on cloud. When neither is empty, the client
    # connects to the cloud with them instead of using the elasticsearch_* values above.
    "cloud_pwd": "",
    "cloud_id": "",

    # ---- Data stream under test ----
    "data_stream": "metrics-aws.usage-default",

    # ---- Index selection ----
    # Both values are positions inside the data stream, counted from 0:
    # index 000001 is position 0, index 000002 is position 1, and so on.
    #
    # docs_index: position of the index the documents are read from.
    #   -1 selects the default, which is position 0 (the first index).
    # settings_mappings_index: position of the index whose mappings and settings
    #   are used for the TSDB index.
    #   -1 selects the default, which is the last index of the data stream.
    #
    # -1 is used as the "default" marker because the number of indices in the
    # data stream is not known at this point. docs_index could default to 0 (the
    # first index always exists), but -1 keeps both settings consistent.
    "docs_index": -1,
    "settings_mappings_index": -1,

    # ---- Reindex size ----
    # Upper bound on the documents reindexed into the new TSDB index; -1 means
    # "reindex every document".
    # Tip: set this when reindexing is too slow or runs into a timeout.
    "max_docs": -1,
}
# Second batch of defaults. It is added in a separate step because the directory
# name below is derived from the "data_stream" entry that already exists.
program_defaults.update({
    # Directory that receives the files with the overlapping documents.
    "directory_overlapping_files": "-".join(["overwritten-docs", program_defaults["data_stream"]]),
    # Whether the overlapping documents should be written to the directory above.
    # If you set this to True, first delete that directory if it already exists!
    "get_overlapping_files": True,
    # Number of dimension sets causing data loss that get printed. When
    # get_overlapping_files is True, this is also the number of directories created.
    "display_docs": 10,
    # Number of documents retrieved for each dimension set causing data loss.
    "copy_docs_per_dimension": 2,
})
def _parse_bool(value):
    """Convert a command line string into a real boolean.

    Without this converter argparse stores the raw text, and any non-empty
    string (including "False") is truthy once passed through ``bool()``.

    Args:
        value: text typed by the user after the flag.

    Returns:
        True for true/t/yes/y/on/1, False for false/f/no/n/off/0 or an empty
        string (case-insensitive, surrounding whitespace ignored).

    Raises:
        argparse.ArgumentTypeError: if the text is none of the accepted values;
            argparse turns this into a usage error.
    """
    normalized = value.strip().lower()
    if normalized in ("true", "t", "yes", "y", "on", "1"):
        return True
    if normalized in ("false", "f", "no", "n", "off", "0", ""):
        return False
    raise argparse.ArgumentTypeError("expected a boolean (true/false), got {!r}".format(value))


def get_cmd_arguments():
    """Parse the command line flags, falling back to ``program_defaults``.

    Every key of ``program_defaults`` is exposed as a ``--<key>`` flag.
    Numeric flags are converted to ``int`` and ``--get_overlapping_files`` to
    ``bool`` while parsing, so invalid values are rejected up front by
    argparse instead of failing later in the run.

    Returns:
        argparse.Namespace: one attribute per flag, named like the flag.

    Raises:
        SystemExit: with status 2 when an unknown flag is given (after printing
            the help text) or when argparse rejects a value; argparse also
            exits after printing ``--help``.
    """
    def default_text(key, meaning_of_minus_one):
        # -1 is the "use the default behaviour" marker; show what it means
        # in the help text instead of the bare number.
        value = program_defaults[key]
        return meaning_of_minus_one if value == -1 else str(value)

    parser = argparse.ArgumentParser(description='Process command line arguments.',
                                     formatter_class=argparse.RawTextHelpFormatter)

    # ES variables
    parser.add_argument('--elasticsearch_host', action="store", dest='elasticsearch_host',
                        default=program_defaults["elasticsearch_host"],
                        help="Elasticsearch host.\nDefault: " + program_defaults["elasticsearch_host"])
    parser.add_argument('--elasticsearch_ca_path', action="store", dest='elasticsearch_ca_path',
                        default=program_defaults["elasticsearch_ca_path"],
                        help="Location of the Elasticsearch certificate.\nDefault: "
                             + program_defaults["elasticsearch_ca_path"])
    parser.add_argument('--elasticsearch_user', action="store", dest='elasticsearch_user',
                        default=program_defaults["elasticsearch_user"],
                        help="Name of the Elasticsearch user.\nDefault: " + program_defaults["elasticsearch_user"])
    parser.add_argument('--elasticsearch_pwd', action="store", dest='elasticsearch_pwd',
                        default=program_defaults["elasticsearch_pwd"],
                        help="Elasticsearch password.\nDefault: " + program_defaults["elasticsearch_pwd"])

    # Cloud variables
    parser.add_argument('--cloud_id', action="store", dest='cloud_id', default=program_defaults["cloud_id"],
                        help="The ID for Elastic Cloud. If set, it will overwrite every elasticsearch_* argument."
                             "\nDefault: " + program_defaults["cloud_id"])
    parser.add_argument('--cloud_pwd', action="store", dest='cloud_pwd', default=program_defaults["cloud_pwd"],
                        help="The password for Elastic Cloud. If set, it will overwrite every elasticsearch_* argument."
                             "\nDefault: " + program_defaults["cloud_pwd"])

    # Data stream name
    parser.add_argument('--data_stream', action="store", dest='data_stream', default=program_defaults["data_stream"],
                        help="The name of the data stream to migrate to TSDB.\nDefault: "
                             + program_defaults["data_stream"])

    # Reindex variables (type=int so that "-1" typed by the user equals the -1 marker)
    parser.add_argument('--docs_index', action="store", dest='docs_index', type=int,
                        default=program_defaults["docs_index"],
                        help="The data stream index number to be used to retrieve the documents. "
                             "Count starts at 0. This means the index 0001 has docs_index 0, index 0002 has "
                             "the docs_index 1 and so on.\nDefault: "
                             + default_text("docs_index", "First index of the data stream"))
    parser.add_argument('--settings_mappings_index', action="store", dest='settings_mappings_index', type=int,
                        default=program_defaults["settings_mappings_index"],
                        help="The data stream index number to be used to retrieve the mappings and settings. "
                             "Count starts at 0. This means the index 0001 has settings_mappings_index 0, index 0002 "
                             "has the settings_mappings_index 1 and so on.\nDefault: "
                             + default_text("settings_mappings_index", "Last index of the data stream"))
    parser.add_argument('--max_docs', action="store", dest='max_docs', type=int,
                        default=program_defaults["max_docs"],
                        help="The number of documents to retrieve from the index and reindex to the TSDB one."
                             "\nDefault: "
                             + default_text("max_docs", "All documents from the data stream index docs_index."))

    # Overlapping files configuration
    parser.add_argument('--get_overlapping_files', action="store", dest='get_overlapping_files', type=_parse_bool,
                        default=program_defaults["get_overlapping_files"],
                        help="Flag to place the overwritten documents: documents will be placed if True, otherwise"
                             " if False.\nAccepted values: true/false, yes/no, on/off, 1/0."
                             "\nDefault: " + str(program_defaults["get_overlapping_files"]))
    parser.add_argument('--directory_overlapping_files', action="store", dest='directory_overlapping_files',
                        default=program_defaults["directory_overlapping_files"],
                        help="The directory path to place the overwritten documents.\nDefault: "
                             + program_defaults["directory_overlapping_files"])
    parser.add_argument('--display_docs', action="store", dest='display_docs', type=int,
                        default=program_defaults["display_docs"],
                        help="Number of documents overlapping used to display the dimensions."
                             "\nDefault: " + str(program_defaults["display_docs"]))
    parser.add_argument('--copy_docs_per_dimension', action="store", dest='copy_docs_per_dimension', type=int,
                        default=program_defaults["copy_docs_per_dimension"],
                        help="Number of documents to retrieve per set of dimensions that caused loss of data."
                             "\nDefault: " + str(program_defaults["copy_docs_per_dimension"]))

    # parse_known_args is used (rather than parse_args) so the full help text can
    # be shown together with the list of offending flags.
    args, unknown = parser.parse_known_args()
    if unknown:
        parser.print_help()
        print("\nUser provided unknown flags:", unknown)
        print("Program will end.")
        # Unknown flags are a usage error: report failure to the calling shell,
        # using the same status argparse uses for its own usage errors.
        raise SystemExit(2)
    return args
if __name__ == '__main__':
    args = get_cmd_arguments()

    # The summary shows only one group of connection values: cloud_* when both
    # cloud values are set, elasticsearch_* otherwise.
    cloud_in_use = args.cloud_id != "" and args.cloud_pwd != ""

    # Text printed in place of the -1 marker for the options that support it.
    minus_one_meaning = {
        "docs_index": "First index of the data stream",
        "settings_mappings_index": "Last index of the data stream",
        "max_docs": "All documents",
    }

    print("Values being used:")
    for name, value in vars(args).items():
        if cloud_in_use and name.startswith("elasticsearch_"):
            continue
        if not cloud_in_use and name.startswith("cloud_"):
            continue
        if name in minus_one_meaning and value == -1:
            value = minus_one_meaning[name]
        print("\t{} = {}".format(name, value))
    print()

    # Connect to the cluster and report which version is being tested.
    client = get_client(
        args.elasticsearch_host,
        args.elasticsearch_ca_path,
        args.elasticsearch_user,
        args.elasticsearch_pwd,
        args.cloud_id,
        args.cloud_pwd,
    )
    version_number = client.info()["version"]["number"]
    print("You're testing with version {}.\n".format(version_number))

    # Build the TSDB index and copy the documents into it.
    all_placed = copy_from_data_stream(
        client,
        args.data_stream,
        int(args.docs_index),
        int(args.settings_mappings_index),
        int(args.max_docs),
    )

    # A falsy result triggers the follow-up steps for overwritten documents.
    if not all_placed:
        print("Overwritten documents will be placed in new index.")
        create_index_missing_for_docs(client)
        get_missing_docs_info(
            client,
            args.data_stream,
            int(args.display_docs),
            args.directory_overlapping_files,
            bool(args.get_overlapping_files),
            int(args.copy_docs_per_dimension),
        )