src/graph_notebook/notebooks/03-Neptune-ML/03-Sample-Applications/04-Telco-Networks/neptune_ml_utils.py [19:362]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
UPDATE_DELAY_SECONDS = 15
HOME_DIRECTORY = os.path.expanduser("~")


def signed_request(method, url, data=None, params=None, headers=None, service=None):
    request = AWSRequest(method=method, url=url, data=data,
                         params=params, headers=headers)
    session = boto3.Session()
    credentials = session.get_credentials()
    try:
        frozen_creds = credentials.get_frozen_credentials()
    except AttributeError:
        print("Could not find valid IAM credentials in any the following locations:\n")
        print("env, assume-role, assume-role-with-web-identity, sso, shared-credential-file, custom-process, "
              "config-file, ec2-credentials-file, boto-config, container-role, iam-role\n")
        print("Go to https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html for more "
              "details on configuring your IAM credentials.")
        return request
    SigV4Auth(frozen_creds, service, session.region_name).add_auth(request)
    return requests.request(method=method, url=url, headers=dict(request.headers), data=data)
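
# Illustrative use of signed_request (a sketch only; the endpoint shown is just an example, and the
# host/port values come from load_configuration() below):
#
#   host, port, _ = load_configuration()
#   response = signed_request("GET", url=f'https://{host}:{port}/ml/modeltraining',
#                             service='neptune-db')
#   print(response.status_code)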


def load_configuration():
    with open(f'{HOME_DIRECTORY}/graph_notebook_config.json') as f:
        data = json.load(f)
        host = data['host']
        port = data['port']
        iam = data.get('auth_mode') == 'IAM'
    return host, port, iam
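
# load_configuration() reads ~/graph_notebook_config.json and only uses the 'host', 'port', and
# 'auth_mode' keys. A minimal illustrative file (the hostname below is a placeholder):
#
#   {
#     "host": "my-cluster.cluster-xxxxxxxxxxxx.us-east-1.neptune.amazonaws.com",
#     "port": 8182,
#     "auth_mode": "IAM"
#   }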


def get_host():
    host, port, iam = load_configuration()
    return host


def get_iam():
    host, port, iam = load_configuration()
    return iam


def get_training_job_name(prefix: str):
    return f'{prefix}-{int(time.time())}'
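
# Example: get_training_job_name('link-prediction') returns something like
# 'link-prediction-1700000000'; the suffix is the current Unix timestamp, so the exact value
# varies (the prefix here is only illustrative).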


def check_ml_enabled():
    host, port, use_iam = load_configuration()
    response = signed_request(
        "GET", url=f'https://{host}:{port}/ml/modeltraining', service='neptune-db')
    if response.status_code != 200:
        print('''This Neptune cluster \033[1mis not\033[0m configured to use Neptune ML.
Please configure the cluster according to the Amazon Neptune ML documentation before proceeding.''')
    else:
        print("This Neptune cluster is configured to use Neptune ML")


def get_export_service_host():
    with open(f'{HOME_DIRECTORY}/.bashrc') as f:
        data = f.readlines()
    for d in data:
        if d.startswith('export NEPTUNE_EXPORT_API_URI'):
            parts = d.split('=')
            if len(parts) == 2:
                path = urlparse(parts[1].rstrip())
                return path.hostname + "/v1"
    logging.error(
        "Unable to determine the Neptune Export Service endpoint. You will need to enter or assign it manually.")
    return None
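
# get_export_service_host() looks for a ~/.bashrc line of the following form (the URL is a
# placeholder) and returns its hostname with '/v1' appended:
#
#   export NEPTUNE_EXPORT_API_URI=https://abc123example.execute-api.us-east-1.amazonaws.com/v1
#
# which would yield 'abc123example.execute-api.us-east-1.amazonaws.com/v1'.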


def delete_pretrained_data(setup_node_classification: bool,
                           setup_node_regression: bool, setup_link_prediction: bool,
                           setup_edge_regression: bool, setup_edge_classification: bool):
    host, port, use_iam = load_configuration()
    if setup_node_classification:
        response = signed_request("POST", service='neptune-db',
                                  url=f'https://{host}:{port}/gremlin',
                                  headers={'content-type': 'application/json'},
                                  data=json.dumps(
                                      {
                                          'gremlin': "g.V('movie_28', 'movie_69', 'movie_88').properties('genre').drop()"}))

        if response.status_code != 200:
            print(response.content.decode('utf-8'))
    if setup_node_regression:
        response = signed_request("POST", service='neptune-db',
                                  url=f'https://{host}:{port}/gremlin',
                                  headers={'content-type': 'application/json'},
                                  data=json.dumps({'gremlin': "g.V('user_1').out('wrote').properties('score').drop()"}))
        if response.status_code != 200:
            print(response.content.decode('utf-8'))
    if setup_link_prediction:
        response = signed_request("POST", service='neptune-db',
                                  url=f'https://{host}:{port}/gremlin',
                                  headers={'content-type': 'application/json'},
                                  data=json.dumps({'gremlin': "g.V('user_1').outE('rated').drop()"}))
        if response.status_code != 200:
            print(response.content.decode('utf-8'))

    if setup_edge_regression:
        response = signed_request("POST", service='neptune-db',
                                  url=f'https://{host}:{port}/gremlin',
                                  headers={'content-type': 'application/json'},
                                  data=json.dumps(
                                      {'gremlin': "g.V('user_1').outE('rated').properties('score').drop()"}))
        if response.status_code != 200:
            print(response.content.decode('utf-8'))

    if setup_edge_classification:
        response = signed_request("POST", service='neptune-db',
                                  url=f'https://{host}:{port}/gremlin',
                                  headers={'content-type': 'application/json'},
                                  data=json.dumps(
                                      {'gremlin': "g.V('user_1').outE('rated').properties('scale').drop()"}))
        if response.status_code != 200:
            print(response.content.decode('utf-8'))


def delete_pretrained_endpoints(endpoints: dict):
    sm = boto3.client("sagemaker")
    # every endpoint entry is optional; delete whichever ones were provided
    endpoint_keys = ['node_classification_endpoint_name', 'node_regression_endpoint_name',
                     'prediction_endpoint_name', 'edge_classification_endpoint_name',
                     'edge_regression_endpoint_name']
    try:
        for key in endpoint_keys:
            if endpoints.get(key):
                sm.delete_endpoint(EndpointName=endpoints[key]["EndpointName"])
        print('Endpoint(s) have been deleted')
    except Exception as e:
        logging.error(e)
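
# delete_pretrained_endpoints expects a dict whose values each contain an 'EndpointName' entry,
# for example (the endpoint names below are placeholders):
#
#   delete_pretrained_endpoints({
#       'node_classification_endpoint_name': {'EndpointName': 'example-nc-endpoint'},
#       'prediction_endpoint_name': {'EndpointName': 'example-lp-endpoint'},
#   })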


def delete_endpoint(training_job_name: str, neptune_iam_role_arn=None):
    query_string = ""
    if neptune_iam_role_arn:
        query_string = f'?neptuneIamRoleArn={neptune_iam_role_arn}'
    host, port, use_iam = load_configuration()
    response = signed_request("DELETE", service='neptune-db',
                              url=f'https://{host}:{port}/ml/endpoints/{training_job_name}{query_string}',
                              headers={'content-type': 'application/json'})
    if response.status_code != 200:
        print(response.content.decode('utf-8'))
    else:
        print(response.content.decode('utf-8'))
        print(f'Endpoint {training_job_name} has been deleted')
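
# Illustrative call of delete_endpoint (the job name and role ARN are placeholders):
#
#   delete_endpoint('link-prediction-1700000000',
#                   neptune_iam_role_arn='arn:aws:iam::123456789012:role/NeptuneMLRole')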


def prepare_movielens_data(s3_bucket_uri: str):
    try:
        return MovieLensProcessor().prepare_movielens_data(s3_bucket_uri)
    except Exception as e:
        logging.error(e)


def setup_pretrained_endpoints(s3_bucket_uri: str, setup_node_classification: bool,
                               setup_node_regression: bool, setup_link_prediction: bool,
                               setup_edge_classification: bool, setup_edge_regression: bool):
    delete_pretrained_data(setup_node_classification,
                           setup_node_regression, setup_link_prediction,
                           setup_edge_classification, setup_edge_regression)
    try:
        return PretrainedModels().setup_pretrained_endpoints(s3_bucket_uri, setup_node_classification,
                                                             setup_node_regression, setup_link_prediction,
                                                             setup_edge_classification, setup_edge_regression)
    except Exception as e:
        logging.error(e)


def get_neptune_ml_job_output_location(job_name: str, job_type: str):
    assert job_type in ["dataprocessing", "modeltraining", "modeltransform"], "Invalid Neptune ML job type"

    host, port, use_iam = load_configuration()

    response = signed_request("GET", service='neptune-db',
                              url=f'https://{host}:{port}/ml/{job_type}/{job_name}',
                              headers={'content-type': 'application/json'})
    result = json.loads(response.content.decode('utf-8'))
    if result["status"] != "Completed":
        logging.error("Neptune ML {} job: {} is not completed".format(job_type, job_name))
        return
    return result["processingJob"]["outputLocation"]


def get_dataprocessing_job_output_location(dataprocessing_job_name: str):
    assert dataprocessing_job_name is not None, \
        "A Neptune ML data processing job name must be provided when the data processing job's S3 output location is missing"
    return get_neptune_ml_job_output_location(dataprocessing_job_name, "dataprocessing")


def get_modeltraining_job_output_location(training_job_name: str):
    assert training_job_name is not None, \
        "A Neptune ML model training job name must be provided when the training job's S3 output location is missing"
    return get_neptune_ml_job_output_location(training_job_name, "modeltraining")


def get_node_to_idx_mapping(training_job_name: str = None, dataprocessing_job_name: str = None,
                            model_artifacts_location: str = './model-artifacts', vertex_label: str = None):
    assert training_job_name is not None or dataprocessing_job_name is not None, \
        "You must provide either a modeltraining job id or a dataprocessing job id to obtain node to index mappings"

    if training_job_name is not None:
        job_name, job_type = training_job_name, "modeltraining"
        filename, mapping_key = "mapping.info", "node2id"
    else:
        job_name, job_type = dataprocessing_job_name, "dataprocessing"
        filename, mapping_key = "info.pkl", "node_id_map"

    # get mappings
    model_artifacts_location = os.path.join(model_artifacts_location, job_name)
    if not os.path.exists(os.path.join(model_artifacts_location, filename)):
        job_s3_output = get_neptune_ml_job_output_location(job_name, job_type)
        print(job_s3_output)
        if not job_s3_output:
            return
        S3Downloader.download(os.path.join(job_s3_output, filename), model_artifacts_location)

    with open(os.path.join(model_artifacts_location, filename), "rb") as f:
        mapping = pickle.load(f)[mapping_key]
        if vertex_label is not None:
            if vertex_label in mapping:
                mapping = mapping[vertex_label]
            else:
                print("Mapping for vertex label: {} not found.".format(vertex_label))
                print("valid vertex labels which have vertices mapped to embeddings: {} ".format(list(mapping.keys())))
                print("Returning mapping for all valid vertex labels")

    return mapping
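
# Illustrative usage of get_node_to_idx_mapping (the job name and vertex id are placeholders):
#
#   node2idx = get_node_to_idx_mapping(training_job_name='my-training-job', vertex_label='movie')
#   idx = node2idx['movie_1']   # position of this vertex in the trained model's node index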


def get_embeddings(training_job_name: str, download_location: str = './model-artifacts'):
    training_job_s3_output = get_modeltraining_job_output_location(training_job_name)
    if not training_job_s3_output:
        return

    download_location = os.path.join(download_location, training_job_name)
    os.makedirs(download_location, exist_ok=True)
    # download the embeddings produced by the training job

    S3Downloader.download(os.path.join(training_job_s3_output, "embeddings/"),
                          os.path.join(download_location, "embeddings/"))

    entity_emb = np.load(os.path.join(download_location, "embeddings", "entity.npy"))

    return entity_emb
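
# Illustrative usage combining get_embeddings with get_node_to_idx_mapping to look up one
# vertex's embedding (the job name and vertex id are placeholders):
#
#   emb = get_embeddings('my-training-job')
#   node2idx = get_node_to_idx_mapping(training_job_name='my-training-job', vertex_label='movie')
#   movie_vector = emb[node2idx['movie_1']]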


def get_predictions(training_job_name: str, download_location: str = './model-artifacts', class_preds: bool = False):
    training_job_s3_output = get_modeltraining_job_output_location(training_job_name)
    if not training_job_s3_output:
        return

    download_location = os.path.join(download_location, training_job_name)
    os.makedirs(download_location, exist_ok=True)
    # download the predictions produced by the training job

    S3Downloader.download(os.path.join(training_job_s3_output, "predictions/"),
                          os.path.join(download_location, "predictions/"))

    preds = np.load(os.path.join(download_location, "predictions", "result.npz"))['infer_scores']

    if class_preds:
        return preds.argmax(axis=1)

    return preds
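
# Illustrative usage of get_predictions (the job name is a placeholder):
#
#   scores = get_predictions('my-training-job')                    # raw inference scores
#   labels = get_predictions('my-training-job', class_preds=True)  # argmax class index per row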


def get_performance_metrics(training_job_name: str, download_location: str = './model-artifacts'):
    training_job_s3_output = get_modeltraining_job_output_location(training_job_name)
    if not training_job_s3_output:
        return

    download_location = os.path.join(download_location, training_job_name)
    os.makedirs(download_location, exist_ok=True)
    # download the evaluation metrics produced by the training job

    S3Downloader.download(os.path.join(training_job_s3_output, "eval_metrics_info.json"),
                          download_location)

    with open(os.path.join(download_location, "eval_metrics_info.json")) as f:
        metrics = json.load(f)

    return metrics
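
# Illustrative usage of get_performance_metrics (the job name is a placeholder; the metric keys
# in the returned dict depend on the task type):
#
#   metrics = get_performance_metrics('my-training-job')
#   print(json.dumps(metrics, indent=2))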


class MovieLensProcessor:
    raw_directory = fr'{HOME_DIRECTORY}/data/raw'
    formatted_directory = fr'{HOME_DIRECTORY}/data/formatted'

    def __download_and_unzip(self):
        # create the data directories (parents included) if they do not already exist
        os.makedirs(self.raw_directory, exist_ok=True)
        os.makedirs(self.formatted_directory, exist_ok=True)
        # Download the MovieLens dataset
        url = 'https://files.grouplens.org/datasets/movielens/ml-100k.zip'
        r = requests.get(url, allow_redirects=True)
        with open(os.path.join(self.raw_directory, 'ml-100k.zip'), 'wb') as zip_file:
            zip_file.write(r.content)

        with zipfile.ZipFile(os.path.join(self.raw_directory, 'ml-100k.zip'), 'r') as zip_ref:
            zip_ref.extractall(self.raw_directory)

    def __process_movies_genres(self):
        # process the raw u.item file into movie and genre vertices
        print('Processing Movies', end='\r')
        movies_df = pd.read_csv(os.path.join(
            self.raw_directory, 'ml-100k/u.item'), sep='|', encoding='ISO-8859-1',
            names=['~id', 'title', 'release_date', 'video_release_date', 'imdb_url',
                   'unknown', 'Action', 'Adventure', 'Animation', 'Childrens', 'Comedy',
                   'Crime', 'Documentary', 'Drama', 'Fantasy', 'Film-Noir', 'Horror', 'Musical',
                   'Mystery', 'Romance', 'Sci-Fi', 'Thriller', 'War', 'Western'])
        # Parse date and convert to ISO format
        movies_df['release_date'] = movies_df['release_date'].apply(
            lambda x: str(
                datetime.strptime(x, '%d-%b-%Y').isoformat()) if not pd.isna(x) else x)
        movies_df['~label'] = 'movie'
        movies_df['~id'] = movies_df['~id'].apply(
            lambda x: f'movie_{x}')
        movie_genre_df = movies_df[[
            '~id', 'unknown', 'Action', 'Adventure', 'Animation', 'Childrens', 'Comedy',
            'Crime', 'Documentary', 'Drama', 'Fantasy', 'Film-Noir', 'Horror', 'Musical',
            'Mystery', 'Romance', 'Sci-Fi', 'Thriller', 'War', 'Western']]
        genres_edges_df = pd.DataFrame(
            columns=['~id', '~from', '~to', '~label'])

        genres = ['unknown', 'Action', 'Adventure', 'Animation', 'Childrens', 'Comedy',
                  'Crime', 'Documentary', 'Drama', 'Fantasy', 'Film-Noir', 'Horror', 'Musical',
                  'Mystery', 'Romance', 'Sci-Fi', 'Thriller', 'War', 'Western']

        genre_df = pd.DataFrame(genres, columns=['~id'])
        genre_df['~label'] = 'genre'
        genre_df['name'] = genre_df['~id']
        genre_df.to_csv(os.path.join(self.formatted_directory,
                                     'genre_vertex.csv'), index=False)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



src/graph_notebook/notebooks/03-Neptune-ML/neptune_ml_utils.py [19:362]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
(Content identical to src/graph_notebook/notebooks/03-Neptune-ML/03-Sample-Applications/04-Telco-Networks/neptune_ml_utils.py [19:362] shown above.)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



