in src/prod/src/data/logicallog/LogTestBase.cpp [1544:2559]
VOID LogTestBase::ReadAheadCacheTest(__in KtlLoggerMode)
{
NTSTATUS status;
ILogManagerHandle::SPtr logManager;
status = CreateAndOpenLogManager(logManager);
VERIFY_STATUS_SUCCESS("CreateAndOpenLogManager", status);
LogManager* logManagerService = nullptr;
{
LogManagerHandle* logManagerServiceHandle = dynamic_cast<LogManagerHandle*>(logManager.RawPtr());
if (logManagerServiceHandle != nullptr)
{
logManagerService = logManagerServiceHandle->owner_.RawPtr();
}
}
VerifyState(
logManagerService,
1,
0,
TRUE);
KString::SPtr physicalLogName;
GenerateUniqueFilename(physicalLogName);
KGuid physicalLogId;
physicalLogId.CreateNew();
// Don't care if this fails
SyncAwait(logManager->DeletePhysicalLogAsync(*physicalLogName, physicalLogId, CancellationToken::None));
IPhysicalLogHandle::SPtr physicalLog;
status = CreatePhysicalLog(*logManager, physicalLogId, *physicalLogName, physicalLog);
VERIFY_STATUS_SUCCESS("CreatePhysicalLog", status);
VerifyState(
logManagerService,
1,
1,
TRUE,
dynamic_cast<PhysicalLogHandle*>(physicalLog.RawPtr()) != nullptr ? &(static_cast<PhysicalLogHandle&>(*physicalLog).Owner) : nullptr,
1,
0,
TRUE,
dynamic_cast<PhysicalLogHandle*>(physicalLog.RawPtr()),
TRUE);
KString::SPtr logicalLogName;
GenerateUniqueFilename(logicalLogName);
KGuid logicalLogId;
logicalLogId.CreateNew();
ILogicalLog::SPtr logicalLog;
status = CreateLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
VERIFY_STATUS_SUCCESS("CreateLogicalLog", status);
VerifyState(
logManagerService,
1,
1,
TRUE,
dynamic_cast<PhysicalLogHandle*>(physicalLog.RawPtr()) != nullptr ? &(static_cast<PhysicalLogHandle&>(*physicalLog).Owner) : nullptr,
1,
1,
TRUE,
dynamic_cast<PhysicalLogHandle*>(physicalLog.RawPtr()),
TRUE,
nullptr,
-1,
-1,
FALSE,
nullptr,
FALSE,
FALSE,
dynamic_cast<LogicalLog*>(logicalLog.RawPtr()),
TRUE);
VERIFY_ARE_EQUAL(0, logicalLog->Length);
VERIFY_ARE_EQUAL(0, logicalLog->ReadPosition);
VERIFY_ARE_EQUAL(0, logicalLog->WritePosition);
VERIFY_ARE_EQUAL(-1, logicalLog->HeadTruncationPosition);
static const LONG MaxLogBlkSize = 1024 * 1024;
//
// Write a nice pattern in the log which would be easy to validate the data
//
LONG dataSize = 16 * 1024 * 1024; // 16MB
LONG chunkSize = 1024;
LONG prefetchSize = 1024 * 1024;
LONG loops = dataSize / chunkSize;
KBuffer::SPtr bufferBigK, buffer1K;
PUCHAR bufferBig, buffer1;
AllocBuffer(2 * prefetchSize, bufferBigK, bufferBig);
AllocBuffer(chunkSize, buffer1K, buffer1);
for (LONG i = 0; i < loops; i++)
{
LONGLONG pos = i * chunkSize;
BuildDataBuffer(*buffer1K, pos);
//
// TODO: Fix buffer1K to be correct size. Add 1 to work around bug in KFileStream::WriteAsync
// where a full KBuffer cannot be written
//
status = SyncAwait(logicalLog->AppendAsync(*buffer1K, 0, buffer1K->QuerySize(), CancellationToken::None));
VERIFY_STATUS_SUCCESS("LogicalLog::AppendAsync", status);
}
status = SyncAwait(logicalLog->FlushWithMarkerAsync(CancellationToken::None));
VERIFY_STATUS_SUCCESS("LogicalLog::FlushWithMarkerAsync", status);
//
// Verify length
//
VERIFY_IS_TRUE(logicalLog->GetLength() == dataSize);
VERIFY_IS_TRUE(logicalLog->GetWritePosition() == dataSize);
status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
logicalLog = nullptr;
//
// Reopen to verify length
//
status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);
//
// Verify length
//
VERIFY_IS_TRUE(logicalLog->GetLength() == dataSize);
VERIFY_IS_TRUE(logicalLog->GetWritePosition() == dataSize);
status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
logicalLog = nullptr;
//
// Test 1: Sequentially read through file and verify that data is correct
//
{
TestCommon::TestSession::WriteInfo(TraceComponent, "Test 1: Sequentially read through file and verify that data is correct");
ILogicalLogReadStream::SPtr stream;
status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);
status = logicalLog->CreateReadStream(stream, prefetchSize);
VERIFY_STATUS_SUCCESS("LogicalLog::CreateReadStream", status);
VERIFY_IS_TRUE(stream->GetLength() == logicalLog->GetLength());
for (LONG i = 0; i < loops; i++)
{
ULONG read;
status = SyncAwait(stream->ReadAsync(*buffer1K, read, 0, chunkSize));
VERIFY_STATUS_SUCCESS("LogicalLogReadStream::ReadAsync", status);
LONGLONG pos = i * chunkSize;
ValidateDataBuffer(*buffer1K, read, 0, pos);
}
status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
logicalLog = nullptr;
}
//
// Test 2: Read at the end of the file to trigger a prefetch read beyond the end of the file is where the
// last prefetch isn't a full record.
//
{
TestCommon::TestSession::WriteInfo(TraceComponent, "Test 2: Read at the end of the file to trigger a prefetch read beyond the end of the file is where the last prefetch isn't a full record.");
ILogicalLogReadStream::SPtr stream;
status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);
status = logicalLog->CreateReadStream(stream, prefetchSize);
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(stream->GetLength() == logicalLog->GetLength());
LONGLONG pos = dataSize - (prefetchSize + (prefetchSize / 2));
stream->SetPosition(pos/*, SeekOrigin.Begin*/);
ULONG read;
status = SyncAwait(stream->ReadAsync(*buffer1K, read, 0, chunkSize));
VERIFY_IS_TRUE(NT_SUCCESS(status));
ValidateDataBuffer(*buffer1K, read, 0, pos);
status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
logicalLog = nullptr;
}
//
// Test 3: Read at the end of the file to trigger a prefetch read that is exactly the end of the file.
//
{
TestCommon::TestSession::WriteInfo(TraceComponent, "Test 3: Read at the end of the file to trigger a prefetch read that is exactly the end of the file.");
ILogicalLogReadStream::SPtr stream;
status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);
status = logicalLog->CreateReadStream(stream, prefetchSize);
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(stream->GetLength() == logicalLog->GetLength());
LONGLONG pos = dataSize - 2 * prefetchSize;
stream->SetPosition(pos/*, SeekOrigin.Begin*/);
ULONG read;
status = SyncAwait(stream->ReadAsync(*buffer1K, read, 0, chunkSize));
VERIFY_IS_TRUE(NT_SUCCESS(status));
ValidateDataBuffer(*buffer1K, read, 0, pos);
status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
logicalLog = nullptr;
}
//
// Test 4: Read at end of file where read isn't a full record and prefetch read is out of bounds.
//
{
TestCommon::TestSession::WriteInfo(TraceComponent, "Test 4: Read at end of file where read isn't a full record and prefetch read is out of bounds.");
ILogicalLogReadStream::SPtr stream;
status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);
status = logicalLog->CreateReadStream(stream, prefetchSize);
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(stream->GetLength() == logicalLog->GetLength());
LONGLONG pos = dataSize - (prefetchSize / 2);
stream->SetPosition(pos/*, SeekOrigin.Begin*/);
ULONG read;
status = SyncAwait(stream->ReadAsync(*buffer1K, read, 0, chunkSize));
VERIFY_IS_TRUE(NT_SUCCESS(status));
ValidateDataBuffer(*buffer1K, read, 0, pos);
status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
logicalLog = nullptr;
}
//
// Test 5: Read beyond end of file.
//
{
TestCommon::TestSession::WriteInfo(TraceComponent, "Test 5: Read beyond end of file.");
ILogicalLogReadStream::SPtr stream;
status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);
status = logicalLog->CreateReadStream(stream, prefetchSize);
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(stream->GetLength() == logicalLog->GetLength());
LONGLONG pos = dataSize - 1;
stream->SetPosition(pos /*, SeekOrigin.Begin*/);
ULONG read;
status = SyncAwait(stream->ReadAsync(*buffer1K, read, 0, chunkSize));
VERIFY_IS_TRUE(NT_SUCCESS(status));
ValidateDataBuffer(*buffer1K, read, 0, pos);
status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
logicalLog = nullptr;
}
//
// Test 6: Random access reads of less than a prefetch record size and verify that data is correct
//
{
TestCommon::TestSession::WriteInfo(TraceComponent, "Test 6: Random access reads of less than a prefetch record size and verify that data is correct");
Common::Random random((int)GetTickCount64());
ILogicalLogReadStream::SPtr stream;
status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);
status = logicalLog->CreateReadStream(stream, prefetchSize);
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(stream->GetLength() == logicalLog->GetLength());
// TODO: replace 32 with 1024. Workaround for Rich's bug
for (LONG i = 0; i < 32; i++)
{
ULONG len = random.Next() % (prefetchSize - 1);
KBuffer::SPtr buffer;
PUCHAR b;
AllocBuffer(len, buffer, b);
//
// Pick a random place but do not go beyond end of file
// or into the space already truncated head
//
LONGLONG r = random.Next();
LONGLONG m = (dataSize - (prefetchSize + 4096 + len));
LONGLONG pos = (r % m);
pos = pos + (prefetchSize + 4096);
stream->SetPosition(pos/*, SeekOrigin.Begin*/);
// TODO: Remove this when I feel good about the math
CODING_ERROR_ASSERT(pos >= (prefetchSize + 4096));
TestCommon::TestSession::WriteInfo(TraceComponent, "pos: {0}, len: {1}, dataSize {2}", pos, len, dataSize);
ULONG read;
status = SyncAwait(stream->ReadAsync(*buffer, read, 0, len));
VERIFY_IS_TRUE(NT_SUCCESS(status));
CODING_ERROR_ASSERT(len == read);
VERIFY_IS_TRUE(len == read);
ValidateDataBuffer(*buffer, read, 0, pos);
}
status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
VERIFY_IS_TRUE(NT_SUCCESS(status));
logicalLog = nullptr;
}
{
TestCommon::TestSession::WriteInfo(TraceComponent, "Test 6a: Specific read pattern");
ILogicalLogReadStream::SPtr stream;
status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);
status = logicalLog->CreateReadStream(stream, prefetchSize);
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(stream->GetLength() == logicalLog->GetLength());
LONG count = 56;
KArray<LONG> pos1(GetThisAllocator(), count, count);
KArray<LONG> len1(GetThisAllocator(), count, count);
pos1.SetCount(count);
len1.SetCount(count);
pos1[0] = 6643136;
len1[0] = 284158;
pos1[1] = 8749250;
len1[1] = 719631;
pos1[2] = 7968798;
len1[2] = 217852;
pos1[3] = 5589085;
len1[3] = 37828;
pos1[4] = 8951191;
len1[4] = 658471;
pos1[5] = 2036603;
len1[5] = 779626;
pos1[6] = 13515405;
len1[6] = 753509;
pos1[7] = 2026546;
len1[7] = 127354;
pos1[8] = 12065124;
len1[8] = 485889;
pos1[9] = 2922527;
len1[9] = 748745;
pos1[10] = 12362362;
len1[10] = 324286;
pos1[11] = 15079374;
len1[11] = 411635;
pos1[12] = 308588;
len1[12] = 515382;
pos1[13] = 7394979;
len1[13] = 535056;
pos1[14] = 9878541;
len1[14] = 445104;
pos1[15] = 6615835;
len1[15] = 924314;
pos1[16] = 11835666;
len1[16] = 259902;
pos1[17] = 8862263;
len1[17] = 949164;
pos1[18] = 5386414;
len1[18] = 517708;
pos1[19] = 11259477;
len1[19] = 675057;
pos1[20] = 14395727;
len1[20] = 973734;
pos1[21] = 9391625;
len1[21] = 1021568;
pos1[22] = 298673;
len1[22] = 858879;
pos1[23] = 11966559;
len1[23] = 170338;
pos1[24] = 5228510;
len1[24] = 331768;
pos1[25] = 12273354;
len1[25] = 711678;
pos1[26] = 13006046;
len1[26] = 865901;
pos1[27] = 8447558;
len1[27] = 519960;
pos1[28] = 12824484;
len1[28] = 10713;
pos1[29] = 3156718;
len1[29] = 55952;
pos1[30] = 9152582;
len1[30] = 1018119;
pos1[31] = 365742;
len1[31] = 953577;
pos1[32] = 5241993;
len1[32] = 706563;
pos1[33] = 32648;
len1[33] = 791071;
pos1[34] = 8868714;
len1[34] = 983844;
pos1[35] = 2129328;
len1[35] = 658494;
pos1[36] = 246918;
len1[36] = 101502;
pos1[37] = 6577103;
len1[37] = 266030;
pos1[38] = 12713793;
len1[38] = 814882;
pos1[39] = 7880614;
len1[39] = 107285;
pos1[40] = 12539585;
len1[40] = 898134;
pos1[41] = 1772169;
len1[41] = 145642;
pos1[42] = 1857848;
len1[42] = 971131;
pos1[43] = 6954824;
len1[43] = 620271;
pos1[44] = 6379690;
len1[44] = 231407;
pos1[45] = 11807777;
len1[45] = 576393;
pos1[46] = 7501047;
len1[46] = 291417;
pos1[47] = 5570240;
len1[47] = 763674;
pos1[48] = 15955814;
len1[48] = 1591;
pos1[49] = 7792962;
len1[49] = 967045;
pos1[50] = 2646688;
len1[50] = 277177;
pos1[51] = 12045117;
len1[51] = 820311;
pos1[52] = 8775100;
len1[52] = 723454;
pos1[53] = 13038687;
len1[53] = 699561;
pos1[54] = 1578657;
len1[54] = 591083;
pos1[55] = 3419389;
len1[55] = 778550;
for (LONG i = 0; i < count; i++)
{
ULONG len = len1[i];
KBuffer::SPtr buffer;
PUCHAR b;
AllocBuffer(len, buffer, b);
//
// Pick a random place but do not go beyond end of file
//
LONGLONG pos = pos1[i];
stream->SetPosition(pos/*, SeekOrigin.Begin*/);
ULONG read;
status = SyncAwait(stream->ReadAsync(*buffer, read, 0, len));
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(len == read);
ValidateDataBuffer(*buffer, read, 0, pos);
}
status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
VERIFY_IS_TRUE(NT_SUCCESS(status));
logicalLog = nullptr;
}
{
TestCommon::TestSession::WriteInfo(TraceComponent, "Test 6b: Specific read pattern");
ILogicalLogReadStream::SPtr stream;
status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);
status = logicalLog->CreateReadStream(stream, prefetchSize);
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(stream->GetLength() == logicalLog->GetLength());
for (LONG i = 0; i < 4; i++)
{
ULONG len = 996198;
KBuffer::SPtr buffer;
PUCHAR b;
AllocBuffer(len, buffer, b);
//
// Pick a random place but do not go beyond end of file
//
LONGLONG pos = 15117795;
stream->SetPosition(pos/*, SeekOrigin.Begin*/);
ULONG read;
status = SyncAwait(stream->ReadAsync(*buffer, read, 0, len));
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(len == read);
ValidateDataBuffer(*buffer, read, 0, pos);
}
status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
logicalLog = nullptr;
}
//
// Test 7: Random access reads of more than a prefetch record size and verify that data is correct
//
{
TestCommon::TestSession::WriteInfo(TraceComponent, "Test 7: Random access reads of more than a prefetch record size and verify that data is correct");
Common::Random random((int)GetTickCount64());
ILogicalLogReadStream::SPtr stream;
status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);
status = logicalLog->CreateReadStream(stream, prefetchSize);
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(stream->GetLength() == logicalLog->GetLength());
// TODO: replace 32 with 1024 when Rich fixes
for (LONG i = 0; i < 32; i++)
{
ULONG len = random.Next() % (3 * prefetchSize);
KBuffer::SPtr buffer;
PUCHAR b;
AllocBuffer(len, buffer, b);
//
// Pick a random place but do not go beyond end of file
//
LONGLONG r = random.Next();
LONGLONG m = (dataSize - (prefetchSize + 4096 + len));
LONGLONG pos = (r % m);
pos = pos + (prefetchSize + 4096);
// TODO: Remove this when I feel good about the math
CODING_ERROR_ASSERT(pos >= (prefetchSize + 4096));
stream->SetPosition(pos/*, SeekOrigin.Begin*/);
ULONG read;
status = SyncAwait(stream->ReadAsync(*buffer, read, 0, len));
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(len == read);
ValidateDataBuffer(*buffer, read, 0, pos);
}
status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
logicalLog = nullptr;
}
//
// Test 8: Read section of file and then truncate head in the middle of the read buffer. Ensure reading truncated
// space fails and untruncated succeeds
//
//
// Test 9: Read section of file and then truncate head in the middle of the prefetch read buffer. Ensure reading truncated
// space fails and untruncated succeeds
//
{
LONGLONG pos;
ULONG read;
TestCommon::TestSession::WriteInfo(TraceComponent, "Test 9: Read section of file and then truncate head in the middle of the prefetch read buffer. Ensure reading truncated space fails and untruncated succeeds");
ILogicalLogReadStream::SPtr stream;
status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);
status = logicalLog->CreateReadStream(stream, prefetchSize);
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(stream->GetLength() == logicalLog->GetLength());
//
// Perform gratuitous read to ensure read stream is
// initialized
//
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, chunkSize));
//
// truncate head in the middle of the prefetch buffer
//
TestCommon::TestSession::WriteInfo(TraceComponent, "Truncate head at {0}", prefetchSize + 4096);
status = SyncAwait(logicalLog->TruncateHead(prefetchSize + 4096));
VERIFY_IS_TRUE(NT_SUCCESS(status));
#if 0 // Test only for KtlLogger
//
// reads should not work
//
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, chunkSize));
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(read == 0);
pos = 4096;
stream->SetPosition(pos/*, SeekOrigin.Begin*/);
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, chunkSize));
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(read == 0);
pos = prefetchSize + 256;
stream->SetPosition(pos/*, SeekOrigin.Begin*/);
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, chunkSize));
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(read == 0);
#endif
//
// read in untruncated space
//
pos = prefetchSize + 4096 + 512;
stream->SetPosition(pos/*, SeekOrigin.Begin*/);
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, chunkSize));
VERIFY_IS_TRUE(NT_SUCCESS(status));
ValidateDataBuffer(*bufferBigK, read, 0, pos);
status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
logicalLog = nullptr;
}
//
// Test 10: Read section of file and then truncate tail in the middle of the read buffer. Ensure reading truncated
// space fails and untruncated succeeds
//
{
TestCommon::TestSession::WriteInfo(TraceComponent, "Test 10: Read section of file and then truncate tail in the middle of the read buffer. Ensure reading truncated space fails and untruncated succeeds");
ILogicalLogReadStream::SPtr stream;
status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);
status = logicalLog->CreateReadStream(stream, prefetchSize);
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(stream->GetLength() == logicalLog->GetLength());
//
// Test 10a: Fill read buffer and truncate tail ahead of it. Verify that only data before truncate tail
// is read and is correct
//
TestCommon::TestSession::WriteInfo(TraceComponent, "Test 10a: Fill read buffer and truncate tail ahead of it. Verify that only data before truncate tail is read and is correct");
LONGLONG pos = dataSize - (16384 + 4096);
LONGLONG tailTruncatePos = pos + (8192 + 4096);
stream->SetPosition(pos/*, SeekOrigin.Begin*/);
ULONG read;
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, 4096));
VERIFY_IS_TRUE(NT_SUCCESS(status));
ValidateDataBuffer(*bufferBigK, read, 0, pos);
pos += read;
status = SyncAwait(logicalLog->TruncateTail(tailTruncatePos, CancellationToken::None));
VERIFY_IS_TRUE(NT_SUCCESS(status));
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, 16384));
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(read == 8192);
ValidateDataBuffer(*bufferBigK, read, 0, pos);
//
// Overwrite end of file to bring back to original state
//
RestoreEOFState(*logicalLog, dataSize, prefetchSize);
//
// Test 10b: Fill read buffer and truncate before current read pointer. Verify no data read
//
TestCommon::TestSession::WriteInfo(TraceComponent, "Test 10b: Fill read buffer and truncate before current read pointer. Verify no data read");
pos = dataSize - (16384 + 4096);
stream->SetPosition(pos/*, SeekOrigin.Begin*/);
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, 4096));
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(read == 4096);
ValidateDataBuffer(*bufferBigK, read, 0, pos);
pos += read;
tailTruncatePos = pos - 128;
status = SyncAwait(logicalLog->TruncateTail(tailTruncatePos, CancellationToken::None));
VERIFY_IS_TRUE(NT_SUCCESS(status));
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, 4096));
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(read == 0);
//
// Overwrite end of file to bring back to original state
//
RestoreEOFState(*logicalLog, dataSize, prefetchSize);
//
// Test 10c: Fill read buffer and overwrite it with different data. Continue reading and verify that
// new data is read
//
TestCommon::TestSession::WriteInfo(TraceComponent, "Test 10c: Fill read buffer and overwrite it with different data. Continue reading and verify that new data is read");
pos = dataSize - (16384 + 4096);
stream->SetPosition(pos/*, SeekOrigin.Begin*/);
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, 4096));
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(read == 4096);
ValidateDataBuffer(*bufferBigK, read, 0, pos);
pos += read;
tailTruncatePos = pos - 128;
status = SyncAwait(logicalLog->TruncateTail(tailTruncatePos, CancellationToken::None));
VERIFY_IS_TRUE(NT_SUCCESS(status));
LONGLONG fillSize = dataSize - tailTruncatePos;
KBuffer::SPtr fillBuffer;
PUCHAR b;
AllocBuffer(static_cast<LONG>(fillSize), fillBuffer, b);
BuildDataBuffer(*fillBuffer, tailTruncatePos, 5);
status = SyncAwait(logicalLog->AppendAsync(*fillBuffer, 0, (LONG)fillSize, CancellationToken::None));
VERIFY_IS_TRUE(NT_SUCCESS(status));
status = SyncAwait(logicalLog->FlushWithMarkerAsync(CancellationToken::None));
VERIFY_IS_TRUE(NT_SUCCESS(status));
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, 256));
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(read == 256);
ValidateDataBuffer(*bufferBigK, read, 0, pos, 5);
pos += read;
//
// Overwrite end of file to bring back to original state
//
RestoreEOFState(*logicalLog, dataSize, prefetchSize);
status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
logicalLog = nullptr;
}
//
// Test 11: Read section of file and then truncate tail in the middle of the prefetch read buffer. Ensure reading truncated
// space fails and untruncated succeeds
//
{
TestCommon::TestSession::WriteInfo(TraceComponent, "Test 11: Read section of file and then truncate tail in the middle of the prefetch read buffer. Ensure reading truncated space fails and untruncated succeeds");
ILogicalLogReadStream::SPtr stream;
status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);
status = logicalLog->CreateReadStream(stream, prefetchSize);
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(stream->GetLength() == logicalLog->GetLength());
//
// Test 11a: Fill read buffer and truncate tail ahead of it. Verify that only data before truncate tail
// is read and is correct
//
TestCommon::TestSession::WriteInfo(TraceComponent, "Test 11a: Fill read buffer and truncate tail ahead of it. Verify that only data before truncate tail is read and is correct");
LONGLONG pos = dataSize - (prefetchSize + 16384 + 4096);
LONGLONG tailTruncatePos = pos + (prefetchSize + 8192 + 4096);
stream->SetPosition(pos/*, SeekOrigin.Begin*/);
ULONG read;
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, 4096));
VERIFY_IS_TRUE(NT_SUCCESS(status));
ValidateDataBuffer(*bufferBigK, read, 0, pos);
pos += read;
status = SyncAwait(logicalLog->TruncateTail(tailTruncatePos, CancellationToken::None));
VERIFY_IS_TRUE(NT_SUCCESS(status));
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, 16384 + prefetchSize));
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(read == static_cast<ULONG>(8192 + prefetchSize));
ValidateDataBuffer(*bufferBigK, read, 0, pos);
//
// Overwrite end of file to bring back to original state
//
RestoreEOFState(*logicalLog, dataSize, prefetchSize);
//
// Test 11c: Fill read buffer and overwrite it with different data. Continue reading and verify that
// new data is read
//
TestCommon::TestSession::WriteInfo(TraceComponent, "Test 11c: Fill read buffer and overwrite it with different data. Continue reading and verify that new data is read");
pos = dataSize - (prefetchSize + 16384 + 4096);
stream->SetPosition(pos/*, SeekOrigin.Begin*/);
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, 4096));
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(read == 4096);
ValidateDataBuffer(*bufferBigK, read, 0, pos);
pos += read;
tailTruncatePos = (pos + prefetchSize) - 128;
status = SyncAwait(logicalLog->TruncateTail(tailTruncatePos, CancellationToken::None));
VERIFY_IS_TRUE(NT_SUCCESS(status));
LONGLONG fillSize = dataSize - tailTruncatePos;
KBuffer::SPtr fillBuffer;
PUCHAR b;
AllocBuffer(static_cast<ULONG>(fillSize), fillBuffer, b);
BuildDataBuffer(*fillBuffer, tailTruncatePos, 5);
status = SyncAwait(logicalLog->AppendAsync(*fillBuffer, 0, (LONG)fillSize, CancellationToken::None));
VERIFY_IS_TRUE(NT_SUCCESS(status));
status = SyncAwait(logicalLog->FlushWithMarkerAsync(CancellationToken::None));
VERIFY_IS_TRUE(NT_SUCCESS(status));
LONGLONG read1Len = tailTruncatePos - pos;
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, (LONG)read1Len));
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(read == (ULONG)read1Len);
ValidateDataBuffer(*bufferBigK, read, 0, pos);
pos += read;
LONGLONG read2Len = dataSize - tailTruncatePos;
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, (LONG)read2Len));
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(read == (ULONG)read2Len);
ValidateDataBuffer(*bufferBigK, read, 0, pos, 5);
pos += read;
//
// Overwrite end of file to bring back to original state
//
RestoreEOFState(*logicalLog, dataSize, prefetchSize);
status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
logicalLog = nullptr;
}
//
// Test 12: Write at the end of the file within the read buffer and verify that read succeeds with correct data
//
{
TestCommon::TestSession::WriteInfo(TraceComponent, "Test 12: Write at the end of the file within the read buffer and verify that read succeeds with correct data");
ILogicalLogReadStream::SPtr stream;
ULONG read;
status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);
status = logicalLog->CreateReadStream(stream, prefetchSize);
VERIFY_STATUS_SUCCESS("LogicalLog::CreateReadStream", status);
VERIFY_IS_TRUE(stream->GetLength() == logicalLog->GetLength());
//
// Perform gratuitous read to ensure read stream is
// initialized
//
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, chunkSize));
//
// Truncate tail to make space at the end of the file and then read at the end of the file to fill
// read buffer. Write at end of file and then continue to read the new data.
//
LONGLONG pos = dataSize - (16384 + 4096);
LONGLONG tailTruncatePos = pos + 4096;
status = SyncAwait(logicalLog->TruncateTail(tailTruncatePos, CancellationToken::None));
VERIFY_IS_TRUE(NT_SUCCESS(status));
stream->SetPosition(pos/*, SeekOrigin.Begin*/);
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, 2048));
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(read == 2048);
ValidateDataBuffer(*bufferBigK, read, 0, pos);
pos += read;
LONGLONG fillSize = dataSize - tailTruncatePos;
KBuffer::SPtr fillBuffer;
PUCHAR b;
AllocBuffer(static_cast<ULONG>(fillSize + 1), fillBuffer, b);
BuildDataBuffer(*fillBuffer, tailTruncatePos, 7);
status = SyncAwait(logicalLog->AppendAsync(*fillBuffer, 0, (LONG)fillSize, CancellationToken::None));
VERIFY_IS_TRUE(NT_SUCCESS(status));
status = SyncAwait(logicalLog->FlushWithMarkerAsync(CancellationToken::None));
VERIFY_IS_TRUE(NT_SUCCESS(status));
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, 4096));
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(read == 4096);
ValidateDataBuffer(*bufferBigK, 2048, 0, pos);
ValidateDataBuffer(*bufferBigK, read - 2048, 2048, pos + 2048, 7);
pos += read;
//
// Overwrite end of file to bring back to original state
//
RestoreEOFState(*logicalLog, dataSize, prefetchSize);
status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
logicalLog = nullptr;
}
//
// Test 13: Write at the end of the file within the prefetch buffer and verify that read succeeds with correct data
//
{
TestCommon::TestSession::WriteInfo(TraceComponent, "Test 13: Write at the end of the file within the prefetch buffer and verify that read succeeds with correct data");
ILogicalLogReadStream::SPtr stream;
ULONG read;
status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);
status = logicalLog->CreateReadStream(stream, prefetchSize);
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(stream->GetLength() == logicalLog->GetLength());
//
// Perform gratuitous read to ensure read stream is
// initialized
//
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, chunkSize));
//
// Truncate tail to make space at the end of the file and then read at the end of the file to fill
// read buffer. Write at end of file and then continue to read the new data.
//
LONGLONG pos = dataSize - (16384 + 4096 + prefetchSize);
LONGLONG tailTruncatePos = pos + 4096 + prefetchSize;
status = SyncAwait(logicalLog->TruncateTail(tailTruncatePos, CancellationToken::None));
VERIFY_IS_TRUE(NT_SUCCESS(status));
stream->SetPosition(pos/*, SeekOrigin.Begin*/);
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, 2048));
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(read == 2048);
ValidateDataBuffer(*bufferBigK, read, 0, pos);
pos += read;
LONGLONG fillSize = dataSize - tailTruncatePos;
KBuffer::SPtr fillBuffer;
PUCHAR b;
AllocBuffer(static_cast<ULONG>(fillSize), fillBuffer, b);
BuildDataBuffer(*fillBuffer, tailTruncatePos, 9);
status = SyncAwait(logicalLog->AppendAsync(*fillBuffer, 0, (LONG)fillSize, CancellationToken::None));
VERIFY_IS_TRUE(NT_SUCCESS(status));
status = SyncAwait(logicalLog->FlushWithMarkerAsync(CancellationToken::None));
VERIFY_IS_TRUE(NT_SUCCESS(status));
ULONG readExpected = (16384 + 2048 + prefetchSize);
status = SyncAwait(stream->ReadAsync(*bufferBigK, read, 0, readExpected));
VERIFY_IS_TRUE(NT_SUCCESS(status));
VERIFY_IS_TRUE(read == readExpected);
LONG firstRead = prefetchSize + 2048;
ValidateDataBuffer(*bufferBigK, firstRead, 0, pos);
ValidateDataBuffer(*bufferBigK, readExpected - firstRead, firstRead, pos + firstRead, 9);
pos += read;
//
// Overwrite end of file to bring back to original state
//
RestoreEOFState(*logicalLog, dataSize, prefetchSize);
status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
logicalLog = nullptr;
status = SyncAwait(physicalLog->CloseAsync(CancellationToken::None));
VERIFY_STATUS_SUCCESS("PhysicalLog::CloseAsync", status);
physicalLog = nullptr;
//* Cleanup
status = SyncAwait(logManager->DeletePhysicalLogAsync(*physicalLogName, physicalLogId, CancellationToken::None));
VERIFY_STATUS_SUCCESS("LogManager::DeletePhysicalLogAsync", status);
status = SyncAwait(logManager->CloseAsync(CancellationToken::None));
VERIFY_STATUS_SUCCESS("LogManager::CloseAsync", status);
logManager = nullptr;
}
}