in Source/Readers/HTKMLFReader/HTKMLFReader.cpp [952:1473]
// Fills 'matrices' with the next minibatch and rebuilds m_pMBLayout to describe it.
//
// Two modes are handled, selected by m_truncated:
//  - !m_truncated (frame mode or whole utterances): each parallel-sequence slot first receives the
//    utterance currently buffered for it; in utterance mode further buffered utterances are then
//    packed behind it where they fit, and the remainder of each slot is declared a gap.
//  - m_truncated (truncated BPTT): each slot receives the next m_mbNumTimeSteps frames of its current
//    utterance; when the utterance ends inside the minibatch, at most one further utterance is
//    started in the same slot and any remaining time steps are declared a gap.
//
// matrices: the input matrices to fill, keyed by input name. On the first call (m_checkDictionaryKeys)
//           every key must be known to m_nameToTypeMap, otherwise RuntimeError() is invoked.
// Returns:  false when no further data is available (m_noData is set and, in non-truncated mode, slot 0
//           has no frames; in truncated mode, every slot's cursor is at the end of its utterance);
//           true when a minibatch has been written to 'matrices'.
bool HTKMLFReader<ElemType>::GetMinibatchToTrainOrTest(StreamMinibatchInputs& matrices)
{
    // on first minibatch, make sure we can supply data for requested nodes
    if (m_checkDictionaryKeys)
    {
        for (auto iter = matrices.begin(); iter != matrices.end(); ++iter)
        {
            if (m_nameToTypeMap.find(iter->first) == m_nameToTypeMap.end())
            {
                RuntimeError("minibatch requested for input node %ls not found in reader - cannot generate input", iter->first.c_str());
            }
        }
        m_checkDictionaryKeys = false;
    }

    // ---- local helpers (pure definitions; nothing is executed here) ----

    // True if slot 'slot' holds frames and a lattice whose frame count disagrees with the number of buffered frames.
    const auto latticeFrameMismatch = [this](size_t slot) -> bool
    {
        return m_numFramesToProcess[slot] > 0 && m_latticeBufferMultiUtt[slot] &&
               m_latticeBufferMultiUtt[slot]->getnumframes() != m_numFramesToProcess[slot];
    };

    // Reports the mismatch detected by latticeFrameMismatch() for slot 'slot'.
    const auto warnLatticeMismatch = [this](size_t slot)
    {
        fprintf(stderr, "WARNING: mismatched number of frames filled in the reader: %d in data vs %d in lattices. Ignoring this utterance %ls\n",
                static_cast<int>(m_numFramesToProcess[slot]), static_cast<int>(m_latticeBufferMultiUtt[slot]->getnumframes()), m_latticeBufferMultiUtt[slot]->getkey().c_str());
    };

    // If slot 'slot' has a lattice, appends its lattice, label-ID and phone-boundary buffers to the m_extra* lists.
    const auto keepLatticeData = [this](size_t slot)
    {
        if (m_latticeBufferMultiUtt[slot] != nullptr)
        {
            m_extraLatticeBufferMultiUtt.push_back(m_latticeBufferMultiUtt[slot]);
            m_extraLabelsIDBufferMultiUtt.push_back(m_labelsIDBufferMultiUtt[slot]);
            m_extraPhoneboundaryIDBufferMultiUtt.push_back(m_phoneboundaryIDBufferMultiUtt[slot]);
        }
    };

    // Hands the assembled intermediate buffers to the requested matrices:
    // dereference the matrix that corresponds to each key (input/output name) and
    // populate it based on whether it is a feature or a label.
    const auto publishBuffers = [&]()
    {
        for (auto iter = matrices.begin(); iter != matrices.end(); ++iter)
        {
            Matrix<ElemType>& data = matrices.GetInputMatrix<ElemType>(iter->first); // can be features or labels
            const auto streamKind = m_nameToTypeMap[iter->first];
            if (streamKind == InputOutputTypes::real)
            {
                const size_t id = m_featureNameToIdMap[iter->first];
                const size_t dim = m_featureNameToDimMap[iter->first];
                data.SetValue(dim, m_mbNumTimeSteps * m_numSeqsPerMB, data.GetDeviceId(), m_featuresBufferMultiIO[id].get(), matrixFlagNormal);
            }
            else if (streamKind == InputOutputTypes::category)
            {
                const size_t id = m_labelNameToIdMap[iter->first];
                const size_t dim = m_labelNameToDimMap[iter->first];
                data.SetValue(dim, m_mbNumTimeSteps * m_numSeqsPerMB, data.GetDeviceId(), m_labelsBufferMultiIO[id].get(), matrixFlagNormal);
            }
        }
    };

    Timer aggregateTimer;
    if (m_verbosity > 2)
        aggregateTimer.Start();

    bool skip = false;
    do
    {
        if (!m_truncated)
        {
            // -------------------------------------------------------
            // frame mode or whole utterances
            // -------------------------------------------------------
            m_extraLatticeBufferMultiUtt.clear();
            m_extraLabelsIDBufferMultiUtt.clear();
            m_extraPhoneboundaryIDBufferMultiUtt.clear();
            m_extraSeqsPerMB.clear();

            if (m_noData && m_numFramesToProcess[0] == 0) // no data left for the first channel of this minibatch
            {
                return false; // note: the timing report at the end of this function is not emitted on this path
            }

            // BUGBUG: We should decide how many utterances we are going to take, until the desired number of frames has been filled.
            //         Currently it seems to fill a fixed number of utterances, regardless of their length.
            // The number of columns is determined by the longest utterance amongst the desired set.
            // I.e. whatever is user-specified as the MB size will be ignored here (that value is, however, passed down to the
            // underlying reader). BUGBUG: That is even more wrong.
            // BUGBUG: We should honor the mbSize parameter and fill up to the requested number of samples, using the requested #parallel sequences.
            // m_mbNumTimeSteps = max (m_numFramesToProcess[.])
            m_mbNumTimeSteps = m_numFramesToProcess[0];
            for (size_t i = 1; i < m_numSeqsPerMB; i++)
            {
                if (m_mbNumTimeSteps < m_numFramesToProcess[i])
                    m_mbNumTimeSteps = m_numFramesToProcess[i];
            }

            if (m_frameMode)
            {
                assert(m_numSeqsPerMB == 1); // user must not request parallel sequences
                m_pMBLayout->InitAsFrameMode(m_mbNumTimeSteps);
            }
            else
            {
                m_pMBLayout->Init(m_numSeqsPerMB, m_mbNumTimeSteps);
            }

            // create a MB with the desired utterances
            // First fill each parallel sequence with one utterance. No packing yet.
            // Note that the code below is a little misleading for frame mode.
            // In frame mode, this reader thinks it has only one parallel sequence (m_numSeqsPerMB == 1),
            // but it reports it to the outside as N parallel sequences of one frame each.
            //
            // In frame mode, a minibatch that is smaller than requested is skipped (buffers are still renewed below,
            // and the do-loop runs again) unless partial minibatches are allowed or the source holds no more than this many frames.
            skip = (m_frameMode && !m_partialMinibatch && (m_mbiter->requestedframes() != m_mbNumTimeSteps) && (m_frameSource->totalframes() > m_mbNumTimeSteps));

            for (size_t i = 0; i < m_numSeqsPerMB; i++)
            {
                if (!skip)
                {
                    // a stopgap
                    // BUGBUG: we just found that (due to some bugs yet to be tracked down),
                    // the filled number of frames is inconsistent with the number frames in lattices (though it rarely occurs)
                    // This is just a stopgap, to be removed after the bugs are found and fixed
                    // NOTE(review): this loop only ends once renewing yields a consistent (or empty) slot; whether
                    // ReNewBufferForMultiIO() guarantees that at end of data cannot be seen from here -- confirm.
                    while (latticeFrameMismatch(i))
                    {
                        warnLatticeMismatch(i);
                        ReNewBufferForMultiIO(i);
                    }

                    m_numValidFrames[i] = m_numFramesToProcess[i];
                    if (m_numValidFrames[i] > 0)
                    {
                        if (m_frameMode)
                        {
                            // the layout has already been initialized as entirely frame mode above
                            assert(i == 0); // this reader thinks there is only one parallel sequence
                            for (size_t s = 0; s < m_pMBLayout->GetNumParallelSequences(); s++)
                            {
                                assert(s < m_numValidFrames[i]); // MB is already set to only include the valid frames (no need for gaps)
                            }
                        }
                        else
                        {
                            m_pMBLayout->AddSequence(NEW_SEQUENCE_ID, i, 0, m_numValidFrames[i]);
                        }

                        m_extraSeqsPerMB.push_back(i);
                        fillOneUttDataforParallelmode(matrices, 0, m_numValidFrames[i], i, i);
                        keepLatticeData(i);
                    }
                }
                // fetch the next utterance for this slot (also when this minibatch is being skipped)
                ReNewBufferForMultiIO(i);
            }

            if (!skip)
            {
                m_extraNumSeqs = 0;
                if (!m_frameMode)
                {
                    // Pack the freshly buffered utterances behind the ones already placed, wherever they fit.
                    // 'src' only advances when its utterance cannot be used; after a successful placement the
                    // slot is renewed and the same 'src' is examined again.
                    for (size_t src = 0; src < m_numSeqsPerMB;)
                    {
                        const size_t framenum = m_numFramesToProcess[src];
                        if (framenum == 0)
                        {
                            src++;
                            continue;
                        }
                        if (latticeFrameMismatch(src))
                        {
                            // BUGBUG: same frame-count inconsistency stopgap as above; the utterance is left in its
                            // buffer and merely not packed into this minibatch.
                            warnLatticeMismatch(src);
                            src++;
                            continue;
                        }

                        bool slotFound = false;
                        for (size_t des = 0; des < m_numSeqsPerMB; des++) // try to find a slot
                        {
                            // NOTE(review): strict '<' means an utterance that would fill the slot exactly is not packed.
                            if (framenum + m_numValidFrames[des] < m_mbNumTimeSteps)
                            {
                                // found !
                                m_extraSeqsPerMB.push_back(des);
                                keepLatticeData(src);
                                fillOneUttDataforParallelmode(matrices, m_numValidFrames[des], framenum, des, src);
                                m_pMBLayout->AddSequence(NEW_SEQUENCE_ID, des, m_numValidFrames[des], m_numValidFrames[des] + framenum);
                                ReNewBufferForMultiIO(src);
                                m_numValidFrames[des] += framenum;
                                m_extraNumSeqs++;
                                slotFound = true;
                                break;
                            }
                        }
                        if (!slotFound)
                        {
                            src++; // done with this source; try next source
                        }
                    }

                    // and declare the remaining gaps as such
                    for (size_t i = 0; i < m_numSeqsPerMB; i++)
                        m_pMBLayout->AddGap(i, m_numValidFrames[i], m_mbNumTimeSteps);
                } // if (!frameMode)

                publishBuffers();
            }
        }
        else // if m_truncated
        {
            // -------------------------------------------------------
            // truncated BPTT
            // -------------------------------------------------------
            // In truncated BPTT mode, a minibatch only consists of the truncation length, e.g. 20 frames.
            // The reader maintains a set of current utterances, and each next minibatch contains the next 20 frames.
            // When the end of an utterance is reached, the next available utterance is begun in the same slot.
            if (m_noData) // we are returning the last utterances for this epoch
            {
                // return false if all cursors for all parallel sequences have reached the end
                bool endEpoch = true;
                for (size_t i = 0; i < m_numSeqsPerMB; i++)
                {
                    if (m_processedFrame[i] != m_numFramesToProcess[i])
                        endEpoch = false;
                }
                if (endEpoch)
                    return false;
            }

            const size_t numOfFea = m_featuresBufferMultiIO.size();
            const size_t numOfLabel = m_labelsBufferMultiIO.size();

            // Copies frames [srcBegin, srcEnd) of the utterance buffered in slot 'slot' into time steps
            // dstBegin, dstBegin+1, ... of that slot, for every requested feature and label stream.
            // With ensureCapacity set, an intermediate buffer that is missing or smaller than
            // dim * m_mbNumTimeSteps * m_numSeqsPerMB is (re)allocated first (buffer size changed; can be a partial minibatch).
            const auto copySlotFrames = [&](size_t slot, size_t srcBegin, size_t srcEnd, size_t dstBegin, bool ensureCapacity)
            {
                for (auto iter = matrices.begin(); iter != matrices.end(); ++iter)
                {
                    // the matrix itself is only consulted for its device id when (re)allocating
                    Matrix<ElemType>* data = ensureCapacity ? &matrices.GetInputMatrix<ElemType>(iter->first) : nullptr; // can be features or labels
                    const auto streamKind = m_nameToTypeMap[iter->first];
                    if (streamKind == InputOutputTypes::real)
                    {
                        const size_t id = m_featureNameToIdMap[iter->first];
                        const size_t dim = m_featureNameToDimMap[iter->first];
                        const size_t required = dim * m_mbNumTimeSteps * m_numSeqsPerMB;
                        if (ensureCapacity &&
                            ((m_featuresBufferMultiIO[id] == nullptr) || (m_featuresBufferAllocatedMultiIO[id] < required)))
                        {
                            m_featuresBufferMultiIO[id] = AllocateIntermediateBuffer(data->GetDeviceId(), required);
                            m_featuresBufferAllocatedMultiIO[id] = required;
                        }

                        if (sizeof(ElemType) == sizeof(float))
                        {
                            for (size_t fr = srcBegin, t = dstBegin; fr < srcEnd; fr++, t++) // column major, so iterate columns
                            {
                                // copy over the entire column at once, need to do this because SSEMatrix may have gaps at the end of the columns (for SSE alignment)
                                memcpy_s(&m_featuresBufferMultiIO[id].get()[(t * m_numSeqsPerMB + slot) * dim],
                                         sizeof(ElemType) * dim,
                                         &m_featuresBufferMultiUtt[slot].get()[fr * dim + m_featuresStartIndexMultiUtt[id + slot * numOfFea]],
                                         sizeof(ElemType) * dim);
                            }
                        }
                        else // double: must type-cast, cannot memcpy()
                        {
                            for (size_t fr = srcBegin, t = dstBegin; fr < srcEnd; fr++, t++) // column major, so iterate columns in outside loop
                            {
                                for (size_t d = 0; d < dim; d++)
                                {
                                    m_featuresBufferMultiIO[id].get()[(t * m_numSeqsPerMB + slot) * dim + d] =
                                        m_featuresBufferMultiUtt[slot].get()[fr * dim + d + m_featuresStartIndexMultiUtt[id + slot * numOfFea]];
                                }
                            }
                        }
                    }
                    else if (streamKind == InputOutputTypes::category)
                    {
                        const size_t id = m_labelNameToIdMap[iter->first];
                        const size_t dim = m_labelNameToDimMap[iter->first];
                        const size_t required = dim * m_mbNumTimeSteps * m_numSeqsPerMB;
                        if (ensureCapacity &&
                            ((m_labelsBufferMultiIO[id] == nullptr) || (m_labelsBufferAllocatedMultiIO[id] < required)))
                        {
                            m_labelsBufferMultiIO[id] = AllocateIntermediateBuffer(data->GetDeviceId(), required);
                            m_labelsBufferAllocatedMultiIO[id] = required;
                        }

                        for (size_t fr = srcBegin, t = dstBegin; fr < srcEnd; fr++, t++)
                        {
                            for (size_t d = 0; d < dim; d++)
                            {
                                m_labelsBufferMultiIO[id].get()[(t * m_numSeqsPerMB + slot) * dim + d] =
                                    m_labelsBufferMultiUtt[slot].get()[fr * dim + d + m_labelsStartIndexMultiUtt[id + slot * numOfLabel]];
                            }
                        }
                    }
                }
            };

            // create the feature matrix
            m_pMBLayout->Init(m_numSeqsPerMB, m_mbNumTimeSteps);

            vector<size_t> actualmbsize(m_numSeqsPerMB, 0); // per slot: number of time steps taken by the utterance that was current on entry
            for (size_t i = 0; i < m_numSeqsPerMB; i++)
            {
                // fill one parallel-sequence slot
                const size_t startFr = m_processedFrame[i]; // start frame (cursor) inside the utterance that corresponds to time step [0]

                // add utterance to MBLayout; it begins startFr steps before this minibatch and may extend beyond its end
                assert(m_numFramesToProcess[i] > startFr || (m_noData && m_numFramesToProcess[i] == startFr));
                if (m_numFramesToProcess[i] > startFr)
                { // in an edge case (m_noData), startFr is at end
                    m_pMBLayout->AddSequence(NEW_SEQUENCE_ID, i, -static_cast<ptrdiff_t>(startFr), m_numFramesToProcess[i] - startFr);
                }

                if (startFr + m_mbNumTimeSteps < m_numFramesToProcess[i]) // end of this minibatch does not reach until end of utterance
                {
                    // we return the next 'm_mbNumTimeSteps' frames, filling all time steps
                    if (startFr > 0) // not the beginning of the utterance
                    {
                        m_sentenceEnd[i] = false;
                        m_switchFrame[i] = m_mbNumTimeSteps + 1;
                    }
                    else // beginning of the utterance
                    {
                        m_sentenceEnd[i] = true;
                        m_switchFrame[i] = 0;
                    }

                    actualmbsize[i] = m_mbNumTimeSteps;
                    const size_t endFr = startFr + actualmbsize[i]; // actual end frame index of this segment
                    copySlotFrames(i, startFr, endFr, 0, /*ensureCapacity=*/true);
                    m_processedFrame[i] += m_mbNumTimeSteps;
                }
                else // utterance ends inside this minibatch
                {
                    // utterance ends: first copy this segment (later, we will pack one more utterance in)
                    assert(startFr == m_processedFrame[i]);
                    actualmbsize[i] = m_numFramesToProcess[i] - startFr; // parallel sequence is used up to this point
                    const size_t endFr = startFr + actualmbsize[i];      // end frame in sequence
                    assert(endFr == m_numFramesToProcess[i]);            // we are at the end

                    // fill frames for the tail of this utterance
                    copySlotFrames(i, startFr, endFr, 0, /*ensureCapacity=*/true);

                    m_processedFrame[i] += (endFr - startFr);               // advance the cursor
                    assert(m_processedFrame[i] == m_numFramesToProcess[i]); // we must be at the end
                    m_switchFrame[i] = actualmbsize[i];

                    // at this point, we completed an utterance--fill the rest with the next utterance
                    // BUGBUG: We should fill in a loop until we fill the minibatch for the case where just one ReNew is not sufficient
                    // to fill up the remaining slots in the minibatch
                    const bool reNewSucc = ReNewBufferForMultiIO(i);
                    if (actualmbsize[i] < m_mbNumTimeSteps) // we actually have space
                    {
                        if (reNewSucc) // we actually have another utterance to start here
                        {
                            // Note: Don't confuse startT/endT (time steps in the minibatch) with startFr/endFr (frames in the utterance) above.
                            const size_t startT = m_switchFrame[i];
                            // Have to take the min, in case the next sequence is longer than the remaining time steps.
                            const size_t endT = min(m_mbNumTimeSteps, startT + m_numFramesToProcess[i]);

                            // add sequence to MBLayout (its end may lie beyond this minibatch)
                            m_pMBLayout->AddSequence(NEW_SEQUENCE_ID, i, startT, startT + m_numFramesToProcess[i]);

                            // copy the first frames of the new utterance into time steps [startT, endT);
                            // buffers were already sized by the copy above, so no capacity check is needed
                            copySlotFrames(i, 0, (endT > startT) ? (endT - startT) : 0, startT, /*ensureCapacity=*/false);

                            m_processedFrame[i] += (endT - startT);

                            // BUGBUG: since we currently cannot fill >1 utterances, at least let's check
                            // BUGBUG: don't we need to add (endT - startT) to actualmbsize[i] here?
                            const size_t filled = actualmbsize[i] + (endT - startT);
                            if (filled < m_mbNumTimeSteps)
                            {
                                fprintf(stderr, "GetMinibatchToTrainOrTest(): WARNING: Packing a second utterance did still not fill all time slots; filling slots from %d on as gaps.\n", static_cast<int>(filled));
                                // declare the rest as a gap
                                m_pMBLayout->AddGap(i, filled, m_mbNumTimeSteps);
                                // Have to renew, so that there is data for the next read.
                                ReNewBufferForMultiIO(i);
                            }
                        }
                        else // we did have space for more, but no more data is available. BUGBUG: we should update actualmbsize[i] above and re-test here
                        {
                            // declare the rest as a gap
                            m_pMBLayout->AddGap(i, actualmbsize[i], m_mbNumTimeSteps);
                        }
                    } // if (actualmbsize[i] < m_mbNumTimeSteps)
                }
            } // for (size_t i = 0; i < m_numSeqsPerMB; i++)

            // we are done filling all parallel sequences
            publishBuffers();
            skip = false;
        } // if truncated then else
    } while (skip); // keep going if we didn't get the right size minibatch

    if (m_verbosity > 2)
    {
        aggregateTimer.Stop();
        double totalMBReadTime = aggregateTimer.ElapsedSeconds();
        fprintf(stderr, "Total Minibatch read time = %.8g\n", totalMBReadTime);
    }

    return true;
}