def main()

in identity-resolution/notebooks/identity-graph/nepytune/cli/transform.py [0:0]


def main(args):
    """Transform csv files into ready-to-load neptune format.

    Reads the INI configuration at ``args.config_file.name`` and runs every
    generation step whose flag on ``args`` is truthy, in this fixed order:
    websites, website groups, transient ids, persistent ids, identity
    groups, IPs. Source paths come from the ``[src]`` section and output
    paths from the ``[dst]`` section of the configuration.

    Args:
        args: Parsed command-line namespace. The attributes read here are
            ``config_file`` (an object exposing the config path as
            ``name``), the step flags ``websites``, ``website_groups``,
            ``transientIds``, ``persistentIds``, ``identityGroupIds`` and
            ``ips``, and the integer ``workers`` (only consulted when
            ``transientIds`` is set; values above 1 select the
            process-pool code path).

    Raises:
        KeyError: If a section or option needed by a selected step is
            missing. ``src.facts``, ``src.urls`` and ``src.titles`` are
            always read, regardless of which steps are selected.
    """
    config = configparser.ConfigParser()
    # NOTE: ConfigParser.read() skips files it cannot open instead of
    # raising, so a bad path shows up below as a KeyError.
    config.read(args.config_file.name)

    files = {
        "facts": config["src"]["facts"],
        "urls": config["src"]["urls"],
        "titles": config["src"]["titles"],
    }

    if args.websites:
        logger.info("Generating website nodes to %s", config["dst"]["websites"])
        websites.generate_website_nodes(
            files["urls"], files["titles"], config["dst"]["websites"]
        )

    if args.website_groups:
        groups_json = config["src"]["website_groups"]

        nodes_dst = config["dst"]["website_group_nodes"]
        logger.info("Generating website group nodes to %s", nodes_dst)
        websites.generate_website_group_nodes(groups_json, nodes_dst)

        edges_dst = config["dst"]["website_group_edges"]
        logger.info("Generating website group edges to %s", edges_dst)
        website_groups.generate_website_group_edges(groups_json, edges_dst)

    if args.transientIds:
        if args.workers > 1:
            _generate_transient_entities_in_parallel(
                config, files["titles"], args.workers
            )
        else:
            _generate_transient_entities(config, files)

    if args.persistentIds:
        logger.info(
            "Generating persistent id nodes to %s", config["dst"]["persistent_nodes"]
        )
        users.generate_persistent_nodes(
            config["src"]["persistent"], config["dst"]["persistent_nodes"]
        )
        logger.info(
            "Generating persistent id edges to %s", config["dst"]["persistent_edges"]
        )
        persistent_ids.generate_persistent_id_edges(
            config["src"]["persistent"], config["dst"]["persistent_edges"]
        )

    if args.identityGroupIds:
        logger.info(
            "Generating identity group id nodes to %s",
            config["dst"]["identity_group_nodes"],
        )
        identity_groups.generate_identity_group_nodes(
            config["src"]["identity_group"], config["dst"]["identity_group_nodes"]
        )
        logger.info(
            "Generating identity group id edges to %s",
            config["dst"]["identity_group_edges"],
        )
        identity_group_edges.generate_identity_group_edges(
            config["src"]["identity_group"], config["dst"]["identity_group_edges"]
        )

    if args.ips:
        logger.info("Generating IP id nodes to %s", config["dst"]["ip_nodes"])
        ip_loc.generate_ip_loc_nodes_from_facts(
            config["src"]["facts"], config["dst"]["ip_nodes"]
        )
        logger.info("Generating IP edges to %s", config["dst"]["ip_edges"])
        ip_loc_edges.generate_ip_loc_edges_from_facts(
            config["src"]["facts"], config["dst"]["ip_edges"]
        )

    logger.info("Done!")


def _generate_transient_entities_in_parallel(config, titles_file, workers):
    """Generate transient id nodes and edges per batch in a process pool.

    Fact and url batch files are discovered through the ``src.facts_glob``
    and ``src.urls_glob`` patterns, sorted, and paired by position. For each
    pair one node job and one edge job are submitted; the value returned by
    every finished job is logged. An exception raised inside a job
    propagates out of ``future.result()``.
    """
    fact_files = sorted(glob.glob(config["src"]["facts_glob"]))
    url_files = sorted(glob.glob(config["src"]["urls_glob"]))

    # zip() stops at the shorter list, so a count mismatch would otherwise
    # drop batches (and possibly misalign the remaining pairs) unnoticed.
    batches = list(zip(fact_files, url_files))
    if len(fact_files) != len(url_files):
        logger.warning(
            "Found %d fact files but %d url files; only the first %d "
            "positionally matched pairs will be processed.",
            len(fact_files),
            len(url_files),
            len(batches),
        )
    if not batches:
        logger.warning("No transient input batches found; nothing to generate.")

    with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
        futures = []
        logger.info("Scheduling...")
        for fact_file, url_file in batches:
            futures.append(
                executor.submit(
                    users.generate_user_nodes,
                    fact_file,
                    build_destination_path(
                        fact_file, config["dst"]["transient_nodes"]
                    ),
                )
            )
            futures.append(
                executor.submit(
                    user_website.generate_user_website_edges,
                    {
                        "titles": titles_file,
                        "urls": url_file,
                        "facts": fact_file,
                    },
                    build_destination_path(
                        fact_file, config["dst"]["transient_edges"]
                    ),
                )
            )
        logger.info("Processing of transient nodes started.")

        for future in concurrent.futures.as_completed(futures):
            logger.info(
                "Succesfully written transient entity file into %s",
                future.result(),
            )


def _generate_transient_entities(config, files):
    """Generate transient id nodes and edges in the current process.

    The ``dst.transient_nodes`` and ``dst.transient_edges`` values are
    treated as templates and rendered with an empty ``batch_id``.
    """
    nodes_dst = Template(config["dst"]["transient_nodes"]).substitute(batch_id="")
    logger.info("Generating transient id nodes to %s", nodes_dst)
    users.generate_user_nodes(config["src"]["facts"], nodes_dst)

    edges_dst = Template(config["dst"]["transient_edges"]).substitute(batch_id="")
    logger.info("Generating transient id edges to %s", edges_dst)
    user_website.generate_user_website_edges(files, edges_dst)