in metaicl/data.py [0:0]
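# Imports this excerpt depends on (inferred from usage in the method below):
import os
import math
import pickle as pkl
from collections import defaultdict
from multiprocessing import Pool

import numpy as np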
def tensorize_for_training(self, train_data, keyword, seed):
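    """Build or load tensorized (tokenized and padded) training data.

    When self.do_tensorize is True, preprocesses `train_data` and writes one
    pickle shard per GPU rank under self.tensorize_dir; otherwise it loads
    previously written shards into self.tensorized_inputs.
    """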
    assert self.tensorize_dir is not None

    if not os.path.exists(self.tensorize_dir):
        os.makedirs(self.tensorize_dir)
    method_name = self.method + "-demon" if self.use_demonstrations else self.method
    k_name = "%d-%d" % (len(train_data), self.k) if self.use_demonstrations else len(train_data)
    length_name = "%d-%d" % (self.max_length, self.max_length_per_example) if self.use_demonstrations else self.max_length
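    # The cache-file name encodes the method, k, seed, and length settings;
    # "%d" is left unformatted so it can be filled in with a GPU rank later.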
    tensorize_path = os.path.join(
        self.tensorize_dir,
        "{}_{}_k={}_seed={}_length={}-rank=%d.pkl".format(
            keyword, method_name, k_name, seed, length_name))
    if self.local_rank == -1:
        self.logger.info(tensorize_path)
    else:
        self.logger.info(tensorize_path % self.local_rank)
    all_tensorize_paths = [tensorize_path % i for i in range(self.n_gpu)]
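    # Load path: reuse shards written by an earlier `--do_tensorize` run.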
    if not self.do_tensorize:
        if not np.all([os.path.exists(_path) for _path in all_tensorize_paths]):
            self.logger.info("Tensorization was not done. Run with `--do_tensorize` without distributed mode, "
                             "and then run the training command again")
            raise NotImplementedError()
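        # A single process merges every rank's shard; under distributed
        # training, each rank loads only its own shard.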
        if self.local_rank == -1:
            inputs = defaultdict(list)
            for i in range(self.n_gpu):
                with open(tensorize_path % i, "rb") as f:
                    curr_inputs = pkl.load(f)
                for k, v in curr_inputs.items():
                    inputs[k] += v
        else:
            assert 0 <= self.local_rank < self.n_gpu
            with open(tensorize_path % self.local_rank, "rb") as f:
                inputs = pkl.load(f)

        self.tensorized_inputs = inputs
        return
    assert self.local_rank == -1
    if any([os.path.exists(_path) for _path in all_tensorize_paths]):
        self.logger.info("Tensorized files already exist; skipping.")
        return
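    # Tensorize path: preprocess from scratch and write one shard per rank.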
    unique_task_names = set([dp["task"] for dp in train_data])
    sharded_inputs = []
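    # Shard by task (presumably so demonstrations are drawn from within a
    # single task); very large mixtures (>200 tasks and >=1,638,400 examples)
    # are additionally subsampled per task.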
    if self.use_demonstrations or (len(unique_task_names) > 200 and len(train_data) >= 1638400):
        tot = 0
        for i, curr_train_task in enumerate(unique_task_names):
            curr_train_data = [dp for dp in train_data if dp["task"] == curr_train_task]
            tot += len(curr_train_data)
            if self.use_demonstrations and len(unique_task_names) > 200 and len(train_data) >= 1638400:
                # data is too huge; sample 10% of this task's data
                self.logger.info("Sampling training data from %d to %d",
                                 len(curr_train_data), len(curr_train_data) // 10)
                indices = np.random.permutation(range(len(curr_train_data)))[:len(curr_train_data) // 10]
                curr_train_data = [curr_train_data[idx] for idx in indices]
            elif len(unique_task_names) > 200 and len(train_data) >= 1638400:
                # data is too huge; sample 50% of this task's data
                self.logger.info("Sampling training data from %d to %d",
                                 len(curr_train_data), len(curr_train_data) // 2)
                indices = np.random.permutation(range(len(curr_train_data)))[:len(curr_train_data) // 2]
                curr_train_data = [curr_train_data[idx] for idx in indices]
            sharded_inputs.append(curr_train_data)
        assert len(train_data) == tot
    else:
        n_per_shard = math.ceil(len(train_data) / self.n_process)
        for i in range(self.n_process):
            sharded_inputs.append(train_data[i * n_per_shard:(i + 1) * n_per_shard])
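    # Tokenize each shard, in parallel worker processes when n_process > 1.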
    inputs = {"input_ids": [], "attention_mask": [], "token_type_ids": []}
    if self.n_process == 1:
        for in_ in sharded_inputs:
            out = self._tensorize_for_training(in_)
            for key in ["input_ids", "attention_mask", "token_type_ids"]:
                inputs[key] += out[key].numpy().tolist()
    else:
        with Pool(self.n_process) as p:
            for out in p.imap_unordered(self._tensorize_for_training, sharded_inputs):
                for key in ["input_ids", "attention_mask", "token_type_ids"]:
                    inputs[key] += out[key].numpy().tolist()
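    # Shuffle all examples once, then split them into n_gpu contiguous
    # chunks and pickle one chunk per rank.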
    N = len(inputs["input_ids"])
    indices = np.random.permutation(range(N))
    for k, v in inputs.items():
        inputs[k] = np.array(v)[indices]
    n_per_shard = math.ceil(N / self.n_gpu)
    for i, _path in enumerate(all_tensorize_paths):
        start = i * n_per_shard
        end = (i + 1) * n_per_shard
        curr_inputs = {k: v[start:end].tolist() for k, v in inputs.items()}
        with open(_path, "wb") as f:
            pkl.dump(curr_inputs, f)
        self.logger.info("Preprocessing done for i=%d" % i)

    self.logger.info("Finished saving preprocessed data.")
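
# Usage sketch (illustrative, not from the repo): assuming `data` is an
# already-configured instance of the class this method belongs to, and
# "hr_to_lr" is a hypothetical split keyword:
#
#   data.tensorize_for_training(train_data, keyword="hr_to_lr", seed=100)
#
# Run it once in a single process with do_tensorize=True to write the
# per-rank .pkl shards; later (possibly distributed) runs load them into
# data.tensorized_inputs.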