# Excerpt: process_files() from
# metron-platform/metron-data-management/src/main/scripts/Whois_CSV_to_JSON.py

def process_files(source_dir, output_dir, max_processes=10, overwrite=False):
    """
    Generates a multiprocessing.Pool() queue with a list of input and output files to be processed with process_csv.
    Files are added by walking the source_dir and adding any file with a CSV extension. Output is placed into a single
    directory for processing. Output filenames are generated using the last part of the directory name so a file
    named source_dir/com/1.csv would become output_dir/com_1.json

    :param source_dir: Source directory of CSV files
    :param output_dir: Output directory for resultant JSON files; if falsy, files are analyzed but no output is written
    :param max_processes: Maximum number of pool worker processes to run
    :param overwrite: If True, re-process files whose output JSON already exists
    :return: None
    """
    logging.info("Processing Whois files from %s" % source_dir)

    if output_dir and not os.path.exists(output_dir):
        logging.debug("Creating output directory %s" % output_dir)
        os.makedirs(output_dir)

    logging.info("Starting %s pool workers" % max_processes)

    if sys.version.startswith('2.6'):
        # Python 2.6's multiprocessing.Pool has no maxtasksperchild argument
        pool = multiprocessing.Pool(processes=max_processes)
    else:
        # recycle each worker after a few tasks to bound memory growth from large CSVs
        pool = multiprocessing.Pool(processes=max_processes, maxtasksperchild=4)

    filecount = 0
    for dirname, dirnames, filenames in os.walk(source_dir):
        for filename in filenames:
            if filename.endswith('.csv'):
                # output files go to output_dir and are named using the last subdirectory from the dirname
                if output_dir:
                    # Replace only the extension. The previous str.replace('csv', 'json')
                    # substituted the first 'csv' anywhere in the name, so a file like
                    # 'mycsvdata.csv' became 'myjsondata.csv' instead of 'mycsvdata.json'.
                    out_filename = os.path.splitext(filename)[0] + '.json'
                    out_filename = os.path.join(output_dir, '%s_%s' % (os.path.split(dirname)[-1], out_filename))

                    # if file does not exist or if overwrite is true, add file process to the pool
                    if not os.path.isfile(out_filename) or overwrite:
                        pool.apply_async(process_csv, args=(os.path.join(dirname, filename), out_filename))
                        filecount += 1
                    else:
                        logging.info("Skipping %s, %s exists and overwrite is false" % (filename, out_filename))
                else:
                    # no output_dir, so we just analyze the files (process_csv gets None as destination)
                    pool.apply_async(process_csv, args=(os.path.join(dirname, filename), None))
                    filecount += 1

    try:
        pool.close()
        logging.info("Starting activities on %s CSV files" % filecount)
        pool.join()
    except KeyboardInterrupt:
        logging.info("Aborting")
        pool.terminate()

    logging.info("Completed")