spark_scripts/write_sentence_level_es_index.py
import argparse
import logging
import os
from contextlib import ExitStack

from es_index_writer import ESIndexWriter  # assumed import path; the module defining ESIndexWriter is not shown


def main(sc):
    argp = argparse.ArgumentParser()
    argp.add_argument('--corpus', help='input path, e.g. foobar/rollup/', required=True)
    argp.add_argument('--output-dir', default='output/', help='local directory for job metrics')
    argp.add_argument('--es-hosts', default=os.getenv('AES_HOSTS'), help='Elasticsearch hosts; defaults to $AES_HOSTS')
    argp.add_argument('--es-index', required=True, help='name of the Elasticsearch index to write to')
    argp.add_argument('--num-partitions', type=int, default=1000, help='minimum number of partitions for the corpus RDD')
    argp.add_argument('--debug-save', action='store_true', help='for debugging purposes')
    args = argp.parse_args()

    # Read the corpus as a line-oriented RDD.
    corpus_rdd = sc.textFile(args.corpus, minPartitions=args.num_partitions)

    if not os.path.exists(args.output_dir):
        os.mkdir(args.output_dir)

    with ExitStack() as stack:
        # Record the job arguments alongside the metrics for reproducibility.
        metric_filename = os.path.join(args.output_dir, 'metric.txt')
        metric_fptr = stack.enter_context(open(metric_filename, 'w'))
        print('args: {}'.format(args), file=metric_fptr)

        job = ESIndexWriter(
            args.output_dir,
            es_hosts=args.es_hosts,
            es_index_name=args.es_index,
            debug_save=args.debug_save)
        job.run_job(corpus_rdd, metric_fptr)

    logging.info('Output directory: {}'.format(args.output_dir))
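
For reference, a minimal sketch of how main(sc) might be bootstrapped and the script invoked. The SparkContext setup and the spark-submit command below are assumptions for illustration; only main(sc) and the flags above come from the script itself.

# Hypothetical bootstrap; the script's actual entry point is not shown above.
if __name__ == '__main__':
    from pyspark import SparkContext

    logging.basicConfig(level=logging.INFO)
    sc = SparkContext(appName='write_sentence_level_es_index')
    try:
        main(sc)
    finally:
        sc.stop()

# Example invocation (index name and hosts are placeholders):
#   spark-submit spark_scripts/write_sentence_level_es_index.py \
#       --corpus foobar/rollup/ \
#       --es-index my-sentence-index \
#       --es-hosts http://localhost:9200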