src/graph_notebook/notebooks/03-Neptune-ML/02-SPARQL/neptune_ml_sparql_utils.py [162:282]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    except Exception as e:
        logging.error(e)


def get_neptune_ml_job_output_location(job_name: str, job_type: str):
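    """Return the S3 output location of a completed Neptune ML job.

    Queries the Neptune ML management API over the configured Neptune
    endpoint; returns None (after logging an error) if the job has not
    reached 'Completed' status. Example, with an illustrative job id:

        get_neptune_ml_job_output_location('dp-1234567890', 'dataprocessing')
    """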
    assert job_type in ["dataprocessing", "modeltraining", "modeltransform"], "Invalid Neptune ML job type"

    host, port, use_iam = load_configuration()

    response = signed_request("GET", service='neptune-db',
                              url=f'https://{host}:{port}/ml/{job_type}/{job_name}',
                              headers={'content-type': 'application/json'})
    result = json.loads(response.content.decode('utf-8'))
    if result["status"] != "Completed":
        logging.error("Neptune ML {} job: {} is not completed".format(job_type, job_name))
        return
    return result["processingJob"]["outputLocation"]


def get_dataprocessing_job_output_location(dataprocessing_job_name: str):
    assert dataprocessing_job_name is not None, \
        "A Neptune ML data processing job name must be passed if the data processing job S3 output is missing"
    return get_neptune_ml_job_output_location(dataprocessing_job_name, "dataprocessing")


def get_modeltraining_job_output_location(training_job_name: str):
    assert training_job_name is not None, \
        "A Neptune ML model training job name must be passed if the training job S3 output is missing"
    return get_neptune_ml_job_output_location(training_job_name, "modeltraining")


def get_node_to_idx_mapping(training_job_name: str = None, dataprocessing_job_name: str = None,
                            model_artifacts_location: str = './model-artifacts', vertex_label: str = None):
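    """Return the node-to-index mapping produced by a Neptune ML job.

    A modeltraining job stores the mapping under the 'node2id' key of
    'mapping.info'; a dataprocessing job stores it under the 'node_id_map'
    key of 'info.pkl'. If vertex_label is given and found in the mapping,
    only that label's sub-mapping is returned.
    """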
    assert training_job_name is not None or dataprocessing_job_name is not None, \
        "You must provide either a modeltraining job id or a dataprocessing job id to obtain node to index mappings"

    job_name = training_job_name if training_job_name is not None else dataprocessing_job_name
    job_type = "modeltraining" if training_job_name is not None else "dataprocessing"
    filename = "mapping.info" if training_job_name is not None else "info.pkl"
    mapping_key = "node2id" if training_job_name is not None else "node_id_map"

    # get mappings
    model_artifacts_location = os.path.join(model_artifacts_location, job_name)
    if not os.path.exists(os.path.join(model_artifacts_location, filename)):
        job_s3_output = get_neptune_ml_job_output_location(job_name, job_type)
        if not job_s3_output:
            return
        print("Downloading {} from: {}".format(filename, job_s3_output))
        S3Downloader.download(os.path.join(job_s3_output, filename), model_artifacts_location)

    with open(os.path.join(model_artifacts_location, filename), "rb") as f:
        mapping = pickle.load(f)[mapping_key]
        if vertex_label is not None:
            if vertex_label in mapping:
                mapping = mapping[vertex_label]
            else:
                print("Mapping for vertex label: {} not found.".format(vertex_label))
                print("valid vertex labels which have vertices mapped to embeddings: {} ".format(list(mapping.keys())))
                print("Returning mapping for all valid vertex labels")

    return mapping


def get_embeddings(training_job_name: str, download_location: str = './model-artifacts'):
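    """Download the embeddings produced by a training job and return the
    contents of embeddings/entity.npy as a NumPy array."""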
    training_job_s3_output = get_modeltraining_job_output_location(training_job_name)
    if not training_job_s3_output:
        return

    download_location = os.path.join(download_location, training_job_name)
    os.makedirs(download_location, exist_ok=True)
    # download the embeddings produced by the training job
    S3Downloader.download(os.path.join(training_job_s3_output, "embeddings/"),
                          os.path.join(download_location, "embeddings/"))

    entity_emb = np.load(os.path.join(download_location, "embeddings", "entity.npy"))

    return entity_emb


def get_predictions(training_job_name: str, download_location: str = './model-artifacts', class_preds: bool = False):
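    """Download the inference results of a training job. Returns the raw
    'infer_scores' array, or the per-row argmax class indices when
    class_preds is True."""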
    training_job_s3_output = get_modeltraining_job_output_location(training_job_name)
    if not training_job_s3_output:
        return

    download_location = os.path.join(download_location, training_job_name)
    os.makedirs(download_location, exist_ok=True)
    # download the prediction results
    S3Downloader.download(os.path.join(training_job_s3_output, "predictions/"),
                          os.path.join(download_location, "predictions/"))

    preds = np.load(os.path.join(download_location, "predictions", "result.npz"))['infer_scores']

    if class_preds:
        return preds.argmax(axis=1)

    return preds


def get_performance_metrics(training_job_name: str, download_location: str = './model-artifacts'):
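    """Download eval_metrics_info.json for a training job and return the
    parsed metrics as a dict."""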
    training_job_s3_output = get_modeltraining_job_output_location(training_job_name)
    if not training_job_s3_output:
        return

    download_location = os.path.join(download_location, training_job_name)
    os.makedirs(download_location, exist_ok=True)
    # download the evaluation metrics file
    S3Downloader.download(os.path.join(training_job_s3_output, "eval_metrics_info.json"),
                          download_location)

    with open(os.path.join(download_location, "eval_metrics_info.json")) as f:
        metrics = json.load(f)

    return metrics


class MovieLensProcessor:
    raw_directory = fr'{HOME_DIRECTORY}/data/raw'
    formatted_directory = fr'{HOME_DIRECTORY}/data/formatted'
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



src/graph_notebook/notebooks/03-Neptune-ML/03-Sample-Applications/04-Telco-Networks/neptune_ml_utils.py [195:314]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    except Exception as e:
        logging.error(e)

def get_neptune_ml_job_output_location(job_name: str, job_type: str):
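    """Return the S3 output location of a completed Neptune ML job.

    Queries the Neptune ML management API over the configured Neptune
    endpoint; returns None (after logging an error) if the job has not
    reached 'Completed' status. Example, with an illustrative job id:

        get_neptune_ml_job_output_location('dp-1234567890', 'dataprocessing')
    """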
    assert job_type in ["dataprocessing", "modeltraining", "modeltransform"], "Invalid Neptune ML job type"

    host, port, use_iam = load_configuration()

    response = signed_request("GET", service='neptune-db',
                              url=f'https://{host}:{port}/ml/{job_type}/{job_name}',
                              headers={'content-type': 'application/json'})
    result = json.loads(response.content.decode('utf-8'))
    if result["status"] != "Completed":
        logging.error("Neptune ML {} job: {} is not completed".format(job_type, job_name))
        return
    return result["processingJob"]["outputLocation"]


def get_dataprocessing_job_output_location(dataprocessing_job_name: str):
    assert dataprocessing_job_name is not None, \
        "A Neptune ML data processing job name must be passed if the data processing job S3 output is missing"
    return get_neptune_ml_job_output_location(dataprocessing_job_name, "dataprocessing")


def get_modeltraining_job_output_location(training_job_name: str):
    assert training_job_name is not None, \
        "A Neptune ML model training job name must be passed if the training job S3 output is missing"
    return get_neptune_ml_job_output_location(training_job_name, "modeltraining")


def get_node_to_idx_mapping(training_job_name: str = None, dataprocessing_job_name: str = None,
                            model_artifacts_location: str = './model-artifacts', vertex_label: str = None):
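    """Return the node-to-index mapping produced by a Neptune ML job.

    A modeltraining job stores the mapping under the 'node2id' key of
    'mapping.info'; a dataprocessing job stores it under the 'node_id_map'
    key of 'info.pkl'. If vertex_label is given and found in the mapping,
    only that label's sub-mapping is returned.
    """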
    assert training_job_name is not None or dataprocessing_job_name is not None, \
        "You must provide either a modeltraining job id or a dataprocessing job id to obtain node to index mappings"

    job_name = training_job_name if training_job_name is not None else dataprocessing_job_name
    job_type = "modeltraining" if training_job_name is not None else "dataprocessing"
    filename = "mapping.info" if training_job_name is not None else "info.pkl"
    mapping_key = "node2id" if training_job_name is not None else "node_id_map"

    # get mappings
    model_artifacts_location = os.path.join(model_artifacts_location, job_name)
    if not os.path.exists(os.path.join(model_artifacts_location, filename)):
        job_s3_output = get_neptune_ml_job_output_location(job_name, job_type)
        if not job_s3_output:
            return
        print("Downloading {} from: {}".format(filename, job_s3_output))
        S3Downloader.download(os.path.join(job_s3_output, filename), model_artifacts_location)

    with open(os.path.join(model_artifacts_location, filename), "rb") as f:
        mapping = pickle.load(f)[mapping_key]
        if vertex_label is not None:
            if vertex_label in mapping:
                mapping = mapping[vertex_label]
            else:
                print("Mapping for vertex label: {} not found.".format(vertex_label))
                print("valid vertex labels which have vertices mapped to embeddings: {} ".format(list(mapping.keys())))
                print("Returning mapping for all valid vertex labels")

    return mapping


def get_embeddings(training_job_name: str, download_location: str = './model-artifacts'):
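    """Download the embeddings produced by a training job and return the
    contents of embeddings/entity.npy as a NumPy array."""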
    training_job_s3_output = get_modeltraining_job_output_location(training_job_name)
    if not training_job_s3_output:
        return

    download_location = os.path.join(download_location, training_job_name)
    os.makedirs(download_location, exist_ok=True)
    # download the embeddings produced by the training job
    S3Downloader.download(os.path.join(training_job_s3_output, "embeddings/"),
                          os.path.join(download_location, "embeddings/"))

    entity_emb = np.load(os.path.join(download_location, "embeddings", "entity.npy"))

    return entity_emb


def get_predictions(training_job_name: str, download_location: str = './model-artifacts', class_preds: bool = False):
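    """Download the inference results of a training job. Returns the raw
    'infer_scores' array, or the per-row argmax class indices when
    class_preds is True."""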
    training_job_s3_output = get_modeltraining_job_output_location(training_job_name)
    if not training_job_s3_output:
        return

    download_location = os.path.join(download_location, training_job_name)
    os.makedirs(download_location, exist_ok=True)
    # download the prediction results
    S3Downloader.download(os.path.join(training_job_s3_output, "predictions/"),
                          os.path.join(download_location, "predictions/"))

    preds = np.load(os.path.join(download_location, "predictions", "result.npz"))['infer_scores']

    if class_preds:
        return preds.argmax(axis=1)

    return preds


def get_performance_metrics(training_job_name: str, download_location: str = './model-artifacts'):
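    """Download eval_metrics_info.json for a training job and return the
    parsed metrics as a dict."""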
    training_job_s3_output = get_modeltraining_job_output_location(training_job_name)
    if not training_job_s3_output:
        return

    download_location = os.path.join(download_location, training_job_name)
    os.makedirs(download_location, exist_ok=True)
    # download the evaluation metrics file
    S3Downloader.download(os.path.join(training_job_s3_output, "eval_metrics_info.json"),
                          download_location)

    with open(os.path.join(download_location, "eval_metrics_info.json")) as f:
        metrics = json.load(f)

    return metrics


class MovieLensProcessor:
    raw_directory = fr'{HOME_DIRECTORY}/data/raw'
    formatted_directory = fr'{HOME_DIRECTORY}/data/formatted'
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



