torchbenchmark/models/dlrm/data_loader_terabyte.py

# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from __future__ import absolute_import, division, print_function, unicode_literals

import os
import numpy as np
from torch.utils.data import Dataset
import torch
import time
import math
from tqdm import tqdm
import argparse


class DataLoader:
    """
    DataLoader dedicated for the Criteo Terabyte Click Logs dataset
    """

    def __init__(
            self,
            data_filename,
            data_directory,
            days,
            batch_size,
            max_ind_range=-1,
            split="train",
            drop_last_batch=False
    ):
        self.data_filename = data_filename
        self.data_directory = data_directory
        self.days = days
        self.batch_size = batch_size
        self.max_ind_range = max_ind_range

        total_file = os.path.join(
            data_directory,
            data_filename + "_day_count.npz"
        )
        with np.load(total_file) as data:
            total_per_file = data["total_per_file"][np.array(days)]

        self.length = sum(total_per_file)
        if split == "test" or split == "val":
            self.length = int(np.ceil(self.length / 2.))
        self.split = split
        self.drop_last_batch = drop_last_batch

    def __iter__(self):
        return iter(
            _batch_generator(
                self.data_filename, self.data_directory, self.days,
                self.batch_size, self.split, self.drop_last_batch,
                self.max_ind_range
            )
        )

    def __len__(self):
        if self.drop_last_batch:
            return self.length // self.batch_size
        else:
            return math.ceil(self.length / self.batch_size)


def _transform_features(
        x_int_batch, x_cat_batch, y_batch, max_ind_range,
        flag_input_torch_tensor=False
):
    if max_ind_range > 0:
        x_cat_batch = x_cat_batch % max_ind_range

    if flag_input_torch_tensor:
        x_int_batch = torch.log(x_int_batch.clone().detach().type(torch.float) + 1)
        x_cat_batch = x_cat_batch.clone().detach().type(torch.long)
        y_batch = y_batch.clone().detach().type(torch.float32).view(-1, 1)
    else:
        x_int_batch = torch.log(torch.tensor(x_int_batch, dtype=torch.float) + 1)
        x_cat_batch = torch.tensor(x_cat_batch, dtype=torch.long)
        y_batch = torch.tensor(y_batch, dtype=torch.float32).view(-1, 1)

    batch_size = x_cat_batch.shape[0]
    feature_count = x_cat_batch.shape[1]
    # offsets: exactly one sparse index per sample for each feature
    lS_o = torch.arange(batch_size).reshape(1, -1).repeat(feature_count, 1)

    return x_int_batch, lS_o, x_cat_batch.t(), y_batch.view(-1, 1)


def _batch_generator(
        data_filename, data_directory, days, batch_size, split, drop_last,
        max_ind_range
):
    # samples left over from the previous day's file, carried forward so that
    # batches can span file boundaries
    previous_file = None
    for day in days:
        filepath = os.path.join(
            data_directory,
            data_filename + "_{}_reordered.npz".format(day)
        )

        # print('Loading file: ', filepath)
        with np.load(filepath) as data:
            x_int = data["X_int"]
            x_cat = data["X_cat"]
            y = data["y"]

        samples_in_file = y.shape[0]
        batch_start_idx = 0
        if split == "test" or split == "val":
            length = int(np.ceil(samples_in_file / 2.))
            if split == "test":
                samples_in_file = length
            elif split == "val":
                batch_start_idx = samples_in_file - length

        while batch_start_idx < samples_in_file - batch_size:
            missing_samples = batch_size
            if previous_file is not None:
                missing_samples -= previous_file['y'].shape[0]

            current_slice = slice(batch_start_idx, batch_start_idx + missing_samples)

            x_int_batch = x_int[current_slice]
            x_cat_batch = x_cat[current_slice]
            y_batch = y[current_slice]

            if previous_file is not None:
                x_int_batch = np.concatenate(
                    [previous_file['x_int'], x_int_batch],
                    axis=0
                )
                x_cat_batch = np.concatenate(
                    [previous_file['x_cat'], x_cat_batch],
                    axis=0
                )
                y_batch = np.concatenate([previous_file['y'], y_batch], axis=0)
                previous_file = None

            if x_int_batch.shape[0] != batch_size:
                raise ValueError('should not happen')

            yield _transform_features(x_int_batch, x_cat_batch, y_batch, max_ind_range)

            batch_start_idx += missing_samples
        if batch_start_idx != samples_in_file:
            current_slice = slice(batch_start_idx, samples_in_file)
            if previous_file is not None:
                previous_file = {
                    'x_int': np.concatenate(
                        [previous_file['x_int'], x_int[current_slice]],
                        axis=0
                    ),
                    'x_cat': np.concatenate(
                        [previous_file['x_cat'], x_cat[current_slice]],
                        axis=0
                    ),
                    'y': np.concatenate(
                        [previous_file['y'], y[current_slice]],
                        axis=0
                    )
                }
            else:
                previous_file = {
                    'x_int': x_int[current_slice],
                    'x_cat': x_cat[current_slice],
                    'y': y[current_slice]
                }

    # emit whatever is left over as a final (possibly smaller) batch
    if not drop_last:
        yield _transform_features(
            previous_file['x_int'],
            previous_file['x_cat'],
            previous_file['y'],
            max_ind_range
        )
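

# --- Illustrative sketch (not part of the original file) ---
# A minimal example of driving the streaming DataLoader defined above. The
# directory, batch size, and max_ind_range below are placeholders; the loader
# expects the reordered day files (day_0_reordered.npz, ...) and the companion
# day_day_count.npz to already exist under data_directory. Each iteration
# yields the 4-tuple produced by _transform_features: log-transformed dense
# features, per-feature offsets lS_o, transposed sparse indices, and labels.
def _example_streaming_usage(data_directory="/data/terabyte"):
    loader = DataLoader(
        data_filename="day",
        data_directory=data_directory,
        days=list(range(23)),
        batch_size=2048,
        max_ind_range=10 * 1000 * 1000,
        split="train",
    )
    for x_int, lS_o, x_cat, y in loader:
        # x_int: (2048, 13), lS_o: (26, 2048), x_cat: (26, 2048), y: (2048, 1)
        break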


def _test():
    generator = _batch_generator(
        data_filename='day',
        data_directory='/input',
        days=range(23),
        split="train",
        batch_size=2048
    )
    t1 = time.time()
    for x_int, lS_o, x_cat, y in generator:
        t2 = time.time()
        time_diff = t2 - t1
        t1 = t2
        print(
            "time {} x_int.shape: {} lS_o.shape: {} x_cat.shape: {} y.shape: {}".format(
                time_diff, x_int.shape, lS_o.shape, x_cat.shape, y.shape
            )
        )


class CriteoBinDataset(Dataset):
    """Binary version of criteo dataset."""

    def __init__(self, data_file, counts_file,
                 batch_size=1, max_ind_range=-1, bytes_per_feature=4):
        # dataset
        self.tar_fea = 1   # single target
        self.den_fea = 13  # 13 dense features
        self.spa_fea = 26  # 26 sparse features
        self.tad_fea = self.tar_fea + self.den_fea
        self.tot_fea = self.tad_fea + self.spa_fea

        self.batch_size = batch_size
        self.max_ind_range = max_ind_range
        self.bytes_per_entry = (bytes_per_feature * self.tot_fea * batch_size)

        self.num_entries = math.ceil(os.path.getsize(data_file) / self.bytes_per_entry)

        print('data file:', data_file, 'number of batches:', self.num_entries)
        self.file = open(data_file, 'rb')

        with np.load(counts_file) as data:
            self.counts = data["counts"]

        # hardcoded for now
        self.m_den = 13

    def __len__(self):
        return self.num_entries

    def __getitem__(self, idx):
        self.file.seek(idx * self.bytes_per_entry, 0)
        raw_data = self.file.read(self.bytes_per_entry)
        array = np.frombuffer(raw_data, dtype=np.int32)
        # each sample is tot_fea int32 values: [label, 13 dense, 26 sparse]
        tensor = torch.from_numpy(array).view((-1, self.tot_fea))

        return _transform_features(x_int_batch=tensor[:, 1:14],
                                   x_cat_batch=tensor[:, 14:],
                                   y_batch=tensor[:, 0],
                                   max_ind_range=self.max_ind_range,
                                   flag_input_torch_tensor=True)


def numpy_to_binary(input_files, output_file_path, split='train'):
    """Convert the data to a binary format to be read with CriteoBinDataset."""

    # WARNING - both categorical and numerical data must fit into int32 for
    # the following code to work correctly

    with open(output_file_path, 'wb') as output_file:
        if split == 'train':
            for input_file in input_files:
                print('Processing file: ', input_file)

                np_data = np.load(input_file)
                np_data = np.concatenate([np_data['y'].reshape(-1, 1),
                                          np_data['X_int'],
                                          np_data['X_cat']], axis=1)
                np_data = np_data.astype(np.int32)

                output_file.write(np_data.tobytes())
        else:
            assert len(input_files) == 1
            np_data = np.load(input_files[0])
            np_data = np.concatenate([np_data['y'].reshape(-1, 1),
                                      np_data['X_int'],
                                      np_data['X_cat']], axis=1)
            np_data = np_data.astype(np.int32)

            samples_in_file = np_data.shape[0]
            midpoint = int(np.ceil(samples_in_file / 2.))
            if split == "test":
                begin = 0
                end = midpoint
            elif split == "val":
                begin = midpoint
                end = samples_in_file
            else:
                raise ValueError('Unknown split value: ', split)

            output_file.write(np_data[begin:end].tobytes())
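

# --- Illustrative sketch (not part of the original file) ---
# numpy_to_binary writes each sample as tot_fea = 40 int32 values in the order
# [label, 13 dense features, 26 sparse indices], so a single record occupies
# 4 * 40 = 160 bytes and a batch of 2048 samples spans 327,680 bytes. The
# helper below is hypothetical and only illustrates that layout by reading one
# raw sample back without going through CriteoBinDataset.
def _example_read_one_sample(binary_file_path, sample_idx=0):
    record_bytes = 4 * 40  # bytes_per_feature * tot_fea
    with open(binary_file_path, 'rb') as f:
        f.seek(sample_idx * record_bytes)
        record = np.frombuffer(f.read(record_bytes), dtype=np.int32)
    label, dense, sparse = record[0], record[1:14], record[14:]
    return label, dense, sparse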


def _preprocess(args):
    train_files = ['{}_{}_reordered.npz'.format(args.input_data_prefix, day)
                   for day in range(0, 23)]

    test_valid_file = args.input_data_prefix + '_23_reordered.npz'

    os.makedirs(args.output_directory, exist_ok=True)
    for split in ['train', 'val', 'test']:
        print('Running preprocessing for split =', split)

        output_file = os.path.join(args.output_directory,
                                   '{}_data.bin'.format(split))

        input_files = train_files if split == 'train' else [test_valid_file]
        numpy_to_binary(input_files=input_files,
                        output_file_path=output_file,
                        split=split)


def _test_bin():
    parser = argparse.ArgumentParser()
    parser.add_argument('--output_directory', required=True)
    parser.add_argument('--input_data_prefix', required=True)
    parser.add_argument('--split', choices=['train', 'test', 'val'], required=True)
    args = parser.parse_args()

    # _preprocess(args)

    binary_data_file = os.path.join(args.output_directory,
                                    '{}_data.bin'.format(args.split))

    counts_file = os.path.join(args.output_directory, 'day_fea_count.npz')
    dataset_binary = CriteoBinDataset(data_file=binary_data_file,
                                      counts_file=counts_file,
                                      batch_size=2048,)
    from dlrm_data_pytorch import CriteoDataset, collate_wrapper_criteo

    binary_loader = torch.utils.data.DataLoader(
        dataset_binary,
        batch_size=None,
        shuffle=False,
        num_workers=0,
        collate_fn=None,
        pin_memory=False,
        drop_last=False,
    )

    original_dataset = CriteoDataset(
        dataset='terabyte',
        max_ind_range=10 * 1000 * 1000,
        sub_sample_rate=1,
        randomize=True,
        split=args.split,
        raw_path=args.input_data_prefix,
        pro_data='dummy_string',
        memory_map=True
    )

    original_loader = torch.utils.data.DataLoader(
        original_dataset,
        batch_size=2048,
        shuffle=False,
        num_workers=0,
        collate_fn=collate_wrapper_criteo,
        pin_memory=False,
        drop_last=False,
    )

    assert len(dataset_binary) == len(original_loader)

    for i, (old_batch, new_batch) in tqdm(enumerate(zip(original_loader,
                                                        binary_loader)),
                                          total=len(dataset_binary)):
        for j in range(len(new_batch)):
            if not np.array_equal(old_batch[j], new_batch[j]):
                raise ValueError('FAILED: Datasets not equal')
        if i > len(dataset_binary):
            break

    print('PASSED')


if __name__ == '__main__':
    _test()
    _test_bin()
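

# --- Illustrative sketch (not part of the original file) ---
# DataLoader.__init__ above expects a companion file <prefix>_day_count.npz
# containing a "total_per_file" array with the number of samples in each
# reordered day file. The upstream Criteo preprocessing normally produces this
# file; the hypothetical helper below only illustrates its expected contents
# by rebuilding it from the .npz day files (paths and prefix are placeholders).
def _example_rebuild_day_counts(data_directory="/data/terabyte",
                                prefix="day", num_days=24):
    counts = []
    for day in range(num_days):
        path = os.path.join(
            data_directory, "{}_{}_reordered.npz".format(prefix, day)
        )
        with np.load(path) as data:
            counts.append(data["y"].shape[0])
    np.savez(
        os.path.join(data_directory, "{}_day_count.npz".format(prefix)),
        total_per_file=np.array(counts, dtype=np.int64),
    )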