in example_zoo/tensorflow/models/ncf_main/official/recommendation/data_pipeline.py [0:0]
def __init__(self,
             maximum_number_epochs,   # type: int
             num_users,               # type: int
             num_items,               # type: int
             user_map,                # type: dict
             item_map,                # type: dict
             train_pos_users,         # type: np.ndarray
             train_pos_items,         # type: np.ndarray
             train_batch_size,        # type: int
             batches_per_train_step,  # type: int
             num_train_negatives,     # type: int
             eval_pos_users,          # type: np.ndarray
             eval_pos_items,          # type: np.ndarray
             eval_batch_size,         # type: int
             batches_per_eval_step,   # type: int
             stream_files,            # type: bool
             deterministic=False      # type: bool
            ):
  """Record the configuration, size both epochs and build dataset managers.

  Args:
    maximum_number_epochs: Stored on the instance; not otherwise used here.
    num_users: Stored, and multiplied by (1 + rconst.NUM_EVAL_NEGATIVES) to
      obtain the number of elements in an evaluation epoch.
    num_items: Stored on the instance; not otherwise used here.
    user_map: Stored as the public attribute `user_map`.
    item_map: Stored as the public attribute `item_map`.
    train_pos_users: Array whose shape must equal that of `train_pos_items`
      and must have exactly one dimension; its length is the count of
      training positives.
    train_pos_items: Array with the same shape as `train_pos_users`.
    train_batch_size: Stored, and passed to `_count_batches` for training.
    batches_per_train_step: Stored, and passed to `_count_batches` for
      training.
    num_train_negatives: Stored; an epoch holds
      (1 + num_train_negatives) * <count of training positives> elements.
    eval_pos_users: Stored on the instance; not otherwise used here.
    eval_pos_items: Stored on the instance; not otherwise used here.
    eval_batch_size: Must be a multiple of (1 + rconst.NUM_EVAL_NEGATIVES).
    batches_per_eval_step: Passed to `_count_batches` for evaluation. Note
      that, unlike its training counterpart, it is not stored.
    stream_files: When truthy, a temporary directory (prefix "ncf_") is
      created as the shard root and its recursive deletion is registered
      with `atexit`. When falsy there is no shard root and
      `_shuffle_with_forkpool` is enabled instead.
    deterministic: Forwarded to both `DatasetManager` instances and stored
      as the public attribute `deterministic`.

  Raises:
    ValueError: If the training user/item arrays differ in shape, if they
      are not one-dimensional (the shape unpacking fails), or if
      `eval_batch_size` is not divisible by (1 + rconst.NUM_EVAL_NEGATIVES).
  """
  # ---- Configuration kept verbatim --------------------------------------
  self._maximum_number_epochs = maximum_number_epochs
  self._num_users, self._num_items = num_users, num_items
  self.user_map, self.item_map = user_map, item_map

  self._train_pos_users, self._train_pos_items = (
      train_pos_users, train_pos_items)
  self.train_batch_size = train_batch_size
  self._num_train_negatives = num_train_negatives
  self._batches_per_train_step = batches_per_train_step

  self._eval_pos_users, self._eval_pos_items = eval_pos_users, eval_pos_items
  self.eval_batch_size = eval_batch_size

  # ---- Training epoch sizing --------------------------------------------
  users_shape = train_pos_users.shape
  items_shape = train_pos_items.shape
  if users_shape != items_shape:
    raise ValueError(
        "User positives ({}) is different from item positives ({})".format(
            users_shape, items_shape))

  # Single-element unpacking doubles as a check that the arrays are 1-D.
  (positive_count,) = users_shape
  self._train_pos_count = positive_count

  train_elements = (1 + num_train_negatives) * positive_count
  self._elements_in_epoch = train_elements
  self.train_batches_per_epoch = self._count_batches(
      train_elements, train_batch_size, batches_per_train_step)

  # ---- Evaluation epoch sizing ------------------------------------------
  # Every evaluated user contributes one group of this many elements, so a
  # batch has to hold a whole number of groups.
  eval_group_size = 1 + rconst.NUM_EVAL_NEGATIVES
  leftover = eval_batch_size % eval_group_size
  if leftover:
    raise ValueError("Eval batch size {} is not divisible by {}".format(
        eval_batch_size, eval_group_size))

  self._eval_users_per_batch = int(eval_batch_size // eval_group_size)

  eval_elements = num_users * eval_group_size
  self._eval_elements_in_epoch = eval_elements
  self.eval_batches_per_epoch = self._count_batches(
      eval_elements, eval_batch_size, batches_per_eval_step)

  # ---- Intermediate artifacts -------------------------------------------
  self._current_epoch_order = np.empty(shape=(0,))
  self._shuffle_iterator = None
  self._shuffle_with_forkpool = not stream_files

  if not stream_files:
    self._shard_root = None
  else:
    # Shards go to a scratch directory that is removed at interpreter exit.
    self._shard_root = tempfile.mkdtemp(prefix="ncf_")
    atexit.register(tf.gfile.DeleteRecursively, dirname=self._shard_root)

  # First positional argument is True for the training manager and False
  # for the evaluation manager; the rest of the arguments mirror each other.
  self._train_dataset = DatasetManager(
      True,
      stream_files,
      self.train_batches_per_epoch,
      self._shard_root,
      deterministic)
  self._eval_dataset = DatasetManager(
      False,
      stream_files,
      self.eval_batches_per_epoch,
      self._shard_root,
      deterministic)

  # ---- Threading details ------------------------------------------------
  # The base initializer runs before `daemon` is assigned.
  # NOTE(review): the base is presumably threading.Thread, which requires
  # that order -- confirm against the class definition.
  super(BaseDataConstructor, self).__init__()
  self.daemon = True
  self._stop_loop = False
  self._fatal_exception = None
  self.deterministic = deterministic