torchbenchmark/models/dlrm/data_utils.py

# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
#
# Description: generate inputs and targets for the DLRM benchmark
#
# Utility function(s) to download and pre-process public data sets
#   - Criteo Kaggle Display Advertising Challenge Dataset
#     https://labs.criteo.com/2014/02/kaggle-display-advertising-challenge-dataset
#   - Criteo Terabyte Dataset
#     https://labs.criteo.com/2013/12/download-terabyte-click-logs
#
# After downloading the dataset, run:
#   getCriteoAdData(
#       datafile="<path-to-train.txt>",
#       o_filename=kaggleAdDisplayChallenge_processed.npz,
#       max_ind_range=-1,
#       sub_sample_rate=0.0,
#       days=7,
#       data_split='train',
#       randomize='total',
#       criteo_kaggle=True,
#       memory_map=False
#   )
#   getCriteoAdData(
#       datafile="<path-to-day_{0,...,23}>",
#       o_filename=terabyte_processed.npz,
#       max_ind_range=-1,
#       sub_sample_rate=0.0,
#       days=24,
#       data_split='train',
#       randomize='total',
#       criteo_kaggle=False,
#       memory_map=False
#   )

from __future__ import absolute_import, division, print_function, unicode_literals

import sys
# import os
from os import path
# import io
# from io import StringIO
# import collections as coll

import numpy as np


def convertUStringToDistinctIntsDict(mat, convertDicts, counts):
    # Converts matrix of unicode strings into distinct integers.
    #
    # Inputs:
    #     mat (np.array): array of unicode strings to convert
    #     convertDicts (list): dictionary for each column
    #     counts (list): number of different categories in each column
    #
    # Outputs:
    #     out (np.array): array of output integers
    #     convertDicts (list): dictionary for each column
    #     counts (list): number of different categories in each column

    # check if convertDicts and counts match correct length of mat
    if len(convertDicts) != mat.shape[1] or len(counts) != mat.shape[1]:
        print("Length of convertDicts or counts does not match input shape")
        print("Generating convertDicts and counts...")

        convertDicts = [{} for _ in range(mat.shape[1])]
        counts = [0 for _ in range(mat.shape[1])]

    # initialize output
    out = np.zeros(mat.shape)

    for j in range(mat.shape[1]):
        for i in range(mat.shape[0]):
            # add to convertDict and increment count
            if mat[i, j] not in convertDicts[j]:
                convertDicts[j][mat[i, j]] = counts[j]
                counts[j] += 1
            out[i, j] = convertDicts[j][mat[i, j]]

    return out, convertDicts, counts


def convertUStringToDistinctIntsUnique(mat, mat_uni, counts):
    # mat is an array of 0,...,# samples, with each being 26 categorical features

    # check if mat_uni and counts match correct length of mat
    if len(mat_uni) != mat.shape[1] or len(counts) != mat.shape[1]:
        print("Length of mat_unique or counts does not match input shape")
        print("Generating mat_unique and counts...")

        mat_uni = [np.array([]) for _ in range(mat.shape[1])]
        counts = [0 for _ in range(mat.shape[1])]

    # initialize output
    out = np.zeros(mat.shape)
    ind_map = [np.array([]) for _ in range(mat.shape[1])]

    # find out and assign unique ids to features
    for j in range(mat.shape[1]):
        m = mat_uni[j].size
        mat_concat = np.concatenate((mat_uni[j], mat[:, j]))
        mat_uni[j], ind_map[j] = np.unique(mat_concat, return_inverse=True)
        out[:, j] = ind_map[j][m:]
        counts[j] = mat_uni[j].size

    return out, mat_uni, counts
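
# Both converters above assign each distinct raw value a per-column integer id.
# An illustrative sketch of the dictionary variant (toy input, not part of the
# pipeline; passing empty lists forces the dictionaries to be regenerated):
#
#   mat = np.array([["a", "x"], ["b", "x"], ["a", "y"]])
#   out, dicts, cnts = convertUStringToDistinctIntsDict(mat, [], [])
#   # out  -> [[0., 0.], [1., 0.], [0., 1.]]  (float array, ids per column)
#   # cnts -> [2, 2]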


def processCriteoAdData(d_path, d_file, npzfile, split, convertDicts, pre_comp_counts):
    # Process Kaggle Display Advertising Challenge or Terabyte Dataset
    # by converting unicode strings in X_cat to integers and
    # converting negative integer values in X_int.
    #
    # Loads data in the form "{kaggle|terabyte}_day_i.npz" where i is the day.
    #
    # Inputs:
    #     d_path (str): path for {kaggle|terabyte}_day_i.npz files
    #     split (int): total number of splits in the dataset (typically 7 or 24)

    # process data if not all files exist
    for i in range(split):
        filename_i = npzfile + "_{0}_processed.npz".format(i)

        if path.exists(filename_i):
            print("Using existing " + filename_i, end="\r")
        else:
            with np.load(npzfile + "_{0}.npz".format(i)) as data:
                # categorical features
                '''
                # Approach 1a: using empty dictionaries
                X_cat, convertDicts, counts = convertUStringToDistinctIntsDict(
                    data["X_cat"], convertDicts, counts
                )
                '''
                '''
                # Approach 1b: using empty np.unique
                X_cat, convertDicts, counts = convertUStringToDistinctIntsUnique(
                    data["X_cat"], convertDicts, counts
                )
                '''
                # Approach 2a: using pre-computed dictionaries
                X_cat_t = np.zeros(data["X_cat_t"].shape)
                for j in range(26):
                    for k, x in enumerate(data["X_cat_t"][j, :]):
                        X_cat_t[j, k] = convertDicts[j][x]

                # continuous features
                X_int = data["X_int"]
                X_int[X_int < 0] = 0

                # targets
                y = data["y"]

                np.savez_compressed(
                    filename_i,
                    # X_cat=X_cat,
                    X_cat=np.transpose(X_cat_t),  # transpose of the data
                    X_int=X_int,
                    y=y,
                )
                print("Processed " + filename_i, end="\r")
    print("")

    # sanity check (applicable only if counts have been pre-computed & are re-computed)
    # for j in range(26):
    #     if pre_comp_counts[j] != counts[j]:
    #         sys.exit("ERROR: Sanity check on counts has failed")
    # print("\nSanity check on counts passed")

    return
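
# Possible speedup (an assumption, not used by the pipeline): the per-element
# dictionary lookup in the Approach 2a loop above can be vectorized per feature
# with a sorted lookup table, provided every raw value is present in the dict:
#
#   raw = data["X_cat_t"][j, :]
#   keys = np.array(sorted(convertDicts[j].keys()))
#   vals = np.array([convertDicts[j][k] for k in keys])
#   X_cat_t[j, :] = vals[np.searchsorted(keys, raw)]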


def concatCriteoAdData(
        d_path,
        d_file,
        npzfile,
        trafile,
        days,
        data_split,
        randomize,
        total_per_file,
        total_count,
        memory_map,
        o_filename
):
    # Concatenates different days and saves the result.
    #
    # Inputs:
    #     days (int): total number of days in the dataset (typically 7 or 24)
    #     d_path (str): path for {kaggle|terabyte}_day_i.npz files
    #     o_filename (str): output file name
    #
    # Output:
    #     o_file (str): output file path

    if memory_map:
        # dataset break up per fea
        # tar_fea = 1   # single target
        den_fea = 13  # 13 dense features
        spa_fea = 26  # 26 sparse features
        # tad_fea = tar_fea + den_fea
        # tot_fea = tad_fea + spa_fea
        # create offset per file
        offset_per_file = np.array([0] + [x for x in total_per_file])
        for i in range(days):
            offset_per_file[i + 1] += offset_per_file[i]

        '''
        # Approach 1, 2 and 3 use indices, while Approach 4 does not use them
        # create indices
        indices = np.arange(total_count)
        if data_split == "none":
            if randomize == "total":
                indices = np.random.permutation(indices)
        else:
            indices = np.array_split(indices, offset_per_file[1:-1])

            # randomize train data (per day)
            if randomize == "day":  # or randomize == "total":
                for i in range(len(indices) - 1):
                    indices[i] = np.random.permutation(indices[i])
                print("Randomized indices per day ...")

            train_indices = np.concatenate(indices[:-1])
            test_indices = indices[-1]

            # randomize train data (across days)
            if randomize == "total":
                train_indices = np.random.permutation(train_indices)
                print("Randomized indices across days ...")
            indices = np.concatenate((train_indices, test_indices))
        # no reordering
        # indices = np.arange(total_count)
        '''
        '''
        # Approach 1: simple and slow (no grouping is used)
        # check if data already exists
        recreate_flag = False
        for j in range(tot_fea):
            filename_j = trafile + "_{0}_reordered.npy".format(j)
            if path.exists(filename_j):
                print("Using existing " + filename_j)
            else:
                recreate_flag = True
        # load, reorder and concatenate data (memmap all reordered files per feature)
        if recreate_flag:
            # init reordered files (.npy appended automatically)
            z = np.zeros((total_count))
            for j in range(tot_fea):
                filename_j = trafile + "_{0}_reordered".format(j)
                np.save(filename_j, z)
                print("Creating " + filename_j)

            for i in range(days):
                filename_i = d_path + npzfile + "_{0}_processed.npz".format(i)
                with np.load(filename_i) as data:
                    X_cat_t = np.transpose(data["X_cat"])
                    X_int_t = np.transpose(data["X_int"])
                    y = data["y"]
                size = len(y)
                # sanity check
                if total_per_file[i] != size:
                    sys.exit("ERROR: sanity check on number of samples failed")
                # setup start and end ranges
                start = offset_per_file[i]
                end = offset_per_file[i + 1]
                # print(filename_i)
                # print("start=" + str(start) + " end=" + str(end)
                #       + " diff=" + str(end - start) + "=" + str(total_per_file[i]))

                for j in range(tot_fea):
                    filename_j = trafile + "_{0}_reordered.npy".format(j)
                    fj = np.load(filename_j, mmap_mode='r+')
                    if j < tar_fea:
                        fj[indices[start:end]] = y
                    elif tar_fea <= j and j < tad_fea:
                        fj[indices[start:end]] = X_int_t[j - tar_fea, :]
                    else:
                        fj[indices[start:end]] = X_cat_t[j - tad_fea, :]
                    del fj
        else:
            print("Reordered fea files already exist, skipping ...")

        # check if data already exists
        recreate_flag = False
        for i in range(days):
            filename_i = d_path + npzfile + "_{0}_reordered.npz".format(i)
            if path.exists(filename_i):
                print("Using existing " + filename_i)
            else:
                recreate_flag = True
        # split reordered data by files (memmap all reordered files per feature)
        # on the day boundary del the file object and memmap again
        if recreate_flag:
            for i in range(days):
                filename_i = d_path + npzfile + "_{0}_reordered.npz".format(i)
                size = total_per_file[i]
                X_int_t = np.zeros((den_fea, size))
                X_cat_t = np.zeros((spa_fea, size))
                # setup start and end ranges
                start = offset_per_file[i]
                end = offset_per_file[i + 1]
                print("Creating " + filename_i)
                # print("start=" + str(start) + " end=" + str(end)
                #       + " diff=" + str(end - start) + "=" + str(total_per_file[i]))

                for j in range(tot_fea):
                    filename_j = trafile + "_{0}_reordered.npy".format(j)
                    fj = np.load(filename_j, mmap_mode='r')
                    if j < tar_fea:
                        y = fj[start:end]
                    elif tar_fea <= j and j < tad_fea:
                        X_int_t[j - tar_fea, :] = fj[start:end]
                    else:
                        X_cat_t[j - tad_fea, :] = fj[start:end]
                    del fj

                np.savez_compressed(
                    filename_i,
                    X_cat=np.transpose(X_cat_t),  # transpose of the data
                    X_int=np.transpose(X_int_t),  # transpose of the data
                    y=y,
                )
        else:
            print("Reordered day files already exist, skipping ...")
        '''
        '''
        # Approach 2: group days
        # check if data already exists
        recreate_flag = False
        for j in range(tot_fea):
            filename_j = trafile + "_{0}_reordered.npy".format(j)
            if path.exists(filename_j):
                print("Using existing " + filename_j)
            else:
                recreate_flag = True
        # load, reorder and concatenate data (memmap all reordered files per feature)
        if recreate_flag:
            # init reordered files (.npy appended automatically)
            z = np.zeros((total_count))
            for j in range(tot_fea):
                filename_j = trafile + "_{0}_reordered".format(j)
                np.save(filename_j, z)
                print("Creating " + filename_j)

            group_day = 3  # e.g. 8, 4 or 3
            group_num = days // group_day
            file_group = [i*group_day for i in range(group_num)] + [days]
            for ii in range(group_num):
                # the last group may have a different group_size, hence reset it below
                group_size = file_group[ii + 1] - file_group[ii]
                X_cat_t = [0]*group_size
                X_int_t = [0]*group_size
                y = [0]*group_size
                start = [0]*group_size
                end = [0]*group_size
                for ig in range(group_size):
                    i = file_group[ii] + ig
                    filename_i = d_path + npzfile + "_{0}_processed.npz".format(i)
                    # setup start and end ranges
                    start[ig] = offset_per_file[i]
                    end[ig] = offset_per_file[i + 1]
                    # print(filename_i)
                    # load a group of files
                    with np.load(filename_i) as data:
                        X_cat_t[ig] = np.transpose(data["X_cat"])
                        X_int_t[ig] = np.transpose(data["X_int"])
                        y[ig] = data["y"]
                    # sanity check
                    if total_per_file[i] != len(y[ig]):
                        sys.exit("ERROR: sanity check on number of samples failed")
                    # print("start=" + str(start) + " end=" + str(end)
                    #       + " diff=" + str(end[ig] - start[ig]) + "=" + str(total_per_file[i]))

                for j in range(tot_fea):
                    filename_j = trafile + "_{0}_reordered.npy".format(j)
                    fj = np.load(filename_j, mmap_mode='r+')
                    for ig in range(group_size):
                        if j < tar_fea:
                            fj[indices[start[ig]:end[ig]]] = y[ig]
                        elif tar_fea <= j and j < tad_fea:
                            fj[indices[start[ig]:end[ig]]] = X_int_t[ig][j - tar_fea, :]
                        else:
                            fj[indices[start[ig]:end[ig]]] = X_cat_t[ig][j - tad_fea, :]
                    del fj
        else:
            print("Reordered fea files already exist, skipping ...")

        # check if data already exists
        recreate_flag = False
        for i in range(days):
            filename_i = d_path + npzfile + "_{0}_reordered.npz".format(i)
            if path.exists(filename_i):
                print("Using existing " + filename_i)
            else:
                recreate_flag = True
        # split reordered data by files (memmap all reordered files per feature)
        # on the day boundary del the file object and memmap again
        if recreate_flag:
            for ii in range(group_num):
                # the last group may have a different group_size, hence reset it below
                group_size = file_group[ii + 1] - file_group[ii]
                X_cat_t = []
                X_int_t = []
                for ig in range(group_size):
                    i = file_group[ii] + ig
                    X_int_t.append(np.zeros((den_fea, total_per_file[i])))
                    X_cat_t.append(np.zeros((spa_fea, total_per_file[i])))
                y = [0]*group_size
                start = [0]*group_size
                end = [0]*group_size

                for j in range(tot_fea):
                    filename_j = trafile + "_{0}_reordered.npy".format(j)
                    fj = np.load(filename_j, mmap_mode='r')
                    # load a group of files
                    for ig in range(group_size):
                        i = file_group[ii] + ig
                        # setup start and end ranges
                        start[ig] = offset_per_file[i]
                        end[ig] = offset_per_file[i + 1]
                        # load data for the group of files
                        if j < tar_fea:
                            y[ig] = fj[start[ig]:end[ig]]
                        elif tar_fea <= j and j < tad_fea:
                            X_int_t[ig][j - tar_fea, :] = fj[start[ig]:end[ig]]
                        else:
                            X_cat_t[ig][j - tad_fea, :] = fj[start[ig]:end[ig]]
                    del fj

                for ig in range(group_size):
                    i = file_group[ii] + ig
                    filename_i = d_path + npzfile + "_{0}_reordered.npz".format(i)
                    print("Creating " + filename_i)
                    np.savez_compressed(
                        filename_i,
                        X_cat=np.transpose(X_cat_t[ig]),  # transpose of the data
                        X_int=np.transpose(X_int_t[ig]),  # transpose of the data
                        y=y[ig],
                    )
        else:
            print("Reordered day files already exist, skipping ...")
        '''
        '''
        # Approach 3: group features
        # check if data already exists
        group_fea = 5  # e.g. 8, 5 or 4
        group_num = tot_fea // group_fea
        if tot_fea % group_fea != 0:  # sanity check
            sys.exit("ERROR: group_fea must divide tot_fea evenly.")
        recreate_flag = False
        for jn in range(group_num):
            filename_j = trafile + "_{0}_reordered{1}.npy".format(
                jn, group_fea
            )
            if path.exists(filename_j):
                print("Using existing " + filename_j)
            else:
                recreate_flag = True
        # load, reorder and concatenate data (memmap all reordered files per feature)
        if recreate_flag:
            # init reordered files (.npy appended automatically)
            z = np.zeros((group_fea, total_count))
            for jn in range(group_num):
                filename_j = trafile + "_{0}_reordered{1}".format(
                    jn, group_fea
                )
                np.save(filename_j, z)
                print("Creating " + filename_j)

            for i in range(days):
                filename_i = d_path + npzfile + "_{0}_processed.npz".format(i)
                with np.load(filename_i) as data:
                    X_cat_t = np.transpose(data["X_cat"])
                    X_int_t = np.transpose(data["X_int"])
                    y = data["y"]
                size = len(y)
                # sanity check
                if total_per_file[i] != size:
                    sys.exit("ERROR: sanity check on number of samples failed")
                # setup start and end ranges
                start = offset_per_file[i]
                end = offset_per_file[i + 1]
                # print(filename_i)
                # print("start=" + str(start) + " end=" + str(end)
                #       + " diff=" + str(end - start) + "=" + str(total_per_file[i]))

                for jn in range(group_num):
                    filename_j = trafile + "_{0}_reordered{1}.npy".format(
                        jn, group_fea
                    )
                    fj = np.load(filename_j, mmap_mode='r+')
                    for jg in range(group_fea):
                        j = jn * group_fea + jg
                        # print("j=" + str(j) + " jn=" + str(jn) + " jg=" + str(jg))
                        if j < tar_fea:
                            fj[jg, indices[start:end]] = y
                        elif tar_fea <= j and j < tad_fea:
                            fj[jg, indices[start:end]] = X_int_t[j - tar_fea, :]
                        else:
                            fj[jg, indices[start:end]] = X_cat_t[j - tad_fea, :]
                    del fj
        else:
            print("Reordered fea files already exist, skipping ...")

        # check if data already exists
        recreate_flag = False
        for i in range(days):
            filename_i = d_path + npzfile + "_{0}_reordered.npz".format(i)
            if path.exists(filename_i):
                print("Using existing " + filename_i)
            else:
                recreate_flag = True
        # split reordered data by files (memmap all reordered files per feature)
        # on the day boundary del the file object and memmap again
        if recreate_flag:
            for i in range(days):
                filename_i = d_path + npzfile + "_{0}_reordered.npz".format(i)
                size = total_per_file[i]
                X_int_t = np.zeros((den_fea, size))
                X_cat_t = np.zeros((spa_fea, size))
                # setup start and end ranges
                start = offset_per_file[i]
                end = offset_per_file[i + 1]
                print("Creating " + filename_i)
                # print("start=" + str(start) + " end=" + str(end)
                #       + " diff=" + str(end - start) + "=" + str(total_per_file[i]))

                for jn in range(group_num):
                    filename_j = trafile + "_{0}_reordered{1}.npy".format(
                        jn, group_fea
                    )
                    fj = np.load(filename_j, mmap_mode='r')
                    for jg in range(group_fea):
                        j = jn * group_fea + jg
                        # print("j=" + str(j) + " jn=" + str(jn) + " jg=" + str(jg))
                        if j < tar_fea:
                            y = fj[jg, start:end]
                        elif tar_fea <= j and j < tad_fea:
                            X_int_t[j - tar_fea, :] = fj[jg, start:end]
                        else:
                            X_cat_t[j - tad_fea, :] = fj[jg, start:end]
                    del fj

                np.savez_compressed(
                    filename_i,
                    X_cat=np.transpose(X_cat_t),  # transpose of the data
                    X_int=np.transpose(X_int_t),  # transpose of the data
                    y=y,
                )
        else:
            print("Reordered day files already exist, skipping ...")
        '''
        # Approach 4: Fisher-Yates-Rao (FYR) shuffle algorithm
        # 1st pass of FYR shuffle
        # check if data already exists
        recreate_flag = False
        for j in range(days):
            filename_j_y = npzfile + "_{0}_intermediate_y.npy".format(j)
            filename_j_d = npzfile + "_{0}_intermediate_d.npy".format(j)
            filename_j_s = npzfile + "_{0}_intermediate_s.npy".format(j)
            if (
                path.exists(filename_j_y)
                and path.exists(filename_j_d)
                and path.exists(filename_j_s)
            ):
                print(
                    "Using existing\n"
                    + filename_j_y + "\n"
                    + filename_j_d + "\n"
                    + filename_j_s
                )
            else:
                recreate_flag = True
        # reorder across buckets using sampling
        if recreate_flag:
            # init intermediate files (.npy appended automatically)
            for j in range(days):
                filename_j_y = npzfile + "_{0}_intermediate_y".format(j)
                filename_j_d = npzfile + "_{0}_intermediate_d".format(j)
                filename_j_s = npzfile + "_{0}_intermediate_s".format(j)
                np.save(filename_j_y, np.zeros((total_per_file[j])))
                np.save(filename_j_d, np.zeros((total_per_file[j], den_fea)))
                np.save(filename_j_s, np.zeros((total_per_file[j], spa_fea)))
            # start processing files
            total_counter = [0] * days
            for i in range(days):
                filename_i = npzfile + "_{0}_processed.npz".format(i)
                with np.load(filename_i) as data:
                    X_cat = data["X_cat"]
                    X_int = data["X_int"]
                    y = data["y"]
                size = len(y)
                # sanity check
                if total_per_file[i] != size:
                    sys.exit("ERROR: sanity check on number of samples failed")
                # debug prints
                print("Reordering (1st pass) " + filename_i)

                # create buckets using sampling of random ints
                # from (discrete) uniform distribution
                buckets = []
                for _j in range(days):
                    buckets.append([])
                counter = [0] * days
                days_to_sample = days if data_split == "none" else days - 1
                if randomize == "total":
                    rand_u = np.random.randint(low=0, high=days_to_sample, size=size)
                    for k in range(size):
                        # sample and make sure elements per buckets do not overflow
                        if data_split == "none" or i < days - 1:
                            # choose bucket
                            p = rand_u[k]
                            # retry if the bucket is full
                            while total_counter[p] + counter[p] >= total_per_file[p]:
                                p = np.random.randint(low=0, high=days_to_sample)
                        else:  # preserve the last day/bucket if needed
                            p = i
                        buckets[p].append(k)
                        counter[p] += 1
                else:  # randomize is day or none
                    for k in range(size):
                        # do not sample, preserve the data in this bucket
                        p = i
                        buckets[p].append(k)
                        counter[p] += 1

                # sanity check
                if np.sum(counter) != size:
                    sys.exit("ERROR: sanity check on number of samples failed")
                # debug prints
                # print(counter)
                # print(str(np.sum(counter)) + " = " + str(size))
                # print([len(x) for x in buckets])
                # print(total_counter)

                # partially fill the buckets
                for j in range(days):
                    filename_j_y = npzfile + "_{0}_intermediate_y.npy".format(j)
                    filename_j_d = npzfile + "_{0}_intermediate_d.npy".format(j)
                    filename_j_s = npzfile + "_{0}_intermediate_s.npy".format(j)
                    start = total_counter[j]
                    end = total_counter[j] + counter[j]
                    # target buckets
                    fj_y = np.load(filename_j_y, mmap_mode='r+')
                    # print("start=" + str(start) + " end=" + str(end)
                    #       + " end - start=" + str(end - start) + " "
                    #       + str(fj_y[start:end].shape) + " "
                    #       + str(len(buckets[j])))
                    fj_y[start:end] = y[buckets[j]]
                    del fj_y
                    # dense buckets
                    fj_d = np.load(filename_j_d, mmap_mode='r+')
                    # print("start=" + str(start) + " end=" + str(end)
                    #       + " end - start=" + str(end - start) + " "
                    #       + str(fj_d[start:end, :].shape) + " "
                    #       + str(len(buckets[j])))
                    fj_d[start:end, :] = X_int[buckets[j], :]
                    del fj_d
                    # sparse buckets
                    fj_s = np.load(filename_j_s, mmap_mode='r+')
                    # print("start=" + str(start) + " end=" + str(end)
                    #       + " end - start=" + str(end - start) + " "
                    #       + str(fj_s[start:end, :].shape) + " "
                    #       + str(len(buckets[j])))
                    fj_s[start:end, :] = X_cat[buckets[j], :]
                    del fj_s
                    # update counters for next step
                    total_counter[j] += counter[j]
        # 2nd pass of FYR shuffle
        # check if data already exists
        for j in range(days):
            filename_j = npzfile + "_{0}_reordered.npz".format(j)
            if path.exists(filename_j):
                print("Using existing " + filename_j)
            else:
                recreate_flag = True
        # reorder within buckets
        if recreate_flag:
            for j in range(days):
                filename_j_y = npzfile + "_{0}_intermediate_y.npy".format(j)
                filename_j_d = npzfile + "_{0}_intermediate_d.npy".format(j)
                filename_j_s = npzfile + "_{0}_intermediate_s.npy".format(j)
                fj_y = np.load(filename_j_y)
                fj_d = np.load(filename_j_d)
                fj_s = np.load(filename_j_s)

                indices = range(total_per_file[j])
                if randomize == "day" or randomize == "total":
                    if data_split == "none" or j < days - 1:
                        indices = np.random.permutation(range(total_per_file[j]))

                filename_r = npzfile + "_{0}_reordered.npz".format(j)
                print("Reordering (2nd pass) " + filename_r)
                np.savez_compressed(
                    filename_r,
                    X_cat=fj_s[indices, :],
                    X_int=fj_d[indices, :],
                    y=fj_y[indices],
                )

        '''
        # sanity check (under no reordering norms should be zero)
        for i in range(days):
            filename_i_o = npzfile + "_{0}_processed.npz".format(i)
            print(filename_i_o)
            with np.load(filename_i_o) as data_original:
                X_cat_o = data_original["X_cat"]
                X_int_o = data_original["X_int"]
                y_o = data_original["y"]
            filename_i_r = npzfile + "_{0}_reordered.npz".format(i)
            print(filename_i_r)
            with np.load(filename_i_r) as data_reordered:
                X_cat_r = data_reordered["X_cat"]
                X_int_r = data_reordered["X_int"]
                y_r = data_reordered["y"]
            print(np.linalg.norm(y_o - y_r))
            print(np.linalg.norm(X_int_o - X_int_r))
            print(np.linalg.norm(X_cat_o - X_cat_r))
        '''
    else:
        print("Concatenating multiple days into %s.npz file" % str(d_path + o_filename))

        # load and concatenate data
        for i in range(days):
            filename_i = npzfile + "_{0}_processed.npz".format(i)
            with np.load(filename_i) as data:
                if i == 0:
                    X_cat = data["X_cat"]
                    X_int = data["X_int"]
                    y = data["y"]
                else:
                    X_cat = np.concatenate((X_cat, data["X_cat"]))
                    X_int = np.concatenate((X_int, data["X_int"]))
                    y = np.concatenate((y, data["y"]))
            print("Loaded day:", i, "y = 1:", len(y[y == 1]), "y = 0:", len(y[y == 0]))

        with np.load(d_path + d_file + "_fea_count.npz") as data:
            counts = data["counts"]
        print("Loaded counts!")

        np.savez_compressed(
            d_path + o_filename + ".npz",
            X_cat=X_cat,
            X_int=X_int,
            y=y,
            counts=counts,
        )

    return d_path + o_filename + ".npz"
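
# Note on Approach 4: the two passes implement a bucketed shuffle. Pass 1
# scatters each day's rows uniformly at random across per-day buckets on disk
# (bounded by each bucket's capacity); pass 2 permutes rows within each bucket.
# A minimal in-memory sketch of the same idea (toy sizes are illustrative
# assumptions, standing in for the on-disk intermediates):
#
#   rows = np.arange(10)                      # stand-in for samples
#   sizes = [4, 3, 3]                         # per-bucket capacities
#   buckets = [[] for _ in sizes]
#   for r in rows:                            # pass 1: scatter across buckets
#       p = np.random.randint(len(sizes))
#       while len(buckets[p]) >= sizes[p]:    # retry if the bucket is full
#           p = np.random.randint(len(sizes))
#       buckets[p].append(r)
#   shuffled = [np.random.permutation(b) for b in buckets]  # pass 2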


def transformCriteoAdData(X_cat, X_int, y, days, data_split, randomize, total_per_file):
    # Transforms Criteo Kaggle or Terabyte data by applying the log transformation
    # on dense features and converting everything to appropriate types.
    #
    # Inputs:
    #     X_cat (ndarray): array of integers corresponding to preprocessed
    #                      categorical features
    #     X_int (ndarray): array of integers corresponding to dense features
    #     y (ndarray): array of bool corresponding to labels
    #     data_split (str): flag for splitting dataset into training/validation/test
    #                       sets
    #     randomize (str): determines randomization scheme
    #         "none": no randomization
    #         "day": randomizes each day's data (only works if split = True)
    #         "total": randomizes total dataset
    #
    # Outputs:
    #     if split:
    #         X_cat_train (ndarray): sparse features for training set
    #         X_int_train (ndarray): dense features for training set
    #         y_train (ndarray): labels for training set
    #         X_cat_val (ndarray): sparse features for validation set
    #         X_int_val (ndarray): dense features for validation set
    #         y_val (ndarray): labels for validation set
    #         X_cat_test (ndarray): sparse features for test set
    #         X_int_test (ndarray): dense features for test set
    #         y_test (ndarray): labels for test set
    #     else:
    #         X_cat (ndarray): sparse features
    #         X_int (ndarray): dense features
    #         y (ndarray): labels

    # define initial set of indices
    indices = np.arange(len(y))

    # create offset per file
    offset_per_file = np.array([0] + [x for x in total_per_file])
    for i in range(days):
        offset_per_file[i + 1] += offset_per_file[i]

    # split dataset
    if data_split == 'train':
        indices = np.array_split(indices, offset_per_file[1:-1])

        # randomize train data (per day)
        if randomize == "day":  # or randomize == "total":
            for i in range(len(indices) - 1):
                indices[i] = np.random.permutation(indices[i])
            print("Randomized indices per day ...")

        train_indices = np.concatenate(indices[:-1])
        test_indices = indices[-1]
        test_indices, val_indices = np.array_split(test_indices, 2)
        print("Defined training and testing indices...")

        # randomize train data (across days)
        if randomize == "total":
            train_indices = np.random.permutation(train_indices)
            print("Randomized indices across days ...")

        # indices = np.concatenate((train_indices, test_indices))

        # create training, validation, and test sets
        X_cat_train = X_cat[train_indices]
        X_int_train = X_int[train_indices]
        y_train = y[train_indices]

        X_cat_val = X_cat[val_indices]
        X_int_val = X_int[val_indices]
        y_val = y[val_indices]

        X_cat_test = X_cat[test_indices]
        X_int_test = X_int[test_indices]
        y_test = y[test_indices]
        print("Split data according to indices...")

        # np.long was an alias for Python int and was removed in NumPy 1.24;
        # np.int64 preserves the intended integer width
        X_cat_train = X_cat_train.astype(np.int64)
        X_int_train = np.log(X_int_train.astype(np.float32) + 1)
        y_train = y_train.astype(np.float32)

        X_cat_val = X_cat_val.astype(np.int64)
        X_int_val = np.log(X_int_val.astype(np.float32) + 1)
        y_val = y_val.astype(np.float32)

        X_cat_test = X_cat_test.astype(np.int64)
        X_int_test = np.log(X_int_test.astype(np.float32) + 1)
        y_test = y_test.astype(np.float32)
        print("Converted to tensors...done!")

        return (
            X_cat_train,
            X_int_train,
            y_train,
            X_cat_val,
            X_int_val,
            y_val,
            X_cat_test,
            X_int_test,
            y_test,
        )
    else:
        # randomize data
        if randomize == "total":
            indices = np.random.permutation(indices)
            print("Randomized indices...")

        X_cat = X_cat[indices].astype(np.int64)  # np.long removed in NumPy 1.24
        X_int = np.log(X_int[indices].astype(np.float32) + 1)
        y = y[indices].astype(np.float32)
        print("Converted to tensors...done!")

        return (X_cat, X_int, y, [], [], [], [], [], [])
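
# Despite the "tensors" wording in the messages above, the returned values are
# NumPy ndarrays. A consumer that needs torch tensors (as the DLRM model does)
# can wrap them without copying; a minimal sketch, assuming torch is available
# (it is not imported by this module):
#
#   import torch
#   X_cat_train_t = torch.from_numpy(X_cat_train)  # int64   -> LongTensor
#   X_int_train_t = torch.from_numpy(X_int_train)  # float32 -> FloatTensor
#   y_train_t = torch.from_numpy(y_train)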


def getCriteoAdData(
        datafile,
        o_filename,
        max_ind_range=-1,
        sub_sample_rate=0.0,
        days=7,
        data_split='train',
        randomize='total',
        criteo_kaggle=True,
        memory_map=False
):
    # Passes through the entire dataset and defines dictionaries for categorical
    # features and determines the number of total categories.
    #
    # Inputs:
    #     datafile (str): path to downloaded raw data file
    #     o_filename (str): saves results under o_filename if filename is not ""
    #
    # Output:
    #     o_file (str): output file path

    # split the datafile into path and filename
    lstr = datafile.split("/")
    d_path = "/".join(lstr[0:-1]) + "/"
    d_file = lstr[-1].split(".")[0] if criteo_kaggle else lstr[-1]
    npzfile = d_path + ((d_file + "_day") if criteo_kaggle else d_file)
    trafile = d_path + ((d_file + "_fea") if criteo_kaggle else "fea")

    # count number of datapoints in training set
    total_file = d_path + d_file + "_day_count.npz"
    if path.exists(total_file):
        with np.load(total_file) as data:
            total_per_file = list(data["total_per_file"])
        total_count = np.sum(total_per_file)
        print("Skipping counts per file (already exist)")
    else:
        total_count = 0
        total_per_file = []
        if criteo_kaggle:
            # WARNING: The raw data consists of a single train.txt file
            # Each line in the file is a sample, consisting of 13 continuous and
            # 26 categorical features (an extra space indicates that a feature is
            # missing and will be interpreted as 0).
            if path.exists(datafile):
                print("Reading data from path=%s" % (datafile))
                with open(str(datafile)) as f:
                    for _ in f:
                        total_count += 1
                total_per_file.append(total_count)
                # reset total per file due to split
                num_data_per_split, extras = divmod(total_count, days)
                total_per_file = [num_data_per_split] * days
                for j in range(extras):
                    total_per_file[j] += 1
                # split into days (simplifies code later on)
                file_id = 0
                boundary = total_per_file[file_id]
                nf = open(npzfile + "_" + str(file_id), "w")
                with open(str(datafile)) as f:
                    for j, line in enumerate(f):
                        if j == boundary:
                            nf.close()
                            file_id += 1
                            nf = open(npzfile + "_" + str(file_id), "w")
                            boundary += total_per_file[file_id]
                        nf.write(line)
                nf.close()
            else:
                sys.exit(
                    "ERROR: Criteo Kaggle Display Ad Challenge Dataset path is invalid; "
                    "please download from "
                    "https://labs.criteo.com/2014/02/kaggle-display-advertising-challenge-dataset"
                )
        else:
            # WARNING: The raw data consist of day_0.gz,..., day_23.gz text files
            # Each line in the file is a sample, consisting of 13 continuous and
            # 26 categorical features (an extra space indicates that a feature is
            # missing and will be interpreted as 0).
            for i in range(days):
                datafile_i = datafile + "_" + str(i)  # + ".gz"
                if path.exists(str(datafile_i)):
                    print("Reading data from path=%s" % (str(datafile_i)))
                    # file day_<number>
                    total_per_file_count = 0
                    with open(str(datafile_i)) as f:
                        for _ in f:
                            total_per_file_count += 1
                    total_per_file.append(total_per_file_count)
                    total_count += total_per_file_count
                else:
                    sys.exit(
                        "ERROR: Criteo Terabyte Dataset path is invalid; "
                        "please download from "
                        "https://labs.criteo.com/2013/12/download-terabyte-click-logs"
                    )

    # process a file worth of data and reinitialize data
    # note that a file may contain a single or multiple splits
    def process_one_file(
            datfile,
            npzfile,
            split,
            num_data_in_split,
    ):
        with open(str(datfile)) as f:
            y = np.zeros(num_data_in_split, dtype="i4")             # 4 byte int
            X_int = np.zeros((num_data_in_split, 13), dtype="i4")   # 4 byte int
            X_cat = np.zeros((num_data_in_split, 26), dtype="i4")   # 4 byte int
            if sub_sample_rate == 0.0:
                rand_u = 1.0
            else:
                rand_u = np.random.uniform(low=0.0, high=1.0, size=num_data_in_split)

            i = 0
            for k, line in enumerate(f):
                # process a line (data point)
                line = line.split('\t')
                # set missing values to zero
                for j in range(len(line)):
                    if (line[j] == '') or (line[j] == '\n'):
                        line[j] = '0'
                # sub-sample data by dropping zero targets, if needed
                target = np.int32(line[0])
                if target == 0 and \
                   (rand_u if sub_sample_rate == 0.0 else rand_u[k]) < sub_sample_rate:
                    continue

                y[i] = target
                X_int[i] = np.array(line[1:14], dtype=np.int32)
                if max_ind_range > 0:
                    X_cat[i] = np.array(
                        list(map(lambda x: int(x, 16) % max_ind_range, line[14:])),
                        dtype=np.int32
                    )
                else:
                    X_cat[i] = np.array(
                        list(map(lambda x: int(x, 16), line[14:])),
                        dtype=np.int32
                    )
                # count uniques
                for j in range(26):
                    convertDicts[j][X_cat[i][j]] = 1
                # debug prints
                print(
                    "Load %d/%d Split: %d Label True: %d Stored: %d"
                    % (
                        i,
                        num_data_in_split,
                        split,
                        target,
                        y[i],
                    ),
                    end="\r",
                )
                i += 1

            # store num_data_in_split samples or extras at the end of file
            # count uniques
            # X_cat_t = np.transpose(X_cat)
            # for j in range(26):
            #     for x in X_cat_t[j, :]:
            #         convertDicts[j][x] = 1
            # store parsed
            filename_s = npzfile + "_{0}.npz".format(split)
            if path.exists(filename_s):
                print("\nSkip existing " + filename_s)
            else:
                np.savez_compressed(
                    filename_s,
                    X_int=X_int[0:i, :],
                    # X_cat=X_cat[0:i, :],
                    X_cat_t=np.transpose(X_cat[0:i, :]),  # transpose of the data
                    y=y[0:i],
                )
                print("\nSaved " + npzfile + "_{0}.npz!".format(split))

        return i

    # create all splits (reuse existing files if possible)
    recreate_flag = False
    convertDicts = [{} for _ in range(26)]
    # WARNING: to get reproducible sub-sampling results you must reset the seed below
    # np.random.seed(123)
    # in this case there is a single split in each day
    for i in range(days):
        datfile_i = npzfile + "_{0}".format(i)  # + ".gz"
        npzfile_i = npzfile + "_{0}.npz".format(i)
        npzfile_p = npzfile + "_{0}_processed.npz".format(i)
        if path.exists(npzfile_i):
            print("Skip existing " + npzfile_i)
        elif path.exists(npzfile_p):
            print("Skip existing " + npzfile_p)
        else:
            recreate_flag = True
            total_per_file[i] = process_one_file(
                datfile_i,
                npzfile,
                i,
                total_per_file[i],
            )

    # report and save total into a file
    total_count = np.sum(total_per_file)
    if not path.exists(total_file):
        np.savez_compressed(total_file, total_per_file=total_per_file)
    print("Total number of samples:", total_count)
    print("Divided into days/splits:\n", total_per_file)

    # dictionary files
    counts = np.zeros(26, dtype=np.int32)
    if recreate_flag:
        # create dictionaries
        for j in range(26):
            for i, x in enumerate(convertDicts[j]):
                convertDicts[j][x] = i
            dict_file_j = d_path + d_file + "_fea_dict_{0}.npz".format(j)
            if not path.exists(dict_file_j):
                np.savez_compressed(
                    dict_file_j,
                    unique=np.array(list(convertDicts[j]), dtype=np.int32)
                )
            counts[j] = len(convertDicts[j])
        # store (uniques and) counts
        count_file = d_path + d_file + "_fea_count.npz"
        if not path.exists(count_file):
            np.savez_compressed(count_file, counts=counts)
    else:
        # create dictionaries (from existing files)
        for j in range(26):
            with np.load(d_path + d_file + "_fea_dict_{0}.npz".format(j)) as data:
                unique = data["unique"]
            for i, x in enumerate(unique):
                convertDicts[j][x] = i
        # load (uniques and) counts
        with np.load(d_path + d_file + "_fea_count.npz") as data:
            counts = data["counts"]

    # process all splits
    processCriteoAdData(d_path, d_file, npzfile, days, convertDicts, counts)
    o_file = concatCriteoAdData(
        d_path,
        d_file,
        npzfile,
        trafile,
        days,
        data_split,
        randomize,
        total_per_file,
        total_count,
        memory_map,
        o_filename
    )

    return o_file
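
# The resulting .npz bundles everything downstream code needs. A minimal sketch
# of consuming it (illustrative; the key names match the np.savez_compressed
# call in concatCriteoAdData):
#
#   with np.load("kaggleAdDisplayChallenge_processed.npz") as data:
#       X_cat, X_int, y = data["X_cat"], data["X_int"], data["y"]
#       counts = data["counts"]  # number of categories per sparse feature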


def loadDataset(
        dataset,
        max_ind_range,
        sub_sample_rate,
        randomize,
        data_split,
        raw_path="",
        pro_data="",
        memory_map=False
):
    # dataset
    if dataset == "kaggle":
        days = 7
        o_filename = "kaggleAdDisplayChallenge_processed"
    elif dataset == "terabyte":
        days = 24
        o_filename = "terabyte_processed"
    else:
        raise ValueError("Data set option is not supported")

    # split the datafile into path and filename
    lstr = raw_path.split("/")
    d_path = "/".join(lstr[0:-1]) + "/"
    d_file = lstr[-1].split(".")[0] if dataset == "kaggle" else lstr[-1]
    npzfile = d_path + ((d_file + "_day") if dataset == "kaggle" else d_file)
    # trafile = d_path + ((d_file + "_fea") if dataset == "kaggle" else "fea")

    # check if pre-processed data is available
    data_ready = True
    if memory_map:
        for i in range(days):
            # npzfile already contains d_path, so it is not prepended again here
            reo_data = npzfile + "_{0}_reordered.npz".format(i)
            if not path.exists(str(reo_data)):
                data_ready = False
    else:
        if not path.exists(str(pro_data)):
            data_ready = False

    # pre-process data if needed
    # WARNING: when memory mapping is used we get a collection of files
    if data_ready:
        print("Reading pre-processed data=%s" % (str(pro_data)))
        file = str(pro_data)
    else:
        print("Reading raw data=%s" % (str(raw_path)))
        file = getCriteoAdData(
            raw_path,
            o_filename,
            max_ind_range,
            sub_sample_rate,
            days,
            data_split,
            randomize,
            dataset == "kaggle",
            memory_map
        )

    return file, days


if __name__ == "__main__":
    ### import packages ###
    import argparse

    ### parse arguments ###
    parser = argparse.ArgumentParser(
        description="Preprocess Criteo dataset"
    )
    # model related parameters
    parser.add_argument("--max-ind-range", type=int, default=-1)
    parser.add_argument("--data-sub-sample-rate", type=float, default=0.0)  # in [0, 1]
    parser.add_argument("--data-randomize", type=str, default="total")  # or day or none
    parser.add_argument("--memory-map", action="store_true", default=False)
    parser.add_argument("--data-set", type=str, default="kaggle")  # or terabyte
    parser.add_argument("--raw-data-file", type=str, default="")
    parser.add_argument("--processed-data-file", type=str, default="")
    args = parser.parse_args()

    loadDataset(
        args.data_set,
        args.max_ind_range,
        args.data_sub_sample_rate,
        args.data_randomize,
        "train",
        args.raw_data_file,
        args.processed_data_file,
        args.memory_map
    )
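
# Example invocation from the command line (paths are placeholders; the flags
# match the argparse definitions above):
#
#   python data_utils.py --data-set=kaggle \
#       --raw-data-file=<path-to>/train.txt \
#       --processed-data-file=<path-to>/kaggleAdDisplayChallenge_processed.npz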