in AmbrosiaLib/Ambrosia/Program.cs [2714:3051]
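// Replays committed log pages from replayStream into the given machine state, restoring input/output
// watermarks and re-sending the logged work to the local service, until the true end of the log is reached.
// Handles both the non-active/active takeover path (via the kill file) and the active/active roles.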
private async Task ReplayAsync(ILogReader replayStream, MachineState state)
{
var tempBuf = new byte[100];
var tempBuf2 = new byte[100];
var headerBuf = new byte[Committer.HeaderSize];
var headerBufStream = new MemoryStream(headerBuf);
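// Per-page watermark scratch dictionaries: input-consumption progress and output trim points decoded from each replayed commit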
var committedInputDict = new Dictionary<string, LongPair>();
var trimDict = new Dictionary<string, long>();
var detectedEOF = false;
var detectedEOL = false;
var clearedCommitterWrite = false;
var haveWriterLockForNonActiveActive = false;
ILogWriter lastLogFileStreamWriter = null;
// Keep replaying commits until we run out of replay data
while (true)
{
long logRecordPos = replayStream.Position;
int commitSize;
try
{
var usingxxHash64 = false;
// First get commit ID and check for integrity
replayStream.ReadAllRequiredBytes(headerBuf, 0, Committer.HeaderSize);
headerBufStream.Position = 0;
var commitID = headerBufStream.ReadIntFixed();
if (commitID != state.Committer.CommitID)
{
throw new Exception("Committer didn't match. Must be incomplete record");
}
// Get commit page length
commitSize = headerBufStream.ReadIntFixed();
// Hack: a negative length value signals that the newer xxHash64 function was used for this page, rather than the original check-byte function
if (commitSize < 0)
{
usingxxHash64 = true;
commitSize = (commitSize * -1) - 1; // decode the length, which was stored as -(commitSize + 1)
headerBufStream.Position -= 4;
headerBufStream.WriteIntFixed(commitSize); // rewrite the header with the positive length before it's replayed to the local service
}
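// Read the stored integrity hash and the page's write sequence number from the header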
var checkBytes = headerBufStream.ReadULongFixed();
var writeSeqID = headerBufStream.ReadLongFixed();
if (writeSeqID != state.Committer._nextWriteID)
{
throw new Exception("Out of order page. Must be incomplete record");
}
// Remove header
commitSize -= Committer.HeaderSize;
if (commitSize > tempBuf.Length)
{
tempBuf = new byte[commitSize];
}
replayStream.ReadAllRequiredBytes(tempBuf, 0, commitSize);
// Perform integrity check
ulong checkBytesCalc;
if (usingxxHash64)
{
checkBytesCalc = state.Committer.CheckBytesxxHash64(tempBuf, 0, commitSize);
}
else
{
checkBytesCalc = state.Committer.CheckBytesOriginal(tempBuf, 0, commitSize);
}
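// A mismatch means the page was torn or only partially written (e.g., a crash mid-write); the catch below treats it as the end of usable data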
if (checkBytesCalc != checkBytes)
{
throw new Exception("Integrity check failed for page. Must be incomplete record");
}
// Read changes in input consumption progress to reflect in _inputs
var watermarksToRead = replayStream.ReadInt();
committedInputDict.Clear();
for (int i = 0; i < watermarksToRead; i++)
{
var inputNameSize = replayStream.ReadInt();
if (inputNameSize > tempBuf2.Length)
{
tempBuf2 = new byte[inputNameSize];
}
replayStream.ReadAllRequiredBytes(tempBuf2, 0, inputNameSize);
var inputName = Encoding.UTF8.GetString(tempBuf2, 0, inputNameSize);
var newLongPair = new LongPair();
newLongPair.First = replayStream.ReadLongFixed();
newLongPair.Second = replayStream.ReadLongFixed();
committedInputDict[inputName] = newLongPair;
}
// Read changes in trim to perform and reflect in _outputs
watermarksToRead = replayStream.ReadInt();
trimDict.Clear();
for (int i = 0; i < watermarksToRead; i++)
{
var inputNameSize = replayStream.ReadInt();
if (inputNameSize > tempBuf2.Length)
{
tempBuf2 = new byte[inputNameSize];
}
replayStream.ReadAllRequiredBytes(tempBuf2, 0, inputNameSize);
var inputName = Encoding.UTF8.GetString(tempBuf2, 0, inputNameSize);
long seqNo = replayStream.ReadLongFixed();
trimDict[inputName] = seqNo;
}
}
catch
{
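// We failed to read a complete, valid page: either we hit the (possibly ragged) end of the log, or the last page is torn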
// Non-active/active case for a failed replay segment.
// Do we already have the write lock on the latest log?
if (!_activeActive)
{
// Since this isn't the active/active case, take over (the migration scenario using the kill file, or simply recover)
// But first, make sure we have fully consumed the log (except possibly a bit at the end)
var actualLastLogFileNum = long.Parse(RetrieveServiceInfo(InfoTitle("LastLogFile", state.ShardID)));
if (!_logWriterStatics.FileExists(LogFileName(actualLastLogFileNum, state.ShardID)))
{
OnError(MissingLog, "Missing log in replay or update happened" + state.LastLogFile.ToString());
}
if (actualLastLogFileNum > state.LastLogFile) // there are more log files to read. Move on.
{
state.LastLogFile++;
replayStream.Dispose();
replayStream = LogReaderStaticPicker.curStatic.Generate(LogFileName(state.LastLogFile, state.ShardID));
continue;
}
if (!haveWriterLockForNonActiveActive)
{
// We're as close to the end of the log as we can get. We need to grab and hold the lock on the kill file.
while (true)
{
Thread.Sleep(200);
try
{
LockKillFile();
// We have the lock!
break;
}
catch (Exception)
{
// Keep trying until successful
}
}
// Keep trying to take write permission on the log file until the old executing instance dies and releases it
while (true)
{
try
{
actualLastLogFileNum = long.Parse(RetrieveServiceInfo(InfoTitle("LastLogFile", state.ShardID)));
if (!_logWriterStatics.FileExists(LogFileName(actualLastLogFileNum, state.ShardID)))
{
OnError(MissingLog, "Missing log in replay or update happened" + state.LastLogFile.ToString());
}
Debug.Assert(lastLogFileStreamWriter == null);
// See if we've successfully killed the old instance execution
lastLogFileStreamWriter = _logWriterStatics.Generate(LogFileName(actualLastLogFileNum, state.ShardID), 1024 * 1024, 6, true);
if (long.Parse(RetrieveServiceInfo(InfoTitle("LastLogFile", state.ShardID))) != actualLastLogFileNum)
{
// We got an old log. Try again
throw new Exception();
}
// The old executing instance died. We need to finish recovery, then exit!
break;
}
catch
{
if (lastLogFileStreamWriter != null)
{
lastLogFileStreamWriter.Dispose();
lastLogFileStreamWriter = null;
}
await Task.Delay(200);
}
}
// We've locked the log. There may be more log to consume. Continue until we hit the true end.
haveWriterLockForNonActiveActive = true;
replayStream.Position = logRecordPos;
continue;
}
else
{
// We've consumed the whole log and have all the necessary locks.
await state.Committer.SleepAsync();
state.Committer.SwitchLogStreams(lastLogFileStreamWriter);
await state.Committer.WakeupAsync();
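// The committer now writes to the newly locked log; release the kill file and finish recovery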
Debug.Assert(_killFileHandle != null);
ReleaseAndTryCleanupKillFile();
break;
}
}
// Active/active case for a failed replay segment.
if (detectedEOL)
{
break;
}
if (detectedEOF)
{
// Move to the next log file for reading only. We may need to take a checkpoint
state.LastLogFile++;
replayStream.Dispose();
if (!_logWriterStatics.FileExists(LogFileName(state.LastLogFile, state.ShardID)))
{
OnError(MissingLog, "Missing log in replay " + state.LastLogFile.ToString());
}
replayStream = LogReaderStaticPicker.curStatic.Generate(LogFileName(state.LastLogFile, state.ShardID));
if (state.MyRole == AARole.Checkpointer)
{
// take the checkpoint associated with the beginning of the new log
// It's currently too disruptive to the code to pass in MachineState to
// CheckpointAsync, so we update the corresponding variables instead.
// This should be fine since the checkpointer should not replay from
// multiple logs in parallel.
UpdateAmbrosiaState(state);
// Quiesce the committer and the local service, take the checkpoint, then resume
await _committer.SleepAsync();
_committer.QuiesceServiceWithSendCheckpointRequest();
await CheckpointAsync();
await _committer.WakeupAsync();
LoadAmbrosiaState(state);
}
detectedEOF = false;
continue;
}
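// We hit the end of this file without a complete page. Figure out whether a newer log file exists or whether this is the true end of the log.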
var myRoleBeforeEOLChecking = state.MyRole;
replayStream.Position = logRecordPos;
var newLastLogFile = state.LastLogFile;
if (_runningRepro)
{
if (_logWriterStatics.FileExists(LogFileName(state.LastLogFile + 1, state.ShardID)))
{
// If there is a next file, then move to it
newLastLogFile = state.LastLogFile + 1;
}
}
else
{
newLastLogFile = long.Parse(RetrieveServiceInfo(InfoTitle("LastLogFile", state.ShardID)));
}
if (newLastLogFile > state.LastLogFile) // a new log file has been written
{
// Someone started a new log. Try to read the last record again and then move to the next file
detectedEOF = true;
continue;
}
if (myRoleBeforeEOLChecking == AARole.Primary)
{
// We became the primary and this file is the end of the log. Take one more pass to make sure we read the whole file.
detectedEOL = true;
continue;
}
// The remaining case: we hit the end of the log, but someone is still writing to this file. Wait and try the read again,
// or kill the primary if we are trying to upgrade in an active/active scenario.
if (_upgrading && _activeActive && _killFileHandle == null)
{
// We need to write and hold the lock on the kill file. Recovery will continue until the primary dies and we have
// fully processed the log.
while (true)
{
try
{
LockKillFile();
break;
}
catch (Exception)
{
// Someone may be checking promotability. Keep trying until successful
}
}
}
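// The writer may still be appending to this file: wait a bit, then retry reading the same record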
await Task.Delay(1000);
continue;
}
// Successfully read an entire replay segment. Go ahead and process for recovery
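// Apply the committed input watermarks: record each input's processed IDs and propagate them as remote trim points to the matching outputs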
foreach (var kv in committedInputDict)
{
InputConnectionRecord inputConnectionRecord;
if (!state.Inputs.TryGetValue(kv.Key, out inputConnectionRecord))
{
// Create input record and add it to the dictionary
inputConnectionRecord = new InputConnectionRecord();
state.Inputs[kv.Key] = inputConnectionRecord;
}
inputConnectionRecord.LastProcessedID = kv.Value.First;
inputConnectionRecord.LastProcessedReplayableID = kv.Value.Second;
OutputConnectionRecord outputConnectionRecord;
// this lock prevents conflict with output arriving from the local service during replay
lock (state.Outputs)
{
if (!state.Outputs.TryGetValue(kv.Key, out outputConnectionRecord))
{
outputConnectionRecord = new OutputConnectionRecord(this);
state.Outputs[kv.Key] = outputConnectionRecord;
}
}
// this lock prevents conflict with output arriving from the local service during replay and ensures maximal cleaning
lock (outputConnectionRecord)
{
outputConnectionRecord.RemoteTrim = Math.Max(kv.Value.First, outputConnectionRecord.RemoteTrim);
outputConnectionRecord.RemoteTrimReplayable = Math.Max(kv.Value.Second, outputConnectionRecord.RemoteTrimReplayable);
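// If the output's control work queue is idle, enqueue an entry to nudge the control worker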
if (outputConnectionRecord.ControlWorkQ.IsEmpty)
{
outputConnectionRecord.ControlWorkQ.Enqueue(-2);
}
}
}
// Send the replayed page to the local service so it redoes the actual work
_localServiceSendToStream.Write(headerBuf, 0, Committer.HeaderSize);
_localServiceSendToStream.Write(tempBuf, 0, commitSize);
// Trim the outputs. Should clean as aggressively as during normal operation
foreach (var kv in trimDict)
{
OutputConnectionRecord outputConnectionRecord;
// this lock prevents conflict with output arriving from the local service during replay
lock (state.Outputs)
{
if (!state.Outputs.TryGetValue(kv.Key, out outputConnectionRecord))
{
outputConnectionRecord = new OutputConnectionRecord(this);
state.Outputs[kv.Key] = outputConnectionRecord;
}
}
// this lock prevents conflict with output arriving from the local service during replay and ensures maximal cleaning
lock (outputConnectionRecord)
{
outputConnectionRecord.TrimTo = kv.Value;
outputConnectionRecord.ReplayableTrimTo = kv.Value;
outputConnectionRecord.BufferedOutput.Trim(kv.Value, ref outputConnectionRecord.placeInOutput);
}
}
// If this is the first replay segment, it invalidates the contents of the committer, which must be cleared.
if (!clearedCommitterWrite)
{
state.Committer.ClearNextWrite();
clearedCommitterWrite = true;
}
// Bump up the write ID in the committer in preparation for reading or writing the next page
state.Committer._nextWriteID++;
}
}