sagemaker-pyspark-sdk/src/sagemaker_pyspark/algorithms/PCASageMakerEstimator.py
# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
from pyspark.ml.param import Params, Param, TypeConverters
from pyspark.ml.util import Identifiable
from sagemaker_pyspark import (SageMakerEstimatorBase, S3AutoCreatePath, Option, IAMRoleFromConfig,
EndpointCreationPolicy, SageMakerClients, RandomNamePolicyFactory)
from sagemaker_pyspark.transformation.serializers import ProtobufRequestRowSerializer
from sagemaker_pyspark.transformation.deserializers import PCAProtobufResponseRowDeserializer


class PCASageMakerEstimator(SageMakerEstimatorBase):
"""
A :class:`~sagemaker_pyspark.SageMakerEstimator` that runs a PCA training job in SageMaker and
returns a :class:`~sagemaker_pyspark.SageMakerModel` that can be used to transform a DataFrame
using the hosted PCA model. PCA, or Principal Component Analysis, is useful for reducing the
    dimensionality of data before training with another algorithm.

    Amazon SageMaker PCA trains on RecordIO-encoded Amazon Record protobuf data.
    SageMaker PySpark writes a DataFrame to S3 by selecting a column of Vectors named "features"
and, if present, a column of Doubles named "label". These names are configurable by passing a
dictionary with entries in trainingSparkDataFormatOptions with key "labelColumnName" or
"featuresColumnName", with values corresponding to the desired label and features columns.
:class:`~sagemaker_pyspark.transformation.serializers.ProtobufRequestRowSerializer` is used
by PCASageMakerEstimator to serialize Rows into RecordIO-encoded Amazon Record protobuf
messages for inference, by default selecting the column named "features" expected to contain
a Vector of Doubles.
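
    A different features column can likewise be selected at inference time by constructing
    the serializer explicitly; the ``featuresColumnName`` keyword below is assumed from this
    SDK's serializer API::

        requestRowSerializer=ProtobufRequestRowSerializer(featuresColumnName="myFeatures")
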
Inferences made against an Endpoint hosting a PCA model contain a "projection" field appended
to the input DataFrame as a Dense Vector of Doubles.
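
    Example (a minimal sketch, assuming a SparkSession configured with the SageMaker Spark
    JARs, an IAM role ARN with SageMaker permissions, and a DataFrame ``trainingData``
    holding a Vector column named "features"; the ARN below is a placeholder):

        >>> from sagemaker_pyspark import IAMRole
        >>> from sagemaker_pyspark.algorithms import PCASageMakerEstimator
        >>> estimator = PCASageMakerEstimator(
        ...     trainingInstanceType="ml.m4.xlarge",
        ...     trainingInstanceCount=1,
        ...     endpointInstanceType="ml.m4.xlarge",
        ...     endpointInitialInstanceCount=1,
        ...     sagemakerRole=IAMRole("arn:aws:iam::123456789012:role/SageMakerRole"))
        >>> estimator.setNumComponents(10)
        >>> model = estimator.fit(trainingData)
        >>> projected = model.transform(trainingData)  # appends a "projection" column
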
Args:
        sagemakerRole (IAMRole): The SageMaker TrainingJob and Hosting IAM Role. Used by
SageMaker to access S3 and ECR Resources. SageMaker hosted Endpoint instances
launched by this Estimator run with this role.
trainingInstanceType (str): The SageMaker TrainingJob Instance Type to use.
        trainingInstanceCount (int): The number of instances of trainingInstanceType to run a
            SageMaker Training Job with.
endpointInstanceType (str): The SageMaker Endpoint Config instance type.
        endpointInitialInstanceCount (int): The SageMaker Endpoint Config minimum number of
            instances that can be used to host the model.
requestRowSerializer (RequestRowSerializer): Serializes Spark DataFrame Rows for
transformation by Models built from this Estimator.
responseRowDeserializer (ResponseRowDeserializer): Deserializes an Endpoint response into a
series of Rows.
trainingInputS3DataPath (S3Resource): An S3 location to upload SageMaker Training Job input
data to.
        trainingOutputS3DataPath (S3Resource): An S3 location where SageMaker stores Training
            Job output data.
trainingInstanceVolumeSizeInGB (int): The EBS volume size in gigabytes of each instance.
        trainingProjectedColumns (List): The columns to project from the Dataset being fit
            before training. If None is passed, no specific projection will occur and all
            columns will be serialized.
trainingChannelName (str): The SageMaker Channel name to input serialized Dataset fit
input to.
trainingContentType (str): The MIME type of the training data.
trainingS3DataDistribution (str): The SageMaker Training Job S3 data distribution scheme.
trainingSparkDataFormat (str): The Spark Data Format name used to serialize the Dataset
being fit for input to SageMaker.
trainingSparkDataFormatOptions (dict): The Spark Data Format Options used during
serialization of the Dataset being fit.
trainingInputMode (str): The SageMaker Training Job Channel input mode.
trainingCompressionCodec (str): The type of compression to use when serializing the
Dataset being fit for input to SageMaker.
        trainingMaxRuntimeInSeconds (int): A SageMaker Training Job Termination Condition
            MaxRuntimeInSeconds.
trainingKmsKeyId (str): A KMS key ID for the Output Data Source.
modelEnvironmentVariables (dict): The environment variables that SageMaker will set on the
model container during execution.
endpointCreationPolicy (EndpointCreationPolicy): Defines how a SageMaker Endpoint
referenced by a SageMakerModel is created.
        sagemakerClient (AmazonSageMaker): Amazon SageMaker client. Used to send
            CreateTrainingJob, CreateModel, and CreateEndpoint requests.
region (str): The region in which to run the algorithm. If not specified, gets the region
from the DefaultAwsRegionProviderChain.
s3Client (AmazonS3): Used to create a bucket for staging SageMaker Training Job
input and/or output if either are set to S3AutoCreatePath.
stsClient (AmazonSTS): Used to resolve the account number when creating staging
input / output buckets.
modelPrependInputRowsToTransformationRows (bool): Whether the transformation result on
Models built by this Estimator should also include the input Rows. If true,
each output Row is formed by a concatenation of the input Row with the corresponding
Row produced by SageMaker Endpoint invocation, produced by responseRowDeserializer.
If false, each output Row is just taken from responseRowDeserializer.
        deleteStagingDataAfterTraining (bool): Whether to remove the training data on S3 after
            training completes or fails.
namePolicyFactory (NamePolicyFactory): The NamePolicyFactory to use when naming SageMaker
entities created during fit.
uid (str): The unique identifier of this Estimator. Used to represent this stage in Spark
ML pipelines.
"""

    _wrapped_class = "com.amazonaws.services.sagemaker.sparksdk.algorithms.PCASageMakerEstimator"

    num_components = Param(Params._dummy(), "num_components",
                           "Number of principal components to compute. Must be > 0.",
                           typeConverter=TypeConverters.toInt)

    algorithm_mode = Param(Params._dummy(), "algorithm_mode",
                           "Determines the algorithm used to compute the principal components. "
                           "Supported options: 'regular', 'stable' and 'randomized'.",
                           typeConverter=TypeConverters.toString)

    subtract_mean = Param(Params._dummy(), "subtract_mean",
                          "If 'True', the data will be unbiased (mean-centered) both during "
                          "training and at inference.",
                          typeConverter=TypeConverters.toString)

    extra_components = Param(Params._dummy(), "extra_components",
                             "Number of extra components to compute. Valid for 'randomized' "
                             "mode; ignored by other modes. Must be -1 or > 0.",
                             typeConverter=TypeConverters.toInt)

    mini_batch_size = Param(Params._dummy(), "mini_batch_size",
                            "The number of examples in a mini-batch. Must be > 0.",
                            typeConverter=TypeConverters.toInt)

    feature_dim = Param(Params._dummy(), "feature_dim",
                        "The dimension of the input vectors. Must be > 0.",
                        typeConverter=TypeConverters.toInt)

def __init__(self,
trainingInstanceType,
trainingInstanceCount,
endpointInstanceType,
endpointInitialInstanceCount,
sagemakerRole=IAMRoleFromConfig(),
requestRowSerializer=ProtobufRequestRowSerializer(),
responseRowDeserializer=PCAProtobufResponseRowDeserializer(),
trainingInputS3DataPath=S3AutoCreatePath(),
trainingOutputS3DataPath=S3AutoCreatePath(),
trainingInstanceVolumeSizeInGB=1024,
trainingProjectedColumns=None,
trainingChannelName="train",
trainingContentType=None,
trainingS3DataDistribution="ShardedByS3Key",
trainingSparkDataFormat="sagemaker",
trainingSparkDataFormatOptions=None,
trainingInputMode="File",
trainingCompressionCodec=None,
trainingMaxRuntimeInSeconds=24*60*60,
trainingKmsKeyId=None,
modelEnvironmentVariables=None,
endpointCreationPolicy=EndpointCreationPolicy.CREATE_ON_CONSTRUCT,
sagemakerClient=SageMakerClients.create_sagemaker_client(),
region=None,
s3Client=SageMakerClients.create_s3_default_client(),
stsClient=SageMakerClients.create_sts_default_client(),
modelPrependInputRowsToTransformationRows=True,
deleteStagingDataAfterTraining=True,
namePolicyFactory=RandomNamePolicyFactory(),
                 uid=None,
                 javaObject=None):
if trainingSparkDataFormatOptions is None:
trainingSparkDataFormatOptions = {}
if modelEnvironmentVariables is None:
modelEnvironmentVariables = {}
if uid is None:
uid = Identifiable._randomUID()
        # Forward every constructor argument, by name, to the base class, which builds
        # (or wraps) the underlying Scala estimator.
        kwargs = locals().copy()
        del kwargs['self']
        super(PCASageMakerEstimator, self).__init__(**kwargs)
        # subtract_mean is tracked as the string 'True'/'False' (its type converter is
        # toString), so its default is set as a string as well.
        default_params = {
            'subtract_mean': 'True'
        }
        self._setDefault(**default_params)

    def _get_java_obj(self, **kwargs):
return self._new_java_obj(
PCASageMakerEstimator._wrapped_class,
kwargs['sagemakerRole'],
kwargs['trainingInstanceType'],
kwargs['trainingInstanceCount'],
kwargs['endpointInstanceType'],
kwargs['endpointInitialInstanceCount'],
kwargs['requestRowSerializer'],
kwargs['responseRowDeserializer'],
kwargs['trainingInputS3DataPath'],
kwargs['trainingOutputS3DataPath'],
kwargs['trainingInstanceVolumeSizeInGB'],
Option(kwargs['trainingProjectedColumns']),
kwargs['trainingChannelName'],
Option(kwargs['trainingContentType']),
kwargs['trainingS3DataDistribution'],
kwargs['trainingSparkDataFormat'],
kwargs['trainingSparkDataFormatOptions'],
kwargs['trainingInputMode'],
Option(kwargs['trainingCompressionCodec']),
kwargs['trainingMaxRuntimeInSeconds'],
Option(kwargs['trainingKmsKeyId']),
kwargs['modelEnvironmentVariables'],
kwargs['endpointCreationPolicy'],
kwargs['sagemakerClient'],
Option(kwargs['region']),
kwargs['s3Client'],
kwargs['stsClient'],
kwargs['modelPrependInputRowsToTransformationRows'],
kwargs['deleteStagingDataAfterTraining'],
kwargs['namePolicyFactory'],
kwargs['uid']
)

    def getNumComponents(self):
        return self.getOrDefault(self.num_components)

    def setNumComponents(self, value):
        if value < 1:
            raise ValueError("num_components must be > 0, got: %s" % value)
        self._set(num_components=value)

    def getAlgorithmMode(self):
        return self.getOrDefault(self.algorithm_mode)

    def setAlgorithmMode(self, value):
        if value not in ('regular', 'stable', 'randomized'):
            raise ValueError("AlgorithmMode must be 'regular', 'stable' or 'randomized',"
                             " got %s" % value)
        self._set(algorithm_mode=value)

    def getSubtractMean(self):
        # The parameter is stored as the string 'True'/'False'; convert back to a bool.
        return self.getOrDefault(self.subtract_mean) == 'True'

    def setSubtractMean(self, value):
        if value not in ('True', 'False'):
            raise ValueError("SubtractMean must be 'True' or 'False', got %s" % value)
        self._set(subtract_mean=value)

    def getExtraComponents(self):
        return self.getOrDefault(self.extra_components)

    def setExtraComponents(self, value):
        if value != -1 and value < 1:
            raise ValueError("ExtraComponents must be > 0 or -1, got: %s" % value)
        self._set(extra_components=value)

    def getMiniBatchSize(self):
        return self.getOrDefault(self.mini_batch_size)

    def setMiniBatchSize(self, size):
        if size <= 0:
            raise ValueError("mini_batch_size must be > 0. Got %s" % size)
        self._set(mini_batch_size=size)

    def getFeatureDim(self):
        return self.getOrDefault(self.feature_dim)

    def setFeatureDim(self, value):
        if value <= 0:
            raise ValueError("feature_dim must be > 0. Got %s" % value)
        self._set(feature_dim=value)

    @classmethod
    def _from_java(cls, javaObject):
        # Wrap an existing JVM estimator object; the training and endpoint placement
        # arguments are not used on this code path.
        return PCASageMakerEstimator(trainingInstanceType=None,
                                     trainingInstanceCount=None,
                                     endpointInstanceType=None,
                                     endpointInitialInstanceCount=None,
                                     sagemakerRole=None,
                                     javaObject=javaObject)