in metron-platform/metron-data-management/src/main/scripts/Whois_CSV_to_JSON.py [0:0]
def _json_output_path(output_dir, dirname, filename):
    """
    Build the JSON output path for a CSV file found while walking the source tree.

    The result is output_dir/<last component of dirname>_<filename without extension>.json,
    so source_dir/com/1.csv becomes output_dir/com_1.json. Only the trailing extension is
    replaced; a literal 'csv' elsewhere in the filename is preserved.
    :param output_dir: Directory the JSON file will be written to
    :param dirname: Directory containing the CSV file, as yielded by os.walk
    :param filename: Bare CSV filename (no directory part)
    :return: Full path of the JSON output file
    """
    base_name = os.path.splitext(filename)[0]
    # normpath drops a trailing separator so e.g. "data/" still yields "data", not ""
    subdir = os.path.basename(os.path.normpath(dirname))
    return os.path.join(output_dir, '%s_%s.json' % (subdir, base_name))


def process_files(source_dir, output_dir, max_processes=10, overwrite=False):
    """
    Run process_csv over every CSV file under source_dir using a multiprocessing.Pool.

    source_dir is walked recursively and every file whose name ends in '.csv' is queued.
    When output_dir is given, output is placed flat into that single directory and named
    after the LAST component of the file's directory: source_dir/com/1.csv becomes
    output_dir/com_1.json. Files whose output already exists are skipped unless overwrite
    is true. When output_dir is falsy, each file is passed to process_csv with no output
    path (analysis only).

    NOTE(review): files with the same name in different directories that share a last
    path component (e.g. a/com/1.csv and b/com/1.csv) map to the same output file.

    :param source_dir: Source directory of CSV files
    :param output_dir: Output directory for resultant JSON files; created if missing. May be None.
    :param max_processes: Maximum number of worker processes
    :param overwrite: If true, reprocess files whose JSON output already exists
    :return: None. Failures inside workers are logged rather than raised.
    """
    logging.info("Processing Whois files from %s", source_dir)
    if output_dir and not os.path.exists(output_dir):
        logging.debug("Creating output directory %s", output_dir)
        os.makedirs(output_dir)

    logging.info("Starting %s pool workers", max_processes)
    if sys.version_info[:2] == (2, 6):
        # Pool in Python 2.6 does not accept maxtasksperchild
        pool = multiprocessing.Pool(processes=max_processes)
    else:
        # each worker process is replaced after completing 4 tasks
        pool = multiprocessing.Pool(processes=max_processes, maxtasksperchild=4)

    # (source path, AsyncResult) pairs, kept so worker failures can be reported afterwards
    pending = []
    try:
        for dirname, _, filenames in os.walk(source_dir):
            for filename in filenames:
                if not filename.endswith('.csv'):
                    continue
                source_path = os.path.join(dirname, filename)
                if output_dir:
                    out_filename = _json_output_path(output_dir, dirname, filename)
                    if os.path.isfile(out_filename) and not overwrite:
                        logging.info("Skipping %s, %s exists and overwrite is false", filename, out_filename)
                        continue
                else:
                    # no output directory, so the files are only analyzed
                    out_filename = None
                result = pool.apply_async(process_csv, args=(source_path, out_filename))
                pending.append((source_path, result))
        pool.close()
        logging.info("Starting activities on %s CSV files", len(pending))
        pool.join()
    except KeyboardInterrupt:
        logging.info("Aborting")
        pool.terminate()
        pool.join()
        return
    except Exception:
        # don't leave worker processes running if dispatch fails part-way
        pool.terminate()
        pool.join()
        raise

    # apply_async swallows worker exceptions until get() is called; surface them in the log
    for source_path, result in pending:
        try:
            result.get()
        except Exception:
            logging.exception("Failed to process %s", source_path)
    logging.info("Completed")