in identity-resolution/notebooks/identity-graph/nepytune/cli/transform.py [0:0]
def main(args):
"""Transform csv files into ready-to-load neptune format."""
config = configparser.ConfigParser()
config.read(args.config_file.name)
files = {
"facts": config["src"]["facts"],
"urls": config["src"]["urls"],
"titles": config["src"]["titles"],
}
if args.websites:
logger.info("Generating website nodes to %s", config["dst"]["websites"])
websites.generate_website_nodes(
files["urls"], files["titles"], config["dst"]["websites"]
)
if args.website_groups:
groups_json = config["src"]["website_groups"]
nodes_dst = config["dst"]["website_group_nodes"]
logger.info("Generating website group nodes to %s", nodes_dst)
websites.generate_website_group_nodes(groups_json, nodes_dst)
edges_dst = config["dst"]["website_group_edges"]
logger.info("Generating website group edges to %s", edges_dst)
website_groups.generate_website_group_edges(groups_json, edges_dst)
if args.transientIds:
if args.workers > 1:
fact_files = sorted(glob.glob(config["src"]["facts_glob"]))
url_files = sorted(glob.glob(config["src"]["urls_glob"]))
with concurrent.futures.ProcessPoolExecutor(
max_workers=args.workers
) as executor:
futures = []
logger.info("Scheduling...")
for fact_file, url_file in zip(fact_files, url_files):
futures.append(
executor.submit(
users.generate_user_nodes,
fact_file,
build_destination_path(
fact_file, config["dst"]["transient_nodes"]
),
)
)
futures.append(
executor.submit(
user_website.generate_user_website_edges,
{
"titles": files["titles"],
"urls": url_file,
"facts": fact_file,
},
build_destination_path(
fact_file, config["dst"]["transient_edges"]
),
)
)
logger.info("Processing of transient nodes started.")
for future in concurrent.futures.as_completed(futures):
logger.info(
"Succesfully written transient entity file into %s",
future.result(),
)
else:
nodes_dst = Template(config["dst"]["transient_nodes"]).substitute(
batch_id=""
)
logger.info("Generating transient id nodes to %s", nodes_dst)
users.generate_user_nodes(config["src"]["facts"], nodes_dst)
edges_dst = Template(config["dst"]["transient_edges"]).substitute(
batch_id=""
)
logger.info("Generating transient id edges to %s", edges_dst)
user_website.generate_user_website_edges(files, edges_dst)
if args.persistentIds:
logger.info(
"Generating persistent id nodes to %s", config["dst"]["persistent_nodes"]
)
users.generate_persistent_nodes(
config["src"]["persistent"], config["dst"]["persistent_nodes"]
)
logger.info(
"Generating persistent id edges to %s", config["dst"]["persistent_edges"]
)
persistent_ids.generate_persistent_id_edges(
config["src"]["persistent"], config["dst"]["persistent_edges"]
)
if args.identityGroupIds:
logger.info(
"Generating identity group id nodes to %s",
config["dst"]["identity_group_nodes"],
)
identity_groups.generate_identity_group_nodes(
config["src"]["identity_group"], config["dst"]["identity_group_nodes"]
)
logger.info(
"Generating identity group id edges to %s",
config["dst"]["identity_group_edges"],
)
identity_group_edges.generate_identity_group_edges(
config["src"]["identity_group"], config["dst"]["identity_group_edges"]
)
if args.ips:
logger.info("Generating IP id nodes to %s", config["dst"]["ip_nodes"])
ip_loc.generate_ip_loc_nodes_from_facts(
config["src"]["facts"], config["dst"]["ip_nodes"]
)
logger.info("Generating IP edges to %s", config["dst"]["ip_edges"])
ip_loc_edges.generate_ip_loc_edges_from_facts(
config["src"]["facts"], config["dst"]["ip_edges"]
)
logger.info("Done!")