# easy_rec/python/input/odps_rtp_input_v2.py
# -*- encoding:utf-8 -*-
# Copyright (c) Alibaba, Inc. and its affiliates.
import json
import logging
import tensorflow as tf
from easy_rec.python.input.odps_rtp_input import OdpsRTPInput
try:
import pai
import rtp_fg
except Exception:
pai = None
rtp_fg = None
class OdpsRTPInputV2(OdpsRTPInput):
  """RTPInput for parsing rtp fg new input format on odps.

  Our new format(csv in table) of rtp output:
     label0, item_id, ..., user_id, features
  Where features is in default RTP-tensorflow format.
  The features column and labels are specified by data_config.selected_cols,
  columns are selected by names in the table
  such as: clk,features, the last selected column is features, the first
  selected columns are labels
  """

  def __init__(self,
               data_config,
               feature_config,
               input_path,
               task_index=0,
               task_num=1,
               check_mode=False,
               fg_json_path=None,
               pipeline_config=None):
    """Init input with an fg json config describing the rtp feature columns.

    Args:
      data_config: dataset config.
      feature_config: feature column configs.
      input_path: odps table path(s).
      task_index: worker index.
      task_num: total worker number.
      check_mode: whether to enable input check mode.
      fg_json_path: path to the rtp fg json config; an optional leading '!'
        marker is stripped. Required (ValueError if None).
      pipeline_config: the whole pipeline config.

    Raises:
      ValueError: if fg_json_path is not set.
    """
    super(OdpsRTPInputV2,
          self).__init__(data_config, feature_config, input_path, task_index,
                         task_num, check_mode, pipeline_config)
    # Validate before any string operation: calling .startswith on None would
    # raise an opaque AttributeError instead of the intended ValueError.
    if fg_json_path is None:
      raise ValueError('fg_json_path is not set')
    # strip the optional '!' prefix marker
    if fg_json_path.startswith('!'):
      fg_json_path = fg_json_path[1:]
    self._fg_config_path = fg_json_path
    logging.info('fg config path: {}'.format(self._fg_config_path))
    with tf.gfile.GFile(self._fg_config_path, 'r') as f:
      self._fg_config = json.load(f)

  def _parse_table(self, *fields):
    """Parse one batch of table columns into an input dict.

    Args:
      *fields: the selected columns; all but the last are label columns, the
        last one is the rtp generated feature column.

    Returns:
      dict mapping effective feature names and label names to tensors.
    """
    self.check_rtp()
    fields = list(fields)
    labels = fields[:-1]
    # assume that the last field is the generated feature column
    # NOTE: parse_genreated_fg is the actual (misspelled) rtp_fg API name.
    features = rtp_fg.parse_genreated_fg(self._fg_config, fields[-1])
    field_keys = [x for x in self._input_fields if x not in self._label_fields]
    # Iterate over a snapshot of the keys: deleting from a dict while
    # iterating it directly raises RuntimeError (dict changed size).
    for feature_key in list(features.keys()):
      if feature_key not in field_keys or \
          feature_key not in self._effective_fields:
        del features[feature_key]
    inputs = {x: features[x] for x in features}
    for idx, label_name in enumerate(self._label_fields):
      inputs[label_name] = labels[idx]
    return inputs

  def create_placeholders(self, *args, **kwargs):
    """Create serving placeholders with rtp_fg."""
    self.check_rtp()
    self._mode = tf.estimator.ModeKeys.PREDICT
    inputs_placeholder = tf.placeholder(tf.string, [None], name='features')
    print('[OdpsRTPInputV2] building placeholders.')
    print('[OdpsRTPInputV2] fg_config: {}'.format(self._fg_config))
    features = rtp_fg.parse_genreated_fg(self._fg_config, inputs_placeholder)
    print('[OdpsRTPInputV2] built features: {}'.format(features.keys()))
    features = self._preprocess(features)
    print('[OdpsRTPInputV2] processed features: {}'.format(features.keys()))
    return {'features': inputs_placeholder}, features['feature']

  def create_multi_placeholders(self, *args, **kwargs):
    """Create serving multi-placeholders with rtp_fg."""
    raise NotImplementedError(
        'create_multi_placeholders is not supported for OdpsRTPInputV2')

  def check_rtp(self):
    """Raise if the optional rtp_fg dependency failed to import."""
    if rtp_fg is None:
      raise NotImplementedError(
          'OdpsRTPInputV2 cannot run without rtp_fg, which is not installed')

  def _pre_build(self, mode, params):
    try:
      # Prevent TF from replacing the shape tensor to a constant tensor. This
      # will cause the batch size being fixed. And RTP will be not able to
      # recognize the input shape.
      tf.get_default_graph().set_shape_optimize(False)
    except AttributeError as e:
      # '%s' placeholder added: the original format string had no placeholder,
      # so the exception detail was silently dropped from the log record.
      logging.warning('failed to disable shape optimization: %s', e)