#! /usr/bin/env python
# coding=utf-8
# Copyright (c) 2019 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import logging
import os

import numpy as np
import tensorflow as tf
from tensorflow.keras.losses import MeanAbsoluteError, MeanSquaredError
from tensorflow.keras.metrics import \
    MeanAbsoluteError as MeanAbsoluteErrorMetric
from tensorflow.keras.metrics import MeanSquaredError as MeanSquaredErrorMetric

from ludwig.constants import *
from ludwig.decoders.generic_decoders import Projector
from ludwig.encoders.generic_encoders import DenseEncoder, PassthroughEncoder
from ludwig.features.base_feature import InputFeature, OutputFeature
from ludwig.globals import is_on_master
from ludwig.modules.loss_modules import SoftmaxCrossEntropyLoss
from ludwig.modules.metric_modules import ErrorScore, R2Score, \
    SoftmaxCrossEntropyMetric
from ludwig.utils.misc_utils import set_default_value

logger = logging.getLogger(__name__)


# TODO TF2: can we eliminate the use of these custom wrapper classes?
# These are copies of the classes in numerical_modules; depending on what
# we end up doing with those, these will follow.

# Custom classes that handle how Ludwig stores predictions:
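# Ludwig output features pass predictions around as dicts keyed by
# LOGITS / PREDICTIONS rather than as bare tensors, so the stock Keras
# losses and metrics cannot consume them directly.
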
class MSELoss(MeanSquaredError):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def __call__(self, y_true, y_pred, sample_weight=None):
        # the loss is computed on the logits of the prediction dict
        logits = y_pred[LOGITS]
        return super().__call__(y_true, logits, sample_weight=sample_weight)


class MSEMetric(MeanSquaredErrorMetric):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def update_state(self, y_true, y_pred, sample_weight=None):
        # the metric is computed on the predictions of the prediction dict
        super().update_state(
            y_true, y_pred[PREDICTIONS], sample_weight=sample_weight
        )


class MAELoss(MeanAbsoluteError):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def __call__(self, y_true, y_pred, sample_weight=None):
        logits = y_pred[LOGITS]
        return super().__call__(y_true, logits, sample_weight=sample_weight)


class MAEMetric(MeanAbsoluteErrorMetric):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def update_state(self, y_true, y_pred, sample_weight=None):
        super().update_state(
            y_true, y_pred[PREDICTIONS], sample_weight=sample_weight
        )


class VectorFeatureMixin(object):
    type = VECTOR
    preprocessing_defaults = {
        'missing_value_strategy': FILL_WITH_CONST,
        'fill_value': ''
    }

    @staticmethod
    def get_feature_meta(column, preprocessing_parameters):
        return {
            'preprocessing': preprocessing_parameters
        }

    @staticmethod
    def add_feature_data(
            feature,
            dataset_df,
            data,
            metadata,
            preprocessing_parameters,
    ):
        """
        Expects all the vectors to be of the same size. The vectors need to
        be whitespace-delimited strings. Missing values are not handled.
        """
        if len(dataset_df) == 0:
            raise ValueError('There are no vectors in the provided dataset')

        # convert the string representation of each vector into a numpy row
        try:
            data[feature['name']] = np.array(
                [x.split() for x in dataset_df[feature['name']]],
                dtype=np.float32
            )
        except ValueError:
            logger.error(
                'Unable to read the vector data. Make sure that all the '
                'vectors are of the same size and do not have missing/null '
                'values.'
            )
            raise

        # determine the vector size from the first row
        vector_size = len(data[feature['name']][0])
        if 'vector_size' in preprocessing_parameters:
            if vector_size != preprocessing_parameters['vector_size']:
                raise ValueError(
                    'The user-provided value for vector size ({}) does not '
                    'match the value observed in the data: {}'.format(
                        preprocessing_parameters['vector_size'], vector_size
                    )
                )
        else:
            logger.debug('Observed vector size: {}'.format(vector_size))

        metadata[feature['name']]['vector_size'] = vector_size
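
# For illustration: a raw vector column is expected to contain
# whitespace-delimited strings, e.g. a (hypothetical) cell value
# "1.5 2.3 0.1" for a 3-dimensional vector feature. add_feature_data parses
# each row into one float32 row of a 2-D numpy array and records the shared
# vector_size in the metadata.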


class VectorInputFeature(VectorFeatureMixin, InputFeature):
    encoder = 'dense'

    def __init__(self, feature, encoder_obj=None):
        super().__init__(feature)
        self.overwrite_defaults(feature)
        if encoder_obj:
            self.encoder_obj = encoder_obj
        else:
            self.encoder_obj = self.initialize_encoder(feature)

    def call(self, inputs, training=None, mask=None):
        # inputs is expected to be a rank-2 float tensor [batch, vector_size]
        assert isinstance(inputs, tf.Tensor)
        assert inputs.dtype == tf.float32 or inputs.dtype == tf.float64
        assert len(inputs.shape) == 2

        inputs_encoded = self.encoder_obj(
            inputs, training=training, mask=mask
        )

        return inputs_encoded

    def get_input_dtype(self):
        return tf.float32

    def get_input_shape(self):
        return (self.vector_size,)

    @staticmethod
    def update_model_definition_with_metadata(
            input_feature,
            feature_metadata,
            *args,
            **kwargs
    ):
        for key in ['vector_size']:
            input_feature[key] = feature_metadata[key]

    @staticmethod
    def populate_defaults(input_feature):
        set_default_value(input_feature, TIED, None)
        set_default_value(input_feature, 'preprocessing', {})

    encoder_registry = {
        'dense': DenseEncoder,
        'passthrough': PassthroughEncoder,
        'null': PassthroughEncoder,
        'none': PassthroughEncoder,
        'None': PassthroughEncoder,
        None: PassthroughEncoder
    }
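
# A minimal model-definition snippet using this input feature might look
# like the following (the feature name is hypothetical):
#
#     input_features:
#         - name: measurements
#           type: vector
#           encoder: dense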


class VectorOutputFeature(VectorFeatureMixin, OutputFeature):
    decoder = 'projector'
    loss = {TYPE: MEAN_SQUARED_ERROR}
    metric_functions = {LOSS: None, ERROR: None, MEAN_SQUARED_ERROR: None,
                        MEAN_ABSOLUTE_ERROR: None, R2: None}
    default_validation_metric = MEAN_SQUARED_ERROR
    vector_size = 0

    def __init__(self, feature):
        super().__init__(feature)
        self.overwrite_defaults(feature)
        self.decoder_obj = self.initialize_decoder(feature)
        self._setup_loss()
        self._setup_metrics()

    def logits(
            self,
            inputs,  # hidden
            **kwargs
    ):
        hidden = inputs[HIDDEN]
        return self.decoder_obj(hidden)

    def predictions(
            self,
            inputs,  # logits
            **kwargs
    ):
        # vector predictions are the raw decoder outputs, no transformation
        return {PREDICTIONS: inputs[LOGITS], LOGITS: inputs[LOGITS]}

    def _setup_loss(self):
        if self.loss[TYPE] == 'mean_squared_error':
            self.train_loss_function = MSELoss()
            self.eval_loss_function = MSEMetric(name='eval_loss')
        elif self.loss[TYPE] == 'mean_absolute_error':
            self.train_loss_function = MAELoss()
            self.eval_loss_function = MAEMetric(name='eval_loss')
        elif self.loss[TYPE] == SOFTMAX_CROSS_ENTROPY:
            self.train_loss_function = SoftmaxCrossEntropyLoss(
                num_classes=self.vector_size,
                feature_loss=self.loss,
                name='train_loss'
            )
            self.eval_loss_function = SoftmaxCrossEntropyMetric(
                num_classes=self.vector_size,
                feature_loss=self.loss,
                name='eval_loss'
            )
        else:
            raise ValueError(
                'Unsupported loss type {}'.format(self.loss[TYPE])
            )
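
    # Note: with SOFTMAX_CROSS_ENTROPY the vector entries are treated as
    # scores over vector_size classes, so this loss is presumably only
    # meaningful when the target vectors form a (soft) class distribution.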

    def _setup_metrics(self):
        self.metric_functions[LOSS] = self.eval_loss_function
        self.metric_functions[ERROR] = ErrorScore(name='metric_error')
        self.metric_functions[MEAN_SQUARED_ERROR] = MeanSquaredErrorMetric(
            name='metric_mse'
        )
        self.metric_functions[MEAN_ABSOLUTE_ERROR] = MeanAbsoluteErrorMetric(
            name='metric_mae'
        )
        self.metric_functions[R2] = R2Score(name='metric_r2')

    def get_output_dtype(self):
        return tf.float32

    def get_output_shape(self):
        return (self.vector_size,)

    @staticmethod
    def update_model_definition_with_metadata(
            output_feature,
            feature_metadata,
            *args,
            **kwargs
    ):
        output_feature['vector_size'] = feature_metadata['vector_size']

    @staticmethod
    def calculate_overall_stats(
            test_stats,
            output_feature,
            dataset,
            train_set_metadata
    ):
        pass

    @staticmethod
    def postprocess_results(
            output_feature,
            result,
            metadata,
            experiment_dir_name,
            skip_save_unprocessed_output=False,
    ):
        postprocessed = {}
        name = output_feature['name']

        npy_filename = None
        if is_on_master():
            npy_filename = os.path.join(experiment_dir_name, '{}_{}.npy')
        else:
            skip_save_unprocessed_output = True

        if PREDICTIONS in result and len(result[PREDICTIONS]) > 0:
            postprocessed[PREDICTIONS] = result[PREDICTIONS].numpy()
            if not skip_save_unprocessed_output:
                np.save(
                    npy_filename.format(name, PREDICTIONS),
                    postprocessed[PREDICTIONS]
                )
            del result[PREDICTIONS]

        return postprocessed
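
    # Raw predictions are additionally written (on the master process only)
    # to '<experiment_dir_name>/<feature_name>_predictions.npy' via the
    # npy_filename template above, unless skip_save_unprocessed_output is
    # set.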

    @staticmethod
    def populate_defaults(output_feature):
        set_default_value(output_feature, LOSS, {})
        set_default_value(output_feature[LOSS], 'type', MEAN_SQUARED_ERROR)
        set_default_value(output_feature[LOSS], 'weight', 1)
        set_default_value(output_feature, 'reduce_input', None)
        set_default_value(output_feature, 'reduce_dependencies', None)
        set_default_value(output_feature, 'decoder', 'projector')
        set_default_value(output_feature, 'dependencies', [])

    decoder_registry = {
        'projector': Projector,
        'null': Projector,
        'none': Projector,
        'None': Projector,
        None: Projector
    }
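
# For completeness, a minimal (hypothetical) output-feature snippet that
# exercises this decoder and the default mean squared error loss:
#
#     output_features:
#         - name: targets
#           type: vector
#           decoder: projector
#           loss: {type: mean_squared_error}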