# learning/batchifier.py
import numpy as np
import string
from dataset import TSVDataset, H5Dataset, CombinedDataset
from generator import prefetch_generator
def word_dropout(inputs, rng, keep_prob):
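    """Zero out whole rows of ``inputs`` with probability ``1 - keep_prob``.

    The mask is drawn once per word (first axis) and broadcast over any
    remaining axes, so all features of a dropped word are zeroed together.
    Since the vocabulary lookups below default to 0 for unknown words,
    dropped ids collapse onto the out-of-vocabulary index.
    """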
inputs_ndim = inputs.ndim
mask_shape = [len(inputs)] + [1] * (inputs_ndim - 1)
    mask = rng.random_sample(size=mask_shape) < keep_prob
    return (inputs * mask).astype(inputs.dtype)
def extract_feat(feat):
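    """Return a callable that maps a token string to the value of ``feat``.

    Supported types: "word" (identity), "suffix"/"prefix" (character
    slices of the configured length), "digit" and "uppercase" (booleans),
    "punctuation_count" (number of punctuation characters), and
    "character-conv" (a padded UTF-8 byte sequence, see below).
    """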
if feat["type"] == "word":
return lambda x: x
elif feat["type"] == "suffix":
length = feat["length"]
return lambda x: x[-length:]
elif feat["type"] == "prefix":
length = feat["length"]
return lambda x: x[:length]
elif feat["type"] == "digit":
return lambda x: x.isdigit()
elif feat["type"] == "punctuation_count":
return lambda x: sum(c in string.punctuation for c in x)
elif feat["type"] == "uppercase":
return lambda x: len(x) > 0 and x[0].isupper()
elif feat["type"] == "character-conv":
max_size = feat["max_word_length"]
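        # Encode the word as UTF-8 bytes framed by sentinel values: 255
        # marks the start and 256 the end (0xFF never occurs in valid
        # UTF-8 and 256 is outside byte range, so neither collides with
        # real data). Words longer than max_word_length are truncated and
        # shorter ones right-padded with -1, so every output has fixed
        # length max_word_length + 2.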
def extract(x):
x_bytes = x.encode("utf-8")
if len(x_bytes) > max_size:
return np.concatenate(
[
[255],
list(x_bytes[:max_size]),
[256]
]
)
else:
return np.concatenate(
[
[255],
list(x_bytes),
[256],
-np.ones(max_size - len(x_bytes), dtype=np.int32),
]
)
return extract
else:
raise ValueError("unknown feature %r." % (feat,))
def extract_word_keep_prob(feat):
return feat.get("word_keep_prob", 0.85)
def extract_case_keep_prob(feat):
return feat.get("case_keep_prob", 0.95)
def extract_s_keep_prob(feat):
return feat.get("s_keep_prob", 0.95)
def apply_case_s_keep_prob(feat, rng, keep_case, keep_s):
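    """Randomly augment a single token during training.

    With probability ``1 - keep_case`` a word with an uppercase first
    letter is lowercased, and with probability ``1 - keep_s`` a trailing
    "s" is stripped, so the model also sees lowercased and de-pluralized
    variants of its inputs.
    """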
if len(feat) == 0:
return feat
if keep_case < 1 and feat[0].isupper() and rng.random_sample() >= keep_case:
feat = feat.lower()
if keep_s < 1 and feat.endswith("s") and rng.random_sample() >= keep_s:
feat = feat[:-1]
return feat
def requires_character_convolution(feat):
return feat["type"] in {"character-conv"}
def requires_vocab(feat):
return feat["type"] in {"word", "suffix", "prefix"}
def feature_npdtype(feat):
if requires_vocab(feat):
return np.int32
elif feat["type"] in {"digit", "punctuation_count", "uppercase"}:
return np.float32
elif requires_character_convolution(feat):
return np.int32
else:
raise ValueError("unknown feature %r." % (feat,))
def get_vocabs(dataset, max_vocabs, extra_words=None):
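    """Build one frequency-sorted vocabulary per feature column.

    ``dataset`` yields tuples of extracted feature values (one element per
    column). Each vocabulary is sorted by descending occurrence count,
    truncated to ``max_vocab`` entries (when positive), and prefixed with
    ``extra_words`` if given.
    """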
index2words = [[] for i in range(len(max_vocabs))]
occurrences = [{} for i in range(len(max_vocabs))]
for els in dataset:
for el, index2word, occurrence in zip(els, index2words, occurrences):
if el not in occurrence:
index2word.append(el)
occurrence[el] = 1
else:
occurrence[el] += 1
index2words = [
sorted(index2word, key=lambda x: occurrence[x], reverse=True)
for index2word, occurrence in zip(index2words, occurrences)
]
index2words = [
index2word[:max_vocab] if max_vocab > 0 else index2word
for index2word, max_vocab in zip(index2words, max_vocabs)
]
if extra_words is not None:
index2words = [
extra_words + index2word for index2word in index2words
]
return index2words
def get_feature_vocabs(features, dataset, extra_words=None):
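    """Return a vocabulary (list of words) for each feature, or None.

    Features with a "path" load a precomputed vocabulary from disk; the
    remaining vocab-requiring features are built in a single pass over the
    dataset. ``set_ignore_y`` is toggled so lazy datasets can skip
    materializing labels while the vocabularies are collected.
    """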
out, feats_needing_vocab, feats_with_vocabs, vocabs = [], [], [], []
if hasattr(dataset, "set_ignore_y"):
dataset.set_ignore_y(True)
try:
for feat in features:
if requires_vocab(feat):
if feat.get("path") is not None:
with open(feat["path"], "rt") as fin:
index2word = fin.read().splitlines()
if feat.get("max_vocab", -1) > 0:
index2word = index2word[:feat["max_vocab"]]
if extra_words is not None:
index2word = extra_words + index2word
feats_with_vocabs.append(index2word)
else:
feats_needing_vocab.append(feat)
if len(feats_needing_vocab) > 0:
extractors = tuple(
[extract_feat(feat) for feat in feats_needing_vocab]
)
vocabs = get_vocabs(
((extractor(w) for extractor in extractors)
for x, _ in dataset for w in x),
max_vocabs=[feat.get("max_vocab", -1) for feat in feats_needing_vocab],
extra_words=extra_words
)
vocab_feature_idx = 0
preexisting_vocab_feature_idx = 0
for feat in features:
if requires_vocab(feat):
if feat.get("path") is not None:
out.append(feats_with_vocabs[preexisting_vocab_feature_idx])
preexisting_vocab_feature_idx += 1
else:
out.append(vocabs[vocab_feature_idx])
                vocab_feature_idx += 1
else:
out.append(None)
finally:
if hasattr(dataset, "set_ignore_y"):
dataset.set_ignore_y(False)
return out
def pad_arrays_into_array(arrays, padding):
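    """Stack variable-shape arrays into one array filled with ``padding``.

    The output gains a leading axis of size ``len(arrays)``; every other
    axis is sized to the maximum over the inputs, and positions not
    covered by an input array keep the fill value.
    """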
out_ndim = arrays[0].ndim + 1
out_shape = [0] * out_ndim
out_shape[0] = len(arrays)
for arr in arrays:
for dim_idx in range(arr.ndim):
out_shape[1 + dim_idx] = max(out_shape[1 + dim_idx], arr.shape[dim_idx])
out = np.empty(out_shape, dtype=arrays[0].dtype)
out.fill(padding)
for arr_idx, array in enumerate(arrays):
arr_slice = [arr_idx]
        for dim_idx in range(array.ndim):
arr_slice.append(slice(0, array.shape[dim_idx]))
arr_slice = tuple(arr_slice)
out[arr_slice] = array
return out
def build_objective_mask(label_sequence, objective_idx, objective_type):
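    """Return the loss mask for one objective over a label sequence.

    CRF objectives are scored on whole sequences, so the mask is a single
    boolean (False when the sequence carries no label for this objective,
    judged from its first token). Softmax objectives are scored per token,
    so the mask flags each position whose label is present.
    """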
if objective_type == 'crf':
if len(label_sequence) == 0 or label_sequence[0][objective_idx] is None:
            return np.array(False, dtype=bool)
else:
            return np.array(True, dtype=bool)
elif objective_type == 'softmax':
        return np.array(
            [w[objective_idx] is not None for w in label_sequence], dtype=bool
        )
else:
raise ValueError(
"unknown objective type %r." % (objective_type,)
)
def allocate_shrunk_batches(max_length, batch_size, lengths):
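    """Greedily split examples into (start, end) batch ranges.

    Each range holds at most ``batch_size`` consecutive examples and grows
    only while (longest example in range) * (range size) stays below the
    budget ``max_length * batch_size``, so batches of long sequences
    shrink to keep the padded token count roughly constant.
    """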
typical_indices = max_length * batch_size
i = 0
ranges = []
while i < len(lengths):
j = i + 1
current_batch_size = 1
longest_ex = lengths[j - 1]
while j < len(lengths) and j - i < batch_size:
# can grow?
new_batch_size = current_batch_size + 1
new_j = j + 1
if max(longest_ex, lengths[new_j - 1]) * new_batch_size < typical_indices:
j = new_j
longest_ex = max(longest_ex, lengths[new_j - 1])
current_batch_size = new_batch_size
else:
break
ranges.append((i, j))
i = j
return ranges
def convert_label_to_index(label, label2index):
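    """Map a label to an integer id: None becomes 0 (masked out), strings
    are looked up in ``label2index``, and integers pass through."""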
if label is None:
return 0
if isinstance(label, str):
return label2index[label]
return label
class Batchifier(object):
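    """Turns a dataset of (tokens, labels) examples into feed_dicts.

    Handles shuffling, length-aware batch allocation, per-feature
    extraction and vocabulary lookup, training-time word/case/plural
    dropout, and padding. The placeholder arguments may be real
    TensorFlow placeholders or plain integer keys (see
    ``iter_batches_single_threaded`` below), so batches can also be built
    in a worker process and remapped to placeholders later.
    """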
def __init__(self, rng, feature_word2index, objective_types, label2index,
fused, sequence_lengths, labels, labels_mask,
input_placeholders, features, dataset, batch_size, train,
autoresize=True, max_length=100):
        assert batch_size > 0, (
            "batch size must be strictly positive (got %r)." % (batch_size,)
        )
# dictionaries, strings defined by model:
self.objective_types = objective_types
self.label2index = label2index
self.feature_word2index = feature_word2index
self.rng = rng
self.fused = fused
# tf placeholders:
self.sequence_lengths = sequence_lengths
self.labels = labels
self.labels_mask = labels_mask
self.input_placeholders = input_placeholders
self.dataset = dataset
self.batch_size = batch_size
self.train = train
self.dataset_is_lazy = isinstance(dataset, (TSVDataset, H5Dataset, CombinedDataset))
self.autoresize = autoresize
self.max_length = max_length
indices = np.arange(len(dataset))
if train:
if self.dataset_is_lazy:
dataset.set_rng(rng)
dataset.set_randomize(True)
elif isinstance(dataset, list):
rng.shuffle(indices)
self.batch_indices = []
if self.autoresize and not self.dataset_is_lazy:
ranges = allocate_shrunk_batches(
max_length=self.max_length,
batch_size=self.batch_size,
lengths=[len(dataset[indices[i]][0]) for i in range(len(indices))]
)
for i, j in ranges:
self.batch_indices.append(indices[i:j])
else:
for i in range(0, len(indices), self.batch_size):
self.batch_indices.append(indices[i:i + self.batch_size])
self.extractors = [
(extract_feat(feat), requires_vocab(feat), feature_npdtype(feat),
extract_word_keep_prob(feat), extract_case_keep_prob(feat), extract_s_keep_prob(feat))
for feat in features
]
def generate_batch(self, examples):
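        """Convert a list of (tokens, labels) examples into one feed_dict.

        Inputs are padded with -1 and transposed to time-major layout
        (hence ``swapaxes(0, 1)`` and ``.T``); labels are padded with 0
        and masks with False. During training, case/plural augmentation
        and word dropout are applied here. When ``fused`` is set, all
        objectives are stacked into a single labels tensor along the last
        axis.
        """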
X = [[] for i in range(len(self.extractors))]
Y = []
Y_mask = []
for ex, label in examples:
for idx, (extractor, uses_vocab, dtype, word_keep_prob, case_keep_prob, s_keep_prob) in enumerate(self.extractors):
if self.train and (case_keep_prob < 1 or s_keep_prob < 1):
ex = [apply_case_s_keep_prob(w, self.rng, case_keep_prob, s_keep_prob) for w in ex]
if uses_vocab:
word_feats = np.array(
[self.feature_word2index[idx].get(extractor(w), 0) for w in ex],
dtype=dtype
)
else:
word_feats = np.array([extractor(w) for w in ex], dtype=dtype)
if self.train and word_keep_prob < 1:
word_feats = word_dropout(
word_feats, self.rng, word_keep_prob
)
X[idx].append(word_feats)
Y.append(
tuple(
np.array([convert_label_to_index(w[objective_idx], label2index)
for w in label], dtype=np.int32)
for objective_idx, label2index in enumerate(self.label2index)
)
)
Y_mask.append(
tuple(
build_objective_mask(label, objective_idx, objective_type)
for objective_idx, objective_type in enumerate(self.objective_types)
)
)
sequence_lengths = np.array([len(x) for x in X[0]], dtype=np.int32)
X = [pad_arrays_into_array(x, -1) for x in X]
Y = [
pad_arrays_into_array([row[objective_idx] for row in Y], 0)
for objective_idx in range(len(self.objective_types))
]
Y_mask = [
pad_arrays_into_array([row[objective_idx] for row in Y_mask], 0.0)
for objective_idx in range(len(self.objective_types))
]
feed_dict = {
self.sequence_lengths: sequence_lengths
}
if self.fused:
feed_dict[self.labels[0]] = np.stack([y.T for y in Y], axis=-1)
feed_dict[self.labels_mask[0]] = np.stack([y.T for y in Y_mask], axis=-1)
else:
for y, placeholder in zip(Y, self.labels):
feed_dict[placeholder] = y.T
for y, placeholder in zip(Y_mask, self.labels_mask):
feed_dict[placeholder] = y.T
for idx, x in enumerate(X):
feed_dict[self.input_placeholders[idx]] = x.swapaxes(0, 1)
return feed_dict
def as_list(self):
return list(self.iter_batches())
def iter_batches(self, pbar=None):
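        """Yield feed_dicts one batch at a time.

        For lazy datasets with ``autoresize`` enabled, batch boundaries
        cannot be precomputed from example lengths, so each fetched batch
        is re-split here by ``allocate_shrunk_batches`` before being
        emitted.
        """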
gen = range(len(self.batch_indices))
if pbar is not None:
pbar.max_value = len(self.batch_indices)
pbar.value = 0
gen = pbar(gen)
if self.autoresize and self.dataset_is_lazy:
for idx in gen:
examples = [self.dataset[ex] for ex in self.batch_indices[idx]]
ranges = allocate_shrunk_batches(
max_length=self.max_length,
batch_size=self.batch_size,
lengths=[len(ex[0]) for ex in examples]
)
for i, j in ranges:
yield self.generate_batch(examples[i:j])
else:
for idx in gen:
yield self.generate_batch(
[self.dataset[ex] for ex in self.batch_indices[idx]]
)
def batch_worker(rng,
features,
feature_word2index,
objective_types,
label2index,
fused,
sequence_lengths,
labels,
labels_mask,
input_placeholders,
autoresize,
train,
batch_size,
max_length,
dataset,
pbar,
batch_queue,
death_event):
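    """Process entry point: build batches and push them onto ``batch_queue``.

    A ``None`` sentinel marks normal completion; ``death_event`` lets the
    parent abort the worker between batches.
    """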
batchifier = Batchifier(
rng=rng,
features=features,
feature_word2index=feature_word2index,
objective_types=objective_types,
label2index=label2index,
fused=fused,
sequence_lengths=sequence_lengths,
labels=labels,
labels_mask=labels_mask,
input_placeholders=input_placeholders,
autoresize=autoresize,
train=train,
batch_size=batch_size,
max_length=max_length,
dataset=dataset
)
for batch in batchifier.iter_batches(pbar=pbar):
if death_event.is_set():
break
batch_queue.put(batch)
if not death_event.is_set():
batch_queue.put(None)
def range_size(start, size):
    return list(range(start, start + size))
class ProcessHolder(object):
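    """Owns a batch-building worker process and its queue.

    ``close`` signals the worker to stop, drains any queued batches so the
    worker is not blocked on a full queue, then terminates and joins it.
    """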
def __init__(self, process, death_event, batch_queue):
self.process = process
self.batch_queue = batch_queue
self.death_event = death_event
def close(self):
self.death_event.set()
try:
self.batch_queue.close()
while True:
self.batch_queue.get_nowait()
        except Exception:
            pass
self.process.terminate()
self.process.join()
def __del__(self):
self.close()
def iter_batches_single_threaded(model,
dataset,
batch_size,
train,
autoresize=True,
max_length=100,
pbar=None):
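    """Yield feed_dicts for ``model`` without spawning a worker process.

    The Batchifier is given integer keys (0 for sequence lengths, then
    consecutive ranges for labels, label masks, and inputs) instead of
    TensorFlow placeholders, and the resulting integer-keyed batches are
    remapped to the real placeholders here. ``prefetch_generator`` keeps a
    buffer of prepared batches ahead of the consumer.
    """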
tensorflow_placeholders = [model.sequence_lengths] + model.labels + model.labels_mask + model.input_placeholders
labels_start = 1
labels_mask_start = labels_start + len(model.labels)
placeholder_start = labels_mask_start + len(model.labels_mask)
batchifier = Batchifier(
rng=model.rng,
features=model.features,
feature_word2index=model.feature_word2index,
objective_types=[obj["type"] for obj in model.objectives],
label2index=model.label2index,
fused=model.fused,
sequence_lengths=0,
labels=range_size(labels_start, len(model.labels)),
labels_mask=range_size(labels_mask_start, len(model.labels_mask)),
input_placeholders=range_size(placeholder_start, len(model.input_placeholders)),
autoresize=autoresize,
train=train,
batch_size=batch_size,
max_length=max_length,
dataset=dataset
)
for batch in prefetch_generator(batchifier.iter_batches(pbar=pbar), to_fetch=100):
feed_dict = {}
for idx, key in enumerate(tensorflow_placeholders):
feed_dict[key] = batch[idx]
yield feed_dict
def iter_batches(model,
dataset,
batch_size,
train,
autoresize=True,
max_length=100,
pbar=None):
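    """Multiprocessing variant of ``iter_batches_single_threaded``.

    Batches are built in a daemon worker process (see ``batch_worker``)
    and passed through a bounded queue; a ``None`` sentinel ends the
    stream, and ``ProcessHolder`` cleans the worker up on exit.
    """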
import multiprocessing
batch_queue = multiprocessing.Queue(maxsize=10)
tensorflow_placeholders = [model.sequence_lengths] + model.labels + model.labels_mask + model.input_placeholders
labels_start = 1
labels_mask_start = labels_start + len(model.labels)
placeholder_start = labels_mask_start + len(model.labels_mask)
death_event = multiprocessing.Event()
batch_process = ProcessHolder(multiprocessing.Process(
target=batch_worker,
daemon=True,
args=(
model.rng,
model.features,
model.feature_word2index,
[obj["type"] for obj in model.objectives],
model.label2index,
model.fused,
0,
range_size(labels_start, len(model.labels)),
range_size(labels_mask_start, len(model.labels_mask)),
range_size(placeholder_start, len(model.input_placeholders)),
autoresize,
train,
batch_size,
max_length,
dataset,
pbar,
batch_queue,
death_event
)
), death_event, batch_queue)
batch_process.process.name = "iter_batches"
batch_process.process.start()
while True:
batch = batch_queue.get()
if batch is None:
break
else:
feed_dict = {}
for idx, key in enumerate(tensorflow_placeholders):
feed_dict[key] = batch[idx]
yield feed_dict
del batch