in Source/SGDLib/SGD.cpp [201:998]
void SGD<ElemType>::TrainOrAdaptModel(int startEpoch, ComputationNetworkPtr net,
bool networkLoadedFromCheckpoint,
ComputationNetworkPtr refNet,
ComputationNodeBasePtr refNode,
IDataReader* trainSetDataReader,
IDataReader* validationSetDataReader)
{
let& criterionNodes = GetTrainCriterionNodes(net);
fprintf(stderr, "\n");
if (criterionNodes.size() == 1)
{
LOGPRINTF(stderr, "Training criterion: %ls = %ls\n", criterionNodes.front()->NodeName().c_str(), criterionNodes.front()->OperationName().c_str());
}
else
{
LOGPRINTF(stderr, "Training criteria:\n");
for (const auto& node : criterionNodes)
{
LOGPRINTF(stderr, "\t%ls = %ls\n", node->NodeName().c_str(), node->OperationName().c_str());
}
if (criterionNodes.empty())
{
LOGPRINTF(stderr, "\t(none)\n");
InvalidArgument("TrainOrAdaptModel: No criterion node was specified.");
}
}
if (criterionNodes.front()->template Is<ComputationNode<half>>())
{
InvalidArgument("TrainOrAdaptModel: using Float16 for loss function may cause overflow, please cast to float.");
}
// This code is only relevant for the new (V2) readers. It exists because of
// a shortcoming in DecimateMinibatchInPlace, which does not yet work when inputs
// in the same minibatch have different layouts, which is something only V2 readers can
// produce.
if (m_enableDistributedMBReadingNotSpecified && m_mpi != nullptr && !trainSetDataReader->IsLegacyReader())
{
// we're running a parallel training with a v2 reader,
// auto-enable distributed reading
if (m_traceLevel > 0)
LOGPRINTF(stderr, "\"distributedMBReading\" is not explicitly specified, defaulting to 'true'.\n");
m_enableDistributedMBReading = true;
}
// determine evaluationNodes from GetEvalCriterionNodes(), ensuring each criterion is only logged once
std::vector<ComputationNodeBasePtr> evaluationNodes;
{
auto originalEvaluationNodes = GetEvalCriterionNodes(net);
set<ComputationNodeBasePtr> criteriaLogged; // set to make sure we don't double-log criteria
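// (std::set::insert returns an {iterator, inserted} pair; .second is false when the
// element was already present, which is what filters out the duplicates below)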
for (const auto& node : criterionNodes)
criteriaLogged.insert(node);
for (const auto& node : originalEvaluationNodes)
if (criteriaLogged.insert(node).second)
evaluationNodes.push_back(node);
if (evaluationNodes.size() == 1)
{
LOGPRINTF(stderr, "Evaluation criterion: %ls = %ls\n", evaluationNodes.front()->NodeName().c_str(), evaluationNodes.front()->OperationName().c_str());
}
else if (!evaluationNodes.empty())
{
fprintf(stderr, "\n");
LOGPRINTF(stderr, "Evaluation criteria:\n");
for (const auto& node : evaluationNodes)
{
LOGPRINTF(stderr, "\t%ls = %ls\n", node->NodeName().c_str(), node->OperationName().c_str());
}
}
}
std::vector<ComputationNodeBasePtr> additionalNodesToEvaluate;
// Do not include the output nodes in the matrix sharing structure when using forward value matrix
// sharing, since the output nodes are only used for AttemptUtteranceDerivativeFeatures functionality
// which does not work properly with forward value matrix sharing.
if (!Globals::ShouldEnableShareNodeValueMatrices())
{
auto& outputNodes = net->OutputNodes();
additionalNodesToEvaluate.insert(additionalNodesToEvaluate.end(), outputNodes.cbegin(), outputNodes.cend());
}
auto preComputeNodesList = net->GetNodesRequiringPreComputation();
additionalNodesToEvaluate.insert(additionalNodesToEvaluate.end(), preComputeNodesList.cbegin(), preComputeNodesList.cend());
// allocate memory for forward and backward computation
net->AllocateAllMatrices(evaluationNodes, additionalNodesToEvaluate, criterionNodes[0]); // TODO: use criterionNodes.front() throughout
// get feature and label nodes into an array of matrices that will be passed to GetMinibatch()
// TODO: instead, remember the nodes directly, to be able to handle both float and double nodes; current version will crash for mixed networks
StreamMinibatchInputs* inputMatrices = new StreamMinibatchInputs();
// TODO: ^^ change to shared_ptr or unique_ptr
let& featureNodes = net->FeatureNodes();
let& labelNodes = net->LabelNodes();
// BUGBUG: ^^ should not get all feature/label nodes, but only the ones referenced in a criterion
for (size_t pass = 0; pass < 2; pass++)
{
auto& nodes = (pass == 0) ? featureNodes : labelNodes;
for (const auto & node : nodes)
inputMatrices->AddInput(node->NodeName(), node->ValuePtr(), node->GetMBLayout(), node->GetSampleLayout());
}
// get hmm file for sequence training
bool isSequenceTrainingCriterion = (criterionNodes[0]->OperationName() == L"SequenceWithSoftmax");
if (isSequenceTrainingCriterion)
{
// SequenceWithSoftmaxNode<ElemType>* node = static_cast<SequenceWithSoftmaxNode<ElemType>*>(criterionNodes[0]);
auto node = dynamic_pointer_cast<SequenceWithSoftmaxNode<ElemType>>(criterionNodes[0]);
auto hmm = node->gethmm();
trainSetDataReader->GetHmmData(hmm);
}
// used for KLD-regularized adaptation. For all other adaptation techniques,
// use MEL to edit the model and then run the normal training algorithm.
// TODO: Should this be done in SGD::Adapt()?
// TODO: Redo this leveraging that we now have shared_ptrs. It is probably even OK if both networks share feature nodes.
// TODO: Then we can also share the MBLayout; which currently is copied by value.
std::vector<ComputationNodeBasePtr> refFeatureNodes; // we keep the original network's features here
if (m_needAdaptRegularization && m_adaptationRegType == AdaptationRegType::KL && refNode != nullptr)
{
refNet->InvalidateCompiledNetwork(); // prepare to re-compile
// replace input nodes in ref network by input nodes of the main network
refFeatureNodes.resize(featureNodes.size());
for (size_t i = 0; i < featureNodes.size(); i++)
{
// we need to keep this info to undo this later
// TODO: After the change to shared_ptrs, this may no longer be necessary.
refFeatureNodes[i] = refNet->GetNodeFromName(featureNodes[i]->NodeName()); // remember so that we can restore them later
refNet->ReplaceNode(featureNodes[i]->NodeName(), featureNodes[i]);
}
//const_cast<MBLayoutPtr&>(refNet->GetMBLayoutPtrOfNetwork()) = net->GetMBLayoutPtrOfNetwork(); // WORKAROUND
refNet->CompileNetwork();
// allocate memory for forward computation
refNet->AllocateAllMatrices({refNode}, {}, nullptr);
}
// initialize weights and gradient holders
// only one criterion so far TODO: support multiple ones?
auto& learnableNodes = net->LearnableParameterNodes(criterionNodes[0]);
list<MatrixBasePtr> smoothedGradients;
vector<double> smoothedCounts; // currently used by FSAdaGradUpdate()
size_t numParameters = 0;
vector<wstring> nodesToUpdateDescriptions; // for logging only
for (auto nodeIter = learnableNodes.begin(); nodeIter != learnableNodes.end(); nodeIter++)
{
ComputationNodePtr node = dynamic_pointer_cast<ComputationNode<ElemType>>(*nodeIter);
// Note: We don't actually need the smoothedGradients if !IsParameterUpdateRequired().
// However, this is hard to fix since lots of code assumes smoothedGradients to be in the same order as learnableNodes.
// V2 API fixes this.
MatrixBasePtr smoothedGradientPtr;
size_t numRows = node->Value().GetNumRows();
size_t numCols = node->Value().GetNumCols();
if (std::is_same<ElemType, half>())
{
// For half parameters, we use float smoothed gradients
// Allocate 3 times the size for casting parameter and gradients to float
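// Assumed layout of the compound matrix (inferred from the column slice taken
// below and from the update code that consumes it): columns [0, numCols) hold
// the float smoothed gradient, [numCols, 2*numCols) presumably a float staging
// copy of the gradient, and [2*numCols, 3*numCols) the float master copy of the
// parameters.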
const size_t c_smoothed_gradients_factor = 3;
shared_ptr<Matrix<float>> compoundMatrixPtr = std::make_shared<Matrix<float>>(numRows,
numCols * c_smoothed_gradients_factor,
net->GetDeviceId());
// Initialize float parameters
auto parameterMatrix = compoundMatrixPtr->ColumnSlice(2 * numCols, numCols);
parameterMatrix.CastAssignValuesOf(node->Value());
smoothedGradientPtr = compoundMatrixPtr;
}
else
{
smoothedGradientPtr = std::make_shared<Matrix<ElemType>>(numRows,
numCols,
net->GetDeviceId());
}
smoothedGradients.push_back(smoothedGradientPtr);
smoothedCounts.push_back(0);
if (node->IsParameterUpdateRequired())
{
nodesToUpdateDescriptions.push_back(node->NodeDescription() + L" : [" + Microsoft::MSR::CNTK::ToFixedWStringFromMultiByte(string(node->GetSampleLayout())) + L"]");
numParameters += node->GetSampleLayout().GetNumElements();
}
}
size_t numNeedsGradient = 0;
for (let node : net->GetEvalOrder(criterionNodes[0]))
{
if (node->NeedsGradient())
numNeedsGradient++;
}
fprintf(stderr, "\n");
LOGPRINTF(stderr, "Training %.0f parameters in %d ",
(double)numParameters, (int)nodesToUpdateDescriptions.size());
if (m_traceLevel == 0)
fprintf(stderr, "parameter tensors.\n");
else
{
fprintf(stderr, "out of %d parameter tensors and %d nodes with gradient:\n\n",
(int)learnableNodes.size(), (int)numNeedsGradient);
for (let nodeDescription : nodesToUpdateDescriptions)
{
LOGPRINTF(stderr, "\t%ls\n", nodeDescription.c_str());
}
}
// one blank line before training progress log
fprintf(stderr, "\n");
double avgCriterion, lrControlCriterion;
lrControlCriterion = avgCriterion = numeric_limits<double>::infinity();
size_t epochsNotCountedInAvgCriterion = startEpoch % m_learnRateAdjustInterval;
std::vector<wstring> evalNodeNames;
for (size_t i = 0; i < evaluationNodes.size(); i++)
evalNodeNames.push_back(evaluationNodes[i]->NodeName());
double learnRatePerSample = 0.5f / m_mbSize[startEpoch];
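// The 0.5f / mbSize default corresponds to an effective rate of 0.5 per minibatch of the
// starting size; it only serves as a fallback and is overwritten below when a checkpoint
// or a configured per-epoch learning rate is available.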
double learningRateAdjustmentFactor = 1.0f;
vector<double> prevLearnRates;
prevLearnRates.resize(m_numPrevLearnRates);
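// -1.0 marks slots that do not yet hold a real learning rate; the buffer is written
// modulo m_numPrevLearnRates below, so it keeps the most recent rates.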
for (int i = 0; i < m_numPrevLearnRates; i++)
prevLearnRates[i] = -1.0;
m_prevChosenMinibatchSize = m_mbSize[startEpoch];
int currentNumGradientBits = 0; // this remembers the last #gradient bits we set for dataParallelSGD (init val 0 has no meaning, just keep compiler happy)
if (GetParallelizationMethod() == ParallelizationMethod::dataParallelSGD)
{
currentNumGradientBits = m_numGradientBits[startEpoch]; // remember so that we can detect a change
InitDistGradAgg(evaluationNodes.size(), currentNumGradientBits, net->GetDeviceId(), m_traceLevel);
}
else if (GetParallelizationMethod() == ParallelizationMethod::modelAveragingSGD ||
GetParallelizationMethod() == ParallelizationMethod::blockMomentumSGD)
{
InitModelAggregationHandler(m_syncStatsTrace, net->GetDeviceId());
}
// precompute mean and invStdDev nodes and save initial model
// When there is no precompute, only save if we did not load the model from a
// checkpoint but instead built it from a network description
if (PreCompute(net, trainSetDataReader, featureNodes, labelNodes, inputMatrices) || !networkLoadedFromCheckpoint)
{
// Synchronize all ranks before writing the model to ensure that
// everyone is done loading the model
if (m_mpi != nullptr)
{
m_mpi->WaitAll();
}
// In case of parallel training, only the main node should save the model, to prevent
// the parallel workers from colliding while writing the same file
if ((m_mpi == nullptr) || m_mpi->IsMainNode())
net->Save(GetModelNameForEpoch(int(startEpoch) - 1));
}
if (m_saveBestModelPerCriterion)
{
m_criteriaBestEpoch.clear();
if (!criterionNodes.empty())
{
m_criteriaBestEpoch.emplace(criterionNodes[0]->NodeName(), BestEpoch());
}
for (const ComputationNodeBasePtr& node : evaluationNodes)
{
m_criteriaBestEpoch.emplace(node->NodeName(), BestEpoch());
}
}
size_t totalTrainingSamplesSeen = 0; // aggregated over all epochs, for logging purposes only
size_t totalMBsSeen = 0;
bool learnRateInitialized = false;
double prevCriterion = numeric_limits<double>::infinity();
if (startEpoch > 0)
{
learnRateInitialized = TryLoadCheckPointInfo(startEpoch - 1,
/*out*/ totalTrainingSamplesSeen,
/*out*/ learnRatePerSample,
smoothedGradients,
smoothedCounts,
/*out*/ prevCriterion,
/*out*/ m_prevChosenMinibatchSize);
if (learnRateInitialized)
prevLearnRates[startEpoch % m_numPrevLearnRates] = learnRatePerSample;
}
if (m_autoLearnRateSearchType == LearningRateSearchAlgorithm::AdjustAfterEpoch &&
!learnRateInitialized && m_learningRatesParam.size() <= startEpoch)
{
InvalidArgument(
"When using \"AdjustAfterEpoch\", there must either exist a checkpoint file, "
"or an explicit learning rate must be specified in config for the starting epoch.");
}
// TODO this assumes training is picked up with nodes with zero parameters
double prevDropoutRate = 0;
double prevNormalizationTimeConstant = 0;
double prevNormalizationBlendTimeConstant = 0;
bool learnRateReduced = false;
// pass user config on memory allocation for convolution operations to the Network
ComputationNetwork::SetMaxTempMemSizeForCNN(net, criterionNodes[0], m_maxTempMemSizeInSamplesForCNN);
if (m_needAdaptRegularization && m_adaptationRegType == AdaptationRegType::KL && refNode)
{
ComputationNetwork::SetMaxTempMemSizeForCNN(refNet, refNode, m_maxTempMemSizeInSamplesForCNN);
}
// likewise for sequence training parameters
if (isSequenceTrainingCriterion)
{
ComputationNetwork::SetSeqParam<ElemType>(net, criterionNodes[0], m_hSmoothingWeight, m_frameDropThresh, m_doReferenceAlign,
m_seqGammarCalcAMF, m_seqGammarCalcLMF, m_seqGammarCalcWP, m_seqGammarCalcbMMIFactor, m_seqGammarCalcUsesMBR);
}
// initialize the Multiverso wrapper for the ASGD logic
if (m_parallelizationMethod == ParallelizationMethod::dataParallelASGD)
{
m_pASGDHelper.reset(NewASGDHelper<ElemType>(learnableNodes,
m_mpi->NumNodesInUse(),
m_isAsyncBufferEnabled,
m_isSimulateMA,
m_adjustLearningRateAtBeginning,
m_adjustCoefficient,
m_adjustPerMinibatches,
m_traceLevel,
m_syncStatsTrace));
m_pASGDHelper->InitModel(learnableNodes);
}
// Create TensorBoard writer if needed. When using parallel training, make sure that only Rank 0 actually writes logs.
::CNTK::Internal::TensorBoardFileWriterPtr tensorBoardWriter;
if (!m_tensorBoardLogDir.empty() && (m_mpi == nullptr || m_mpi->CurrentNodeRank() == 0))
{
tensorBoardWriter = make_shared<::CNTK::Internal::TensorBoardFileWriter>(m_tensorBoardLogDir, net);
}
// --- MAIN EPOCH LOOP
for (int i = startEpoch; i < (int) m_maxEpochs; i++) // TODO: why is this an int, and not a size_t?
{
// Always skip the first epoch for profiling to avoid startup behavior.
// This has effect only if the profiler is globally enabled (profilerEnabled="true" in the config).
if (i > startEpoch)
{
ProfilerEnable(true);
}
// Synchronize all ranks before proceeding to ensure that
// rank 0 has finished writing the previous model file
SynchronizeWorkers();
// (re-)initialize 1-bit SGD
if (GetParallelizationMethod() == ParallelizationMethod::dataParallelSGD &&
currentNumGradientBits != m_numGradientBits[i])
{
currentNumGradientBits = m_numGradientBits[i];
InitDistGradAgg(evaluationNodes.size(), currentNumGradientBits, net->GetDeviceId(), m_traceLevel);
}
Timer timer;
timer.Start();
// set dropout rate for this epoch
// We use the same seed across workers until parallel training kicks in to ensure that the workers have identical models
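// randSeedBase = parallelWorkerIdx * m_maxEpochs + i assigns every (worker, epoch)
// pair a distinct seed, since the epoch index i is always less than m_maxEpochs.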
size_t parallelWorkerIdx = ((m_mpi == nullptr) || !UsingParallelTrain(i)) ? 0 : m_mpi->CurrentNodeRank();
size_t randSeedBase = (parallelWorkerIdx * m_maxEpochs) + i;
ComputationNetwork::SetDropoutRate(net, criterionNodes[0], m_dropoutRates[i], prevDropoutRate);
ComputationNetwork::SetIRngUserSeed(net, criterionNodes[0], randSeedBase);
ComputationNetwork::SetBatchNormalizationTimeConstants<ElemType>(net, criterionNodes[0],
m_batchNormalizationTimeConstant[i], prevNormalizationTimeConstant,
m_batchNormalizationBlendTimeConstant[i], prevNormalizationBlendTimeConstant);
// learning rate adjustment
if (m_autoLearnRateSearchType == LearningRateSearchAlgorithm::None || i < m_learningRatesParam.size())
{
// BUGBUG: GetNumParallelSequences() returns 1 under certain situations; it seems when restarting from checkpoint
learnRatePerSample = GetLearningRatePerSample(i /*BUGBUG workaround:*/, trainSetDataReader->GetNumParallelSequencesForFixingBPTTMode());
}
else if (m_autoLearnRateSearchType == LearningRateSearchAlgorithm::SearchBeforeEpoch)
{
double largestPrevLearnRatePerSample = prevLearnRates[0];
for (int j = 1; j < m_numPrevLearnRates; j++)
{
largestPrevLearnRatePerSample = max(largestPrevLearnRatePerSample, prevLearnRates[j]);
}
// return a reasonable learning rate based on the initial minibatchSize
double newLearningRatePerSample = SearchForBestLearnRate(net, refNet, refNode, i, learnRatePerSample,
trainSetDataReader, featureNodes, labelNodes,
criterionNodes, evaluationNodes, inputMatrices,
learnableNodes, smoothedGradients, smoothedCounts,
learnRateInitialized, largestPrevLearnRatePerSample);
learningRateAdjustmentFactor = newLearningRatePerSample / learnRatePerSample;
learnRatePerSample = newLearningRatePerSample;
// save per sample learn rate to support changeable minibatchSize
prevLearnRates[i % m_numPrevLearnRates] = learnRatePerSample;
}
learnRateInitialized = true;
if (learnRatePerSample < m_minLearnRate)
{
LOGPRINTF(stderr, "Learn Rate Per Sample for Epoch[%d] = %.8g is less than minLearningRatePerSample %.8g. Training complete.\n",
i + 1, learnRatePerSample, m_minLearnRate);
if (m_autoLearnRateSearchType != LearningRateSearchAlgorithm::None)
{
// In case of parallel training, only the main node should save the model, to prevent
// the parallel workers from colliding while writing the same file
if ((m_mpi == nullptr) || m_mpi->IsMainNode())
net->Save(m_modelPath);
}
break;
}
size_t chosenMinibatchSize;
size_t actualMinibatchSize;
// Through the command line or config file, the user can set minibatch sizes on a per-epoch
// basis for a set number of epochs. For epochs beyond that point (i.e., beyond m_mbSize.size()),
// we either keep using the last minibatch size or use tuning to try to find a better one.
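// For example (assuming the usual colon-separated schedule syntax), minibatchSize=256:512
// would use 256 samples per minibatch in epoch 0 and 512 in all later epochs, unless the
// auto-adjustment below picks a different size.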
if (m_autoAdjustMinibatch && i >= m_mbSize.size())
{
size_t numFramesToUseInSearch = m_numSamples4Search[i];
if (m_epochSize != requestDataSize)
{
// ensure the numFramesToUseInSearch does not exceed the total number of frames in the epoch
numFramesToUseInSearch = min(numFramesToUseInSearch, m_epochSize);
}
// Use tuning to try and find a better minibatch size
chosenMinibatchSize = AdaptiveMinibatchSizing(net, refNet, refNode, i,
numFramesToUseInSearch,
trainSetDataReader, learnRatePerSample,
m_mbSize[i], featureNodes, labelNodes,
criterionNodes, evaluationNodes,
inputMatrices, learnableNodes,
smoothedGradients, smoothedCounts, learningRateAdjustmentFactor);
if (m_traceLevel < 1 && chosenMinibatchSize != m_prevChosenMinibatchSize)
LOGPRINTF(stderr, "Minibatch size adapted to %d.\n", (int)chosenMinibatchSize);
m_prevChosenMinibatchSize = chosenMinibatchSize;
}
else
{
// use the explicitly set minibatch size
chosenMinibatchSize = m_mbSize[i];
}
// For legacy readers, in BPTT mode the configured minibatch size was not the real minibatch size but the truncation length.
// Because of that, we have to fix up the real minibatch size by multiplying the number of parallel sequences by the truncation length.
// This is not required any more for the new readers.
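// For illustration: with a truncation length of 20 and 32 parallel sequences, the
// effective minibatch is 20 * 32 = 640 samples.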
if (trainSetDataReader->IsLegacyReader())
actualMinibatchSize = FixUpEffectiveMBSize(chosenMinibatchSize /*BUGBUG workaround:*/, trainSetDataReader->GetNumParallelSequencesForFixingBPTTMode());
else
actualMinibatchSize = chosenMinibatchSize;
double momentumPerSample = GetMomentumPerSample(i /*BUGBUG workaround:*/, trainSetDataReader->GetNumParallelSequencesForFixingBPTTMode());
// time constant = number of samples after which a contribution has been reduced to e^-1
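// Derivation: after n samples, a unit contribution is scaled by momentumPerSample^n;
// solving momentumPerSample^n = e^-1 gives n = -1 / log(momentumPerSample).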
double momentumAsTimeConstant = (momentumPerSample == 0.0 || momentumPerSample >= 1.0)
                                    ? 0.0
                                    : -1.0 / log(momentumPerSample);
if (m_traceLevel > 0)
{
fprintf(stderr, "\n");
LOGPRINTF(stderr, "Starting Epoch %d: learning rate per sample = %f effective momentum = %f momentum as time constant = %.1f samples\n",
i + 1, learnRatePerSample, MomentumPerMB(momentumPerSample, actualMinibatchSize), momentumAsTimeConstant);
}
EpochCriterion epochCriterion; // criterion values are returned in this
std::vector<EpochCriterion> epochEvalErrors(evaluationNodes.size());
totalMBsSeen += TrainOneEpoch(net,
refNet,
refNode,
i,
m_epochSize,
trainSetDataReader,
learnRatePerSample,
chosenMinibatchSize,
featureNodes,
labelNodes,
criterionNodes,
evaluationNodes,
inputMatrices,
learnableNodes, smoothedGradients, smoothedCounts,
epochCriterion, epochEvalErrors,
"", SIZE_MAX, totalMBsSeen, tensorBoardWriter, startEpoch);
totalTrainingSamplesSeen += epochCriterion.second; // aggregate #training samples, for logging purposes only
timer.Stop();
double epochTime = timer.ElapsedSeconds();
if (m_useEvalCriterionControlLR && epochEvalErrors.size() > 0)
lrControlCriterion = epochEvalErrors[0].Average();
else
lrControlCriterion = epochCriterion.Average();
LOGPRINTF(stderr, "Finished Epoch[%2d of %d]: [Training] ", i + 1, (int)m_maxEpochs);
epochCriterion.LogCriterion(criterionNodes[0]->NodeName());
m_lastFinishedEpochTrainLoss = epochCriterion.Average();
for (size_t j = 0; j < epochEvalErrors.size(); j++)
epochEvalErrors[j].LogCriterion(evaluationNodes[j]->NodeName());
fprintf(stderr, "totalSamplesSeen = %zu; learningRatePerSample = %.8g; epochTime=%.6gs\n", totalTrainingSamplesSeen, learnRatePerSample, epochTime);
#if 0
// TODO: This was only printed if >1 eval criterion. Why? Needed?
LOGPRINTF(stderr, "Finished Epoch[%2d of %d]: Criterion Node [%ls] Per Sample = %.8g\n",
i + 1, (int)m_maxEpochs, criterionNodes[0]->NodeName().c_str(), epochCriterion.Average());
for (size_t j = 0; j < epochEvalErrors.size(); j++)
{
LOGPRINTF(stderr, "Finished Epoch[%2d of %d]: Evaluation Node [%ls] Per Sample = %.8g\n",
i + 1, (int) m_maxEpochs, evalNodeNames[j].c_str(), epochEvalErrors[j].Average());
}
#endif
if (tensorBoardWriter)
{
tensorBoardWriter->WriteValue(L"summary/" + criterionNodes[0]->NodeName(), (float)epochCriterion.Average(), i + 1);
for (size_t j = 0; j < epochEvalErrors.size(); j++)
{
tensorBoardWriter->WriteValue(L"summary/" + evaluationNodes[0]->NodeName(), (float)epochEvalErrors[j].Average(), i + 1);
}
tensorBoardWriter->Flush();
}
if (validationSetDataReader != trainSetDataReader && validationSetDataReader != nullptr)
{
// TODO(dataASGD): make the evaluator non-distributed when using ASGD, since Multiverso has another background thread using MPI.
// Making the evaluation serial (non-distributed) will slow down training, especially when the validation set is large.
SimpleEvaluator<ElemType> evalforvalidation(net, UsingAsyncGradientAggregation(i + 1) ? nullptr : m_mpi, m_enableDistributedMBReading);
vector<wstring> cvSetTrainAndEvalNodes;
if (criterionNodes.size() > 0)
{
cvSetTrainAndEvalNodes.push_back(criterionNodes[0]->NodeName());
}
for (let node : evaluationNodes)
{
cvSetTrainAndEvalNodes.push_back(node->NodeName());
}
// BUGBUG: We should not use the training MB size. The training MB size is constrained by both convergence and memory. Eval is only constrained by memory.
let vScore = evalforvalidation.Evaluate(validationSetDataReader, cvSetTrainAndEvalNodes, UsingAsyncGradientAggregation(i + 1) ? m_mbSize[i] / m_mpi->NumNodesInUse() : m_mbSize[i]);
LOGPRINTF(stderr, "Finished Epoch[%2d of %d]: [Validate] ", i + 1, (int)m_maxEpochs);
for (size_t k = 0; k < vScore.size() /*&& k < 2*/; k++)
vScore[k].LogCriterion(cvSetTrainAndEvalNodes[k], /*addSemicolon=*/k + 1 < vScore.size());
//fprintf(stderr, "%s %ls = %.8f * %d", k ? ";" : "", cvSetTrainAndEvalNodes[k].c_str(), vScore[k].Average(), (int)vScore[k].second);
fprintf(stderr, "\n");
if (tensorBoardWriter)
{
for (size_t k = 0; k < vScore.size(); k++)
{
tensorBoardWriter->WriteValue(L"summary/test_" + cvSetTrainAndEvalNodes[k], (float)vScore[k].Average(), i + 1);
}
tensorBoardWriter->Flush();
}
if (m_saveBestModelPerCriterion)
{
// Loop through the criteria (i.e., scores) and update the best epoch whenever a smaller value is found.
UpdateBestEpochs(vScore, cvSetTrainAndEvalNodes, i, m_criteriaBestEpoch);
}
if (m_useCVSetControlLRIfCVExists)
{
if (m_useEvalCriterionControlLR && vScore.size() > 1)
lrControlCriterion = vScore[1].Average(); // use the first of possibly multiple eval criteria
else
lrControlCriterion = vScore[0].Average(); // the first one is the training criterion
}
}
// broadcast epochCriterion to make sure each processor will have the same learning rate schedule
if ((GetParallelizationMethod() == ParallelizationMethod::modelAveragingSGD
||
GetParallelizationMethod() == ParallelizationMethod::blockMomentumSGD)
&& (m_mpi->NumNodesInUse() > 1))
{
m_mpi->Bcast(&epochCriterion.first, 1, m_mpi->MainNodeRank());
m_mpi->Bcast(&epochCriterion.second, 1, m_mpi->MainNodeRank());
m_mpi->Bcast(&lrControlCriterion, 1, m_mpi->MainNodeRank());
}
bool loadedPrevModel = false;
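// i % m_learnRateAdjustInterval + 1 cycles through 1..m_learnRateAdjustInterval,
// reaching the maximum exactly at learning-rate adjustment epochs.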
size_t epochsSinceLastLearnRateAdjust = i % m_learnRateAdjustInterval + 1;
if (avgCriterion == numeric_limits<double>::infinity())
{
avgCriterion = lrControlCriterion;
}
else
{
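// Incremental mean over the epochs counted since the last adjustment: with
// k = epochsSinceLastLearnRateAdjust - epochsNotCountedInAvgCriterion values
// seen so far (including this one), avg_k = ((k - 1) * avg_{k-1} + x_k) / k.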
avgCriterion = ((epochsSinceLastLearnRateAdjust - 1 - epochsNotCountedInAvgCriterion) *
avgCriterion +
lrControlCriterion) /
(epochsSinceLastLearnRateAdjust - epochsNotCountedInAvgCriterion);
}
if (m_autoLearnRateSearchType == LearningRateSearchAlgorithm::AdjustAfterEpoch &&
m_learningRatesParam.size() <= i && epochsSinceLastLearnRateAdjust == m_learnRateAdjustInterval)
{
if (std::isnan(avgCriterion) || (prevCriterion - avgCriterion < 0 && prevCriterion != numeric_limits<double>::infinity()))
{
if (m_loadBestModel)
{
// roll back
auto bestModelPath = GetModelNameForEpoch(i - m_learnRateAdjustInterval);
LOGPRINTF(stderr, "Loading (rolling back to) previous model with best training-criterion value: %ls.\n", bestModelPath.c_str());
net->RereadPersistableParameters<ElemType>(bestModelPath);
LoadCheckPointInfo(i - m_learnRateAdjustInterval,
/*out*/ totalTrainingSamplesSeen,
/*out*/ learnRatePerSample,
smoothedGradients,
smoothedCounts,
/*out*/ prevCriterion,
/*out*/ m_prevChosenMinibatchSize);
loadedPrevModel = true;
}
}
if (m_continueReduce)
{
if (std::isnan(avgCriterion) ||
(prevCriterion - avgCriterion <= m_reduceLearnRateIfImproveLessThan * prevCriterion &&
prevCriterion != numeric_limits<double>::infinity()))
{
if (learnRateReduced == false)
{
learnRateReduced = true;
}
else
{
// In case of parallel training, only the main node should save the model, to prevent
// the parallel workers from colliding while writing the same file
if ((m_mpi == nullptr) || m_mpi->IsMainNode())
net->Save(GetModelNameForEpoch(i, true));
LOGPRINTF(stderr, "Finished training and saved final model\n\n");
break;
}
}
if (learnRateReduced)
{
learnRatePerSample *= m_learnRateDecreaseFactor;
LOGPRINTF(stderr, "learnRatePerSample reduced to %.8g\n", learnRatePerSample);
}
}
else
{
if (std::isnan(avgCriterion) ||
(prevCriterion - avgCriterion <= m_reduceLearnRateIfImproveLessThan * prevCriterion &&
prevCriterion != numeric_limits<double>::infinity()))
{
learnRatePerSample *= m_learnRateDecreaseFactor;
LOGPRINTF(stderr, "learnRatePerSample reduced to %.8g\n", learnRatePerSample);
}
else if (prevCriterion - avgCriterion > m_increaseLearnRateIfImproveMoreThan * prevCriterion &&
prevCriterion != numeric_limits<double>::infinity())
{
learnRatePerSample *= m_learnRateIncreaseFactor;
LOGPRINTF(stderr, "learnRatePerSample increased to %.8g\n", learnRatePerSample);
}
}
}
else
{
if (std::isnan(avgCriterion))
RuntimeError("The training criterion is not a number (NAN).");
}
// if we did not load previous values, set them now
if (!loadedPrevModel && epochsSinceLastLearnRateAdjust == m_learnRateAdjustInterval)
{
prevCriterion = avgCriterion;
epochsNotCountedInAvgCriterion = 0;
}
// Synchronize all ranks before proceeding to ensure that
// nobody tries reading the checkpoint file at the same time
// as rank 0 deleting it below
SynchronizeWorkers();
// Persist model and check-point info
if ((m_mpi == nullptr) || m_mpi->IsMainNode())
{
if (loadedPrevModel)
{
// If the previous best model was loaded, first remove the epochs that led to worse results
for (int j = 1; j < m_learnRateAdjustInterval; j++)
{
int epochToDelete = i - j;
LOGPRINTF(stderr, "SGD: removing model and checkpoint files for epoch %d after rollback to epoch %lu\n", epochToDelete + 1, (unsigned long)(i - m_learnRateAdjustInterval) + 1); // report 1 based epoch number
_wunlink(GetModelNameForEpoch(epochToDelete).c_str());
_wunlink(GetCheckPointFileNameForEpoch(epochToDelete).c_str());
}
// Set i back to the loaded model
i -= m_learnRateAdjustInterval;
LOGPRINTF(stderr, "SGD: revoke back to and update checkpoint file for epoch %d\n", i+1); // report 1 based epoch number
SaveCheckPointInfo(
i,
totalTrainingSamplesSeen,
learnRatePerSample,
smoothedGradients,
smoothedCounts,
prevCriterion,
chosenMinibatchSize);
}
else
{
SaveCheckPointInfo(
i,
totalTrainingSamplesSeen,
learnRatePerSample,
smoothedGradients,
smoothedCounts,
prevCriterion,
chosenMinibatchSize);
auto modelName = GetModelNameForEpoch(i);
if (m_traceLevel > 0)
LOGPRINTF(stderr, "SGD: Saving checkpoint model '%ls'\n", modelName.c_str());
net->Save(modelName);
if (!m_keepCheckPointFiles)
{
// delete previous checkpoint file to save space
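// Presumably, the checkpoint written at the last learning-rate adjustment boundary is
// kept so that it can be reloaded if a later adjustment decides to roll back; all
// other checkpoints are deleted.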
if (m_autoLearnRateSearchType == LearningRateSearchAlgorithm::AdjustAfterEpoch && m_loadBestModel)
{
if (epochsSinceLastLearnRateAdjust != 1)
{
_wunlink(GetCheckPointFileNameForEpoch(i - 1).c_str());
}
if (epochsSinceLastLearnRateAdjust == m_learnRateAdjustInterval)
{
_wunlink(GetCheckPointFileNameForEpoch(i - m_learnRateAdjustInterval).c_str());
}
}
else
{
_wunlink(GetCheckPointFileNameForEpoch(i - 1).c_str());
}
}
}
}
else
{
if (loadedPrevModel)
{
// Set i back to the loaded model
i -= m_learnRateAdjustInterval;
}
}
if (learnRatePerSample < 1e-12)
{
LOGPRINTF(stderr, "learnRate per sample is reduced to %.8g which is below 1e-12. stop training.\n",
learnRatePerSample);
}
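// BUGBUG? The message above announces stopping, but no break is issued here, so the epoch loop continues.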
}
// --- END OF MAIN EPOCH LOOP
// Check whether we need to save the best model per criterion, and whether this is the main node.
if (m_saveBestModelPerCriterion && ((m_mpi == nullptr) || m_mpi->IsMainNode()))
{
// For each criterion, copy the best-epoch model to a new file with the criterion name appended.
CopyBestEpochs(m_criteriaBestEpoch, *this, m_maxEpochs - 1);
}
// Synchronize all ranks before proceeding to ensure that
// rank 0 has finished writing the model file
// TODO[DataASGD]: should the other ranks wait in async mode?
SynchronizeWorkers();
// progress tracing for compute cluster management
ProgressTracing::TraceProgressPercentage(m_maxEpochs, 0.0, true);
ProgressTracing::TraceTrainLoss(m_lastFinishedEpochTrainLoss);
// since we linked the main network's feature nodes into refNet, we need to restore the originals before deletion
if (m_needAdaptRegularization && m_adaptationRegType == AdaptationRegType::KL && refNode != nullptr)
{
for (size_t i = 0; i < refFeatureNodes.size(); i++)
{
// note we need to handle deletion carefully
refNet->ReplaceNode(refFeatureNodes[i]->NodeName(), refFeatureNodes[i]);
}
}
delete inputMatrices;
if (m_parallelizationMethod == ParallelizationMethod::dataParallelASGD)
m_pASGDHelper.reset();
}