in src/prod/src/data/logicallog/LogTestBase.cpp [197:658]
// Creates NumStreams read streams on a single logical log and releases them in
// several different ways (CloseAsync, Dispose, dropping the last reference),
// both immediately after creation and after all streams have been created.
//
// Per the original author's notes, the bug this guards against makes these
// scenarios deadlock, so every stage runs on the threadpool while the test
// thread waits with a timeout instead of relying on the overall test timeout.
//
// The KtlLoggerMode argument is unnamed and not used by this test.
VOID LogTestBase::OpenManyStreamsTest(__in KtlLoggerMode)
{
    // Maximum time, in milliseconds, the test thread waits for one stage.
    ULONG const stageTimeoutInMs = 15000;

    NTSTATUS status;

    ILogManagerHandle::SPtr logManager;
    status = CreateAndOpenLogManager(logManager);
    VERIFY_STATUS_SUCCESS("CreateAndOpenLogManager", status);

    // Don't care if this fails
    SyncAwait(logManager->DeletePhysicalLogAsync(CancellationToken::None));

    IPhysicalLogHandle::SPtr physicalLog;
    status = CreateDefaultPhysicalLog(*logManager, physicalLog);
    VERIFY_STATUS_SUCCESS("CreatePhysicalLog", status);

    KString::SPtr logicalLogName;
    GenerateUniqueFilename(logicalLogName);

    KGuid logicalLogId;
    logicalLogId.CreateNew();

    ILogicalLog::SPtr logicalLog;
    status = CreateLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
    VERIFY_STATUS_SUCCESS("CreateLogicalLog", status);

    KEvent taskComplete(FALSE, FALSE); // auto reset

    // The original bug causes these tests to deadlock, so post them on the
    // threadpool to check for failure (rather than waiting for test timeout).
    //
    // Each stage body below is a lambda returning the first failing NTSTATUS
    // (or STATUS_SUCCESS). The posted work item stores that result in 'status'
    // and only then signals 'taskComplete', so every stream the stage still
    // holds has been released before the test thread is woken up.
    //
    // NOTE(review): the work items capture locals of this function by
    // reference. If a stage times out and VERIFY_IS_TRUE leaves this function,
    // a still-running work item would be left with dangling references -
    // confirm how the VERIFY macros behave on failure.

    // Test creating and immediately closing streams
    {
        auto createThenClose = [&]() -> NTSTATUS
        {
            for (int i = 0; i < NumStreams; i++)
            {
                ILogicalLogReadStream::SPtr readStream;

                NTSTATUS stageStatus = logicalLog->CreateReadStream(readStream, 0);
                if (!NT_SUCCESS(stageStatus))
                {
                    return stageStatus;
                }

                stageStatus = SyncAwait(readStream->CloseAsync());
                if (!NT_SUCCESS(stageStatus))
                {
                    return stageStatus;
                }

                readStream = nullptr;
            }

            return STATUS_SUCCESS;
        };

        Common::Threadpool::Post([&]()
        {
            status = createThenClose();
            taskComplete.SetEvent();
        });

        bool completed = taskComplete.WaitUntilSet(stageTimeoutInMs);
        VERIFY_IS_TRUE(completed); // Likely deadlocked if false
        VERIFY_STATUS_SUCCESS("OpenManyStreamsTest_OpenAndImmediatelyClose", status);
    }

    // Close and reopen the log
    status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
    VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
    status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
    VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);

    // Test creating and immediately disposing streams
    {
        auto createThenDispose = [&]() -> NTSTATUS
        {
            for (int i = 0; i < NumStreams; i++)
            {
                ILogicalLogReadStream::SPtr readStream;

                NTSTATUS stageStatus = logicalLog->CreateReadStream(readStream, 0);
                if (!NT_SUCCESS(stageStatus))
                {
                    return stageStatus;
                }

                // Dispose() yields no status here, so there is nothing to re-check.
                readStream->Dispose();
                readStream = nullptr;
            }

            return STATUS_SUCCESS;
        };

        Common::Threadpool::Post([&]()
        {
            status = createThenDispose();
            taskComplete.SetEvent();
        });

        bool completed = taskComplete.WaitUntilSet(stageTimeoutInMs);
        VERIFY_IS_TRUE(completed); // Likely deadlocked if false
        VERIFY_STATUS_SUCCESS("OpenManyStreamsTest_OpenAndImmediatelyDispose", status);
    }

    // Close and reopen the log
    status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
    VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
    status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
    VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);

    // Test creating and immediately destructing streams
    {
        auto createThenDestruct = [&]() -> NTSTATUS
        {
            for (int i = 0; i < NumStreams; i++)
            {
                ILogicalLogReadStream::SPtr readStream;

                NTSTATUS stageStatus = logicalLog->CreateReadStream(readStream, 0);
                if (!NT_SUCCESS(stageStatus))
                {
                    return stageStatus;
                }

                // Dropping the only reference is the whole point of this stage.
                readStream = nullptr;
            }

            return STATUS_SUCCESS;
        };

        Common::Threadpool::Post([&]()
        {
            status = createThenDestruct();
            taskComplete.SetEvent();
        });

        bool completed = taskComplete.WaitUntilSet(stageTimeoutInMs);
        VERIFY_IS_TRUE(completed); // Likely deadlocked if false
        VERIFY_STATUS_SUCCESS("OpenManyStreamsTest_OpenAndImmediatelyDestruct", status);
    }

    // Close and reopen the log
    status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
    VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
    status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
    VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);

    // Test creating all streams then closing
    {
        // Note: this actually passes with the original bug, as only the streams array 'append' codepath is exercised
        auto createAllThenClose = [&]() -> NTSTATUS
        {
            ILogicalLogReadStream::SPtr streams[NumStreams];

            for (int i = 0; i < NumStreams; i++)
            {
                NTSTATUS const createStatus = logicalLog->CreateReadStream(streams[i], 0);
                if (!NT_SUCCESS(createStatus))
                {
                    return createStatus;
                }
            }

            for (int i = 0; i < NumStreams; i++)
            {
                // The reference is dropped whether or not the close succeeded.
                NTSTATUS const closeStatus = SyncAwait(streams[i]->CloseAsync());
                streams[i] = nullptr;
                if (!NT_SUCCESS(closeStatus))
                {
                    return closeStatus;
                }
            }

            return STATUS_SUCCESS;
        };

        Common::Threadpool::Post([&]()
        {
            status = createAllThenClose();
            taskComplete.SetEvent();
        });

        bool completed = taskComplete.WaitUntilSet(stageTimeoutInMs);
        VERIFY_IS_TRUE(completed); // Likely deadlocked if false
        VERIFY_STATUS_SUCCESS("OpenManyStreamsTest_OpenAllThenClose", status);
    }

    // Close and reopen the log
    status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
    VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
    status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
    VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);

    // Test creating all streams then disposing
    {
        // Note: this actually passes with the original bug, as only the streams array 'append' codepath is exercised
        auto createAllThenDispose = [&]() -> NTSTATUS
        {
            ILogicalLogReadStream::SPtr streams[NumStreams];

            for (int i = 0; i < NumStreams; i++)
            {
                NTSTATUS const createStatus = logicalLog->CreateReadStream(streams[i], 0);
                if (!NT_SUCCESS(createStatus))
                {
                    return createStatus;
                }
            }

            for (int i = 0; i < NumStreams; i++)
            {
                streams[i]->Dispose();
                streams[i] = nullptr;
            }

            return STATUS_SUCCESS;
        };

        Common::Threadpool::Post([&]()
        {
            status = createAllThenDispose();
            taskComplete.SetEvent();
        });

        bool completed = taskComplete.WaitUntilSet(stageTimeoutInMs);
        VERIFY_IS_TRUE(completed); // Likely deadlocked if false
        VERIFY_STATUS_SUCCESS("OpenManyStreamsTest_OpenAllThenDispose", status);
    }

    // Close and reopen the log
    status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
    VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
    status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
    VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);

    // Test creating all streams then destructing
    {
        // Note: this actually passes with the original bug, as only the streams array 'append' codepath is exercised
        auto createAllThenDestruct = [&]() -> NTSTATUS
        {
            ILogicalLogReadStream::SPtr streams[NumStreams];

            for (int i = 0; i < NumStreams; i++)
            {
                NTSTATUS const createStatus = logicalLog->CreateReadStream(streams[i], 0);
                if (!NT_SUCCESS(createStatus))
                {
                    return createStatus;
                }
            }

            for (int i = 0; i < NumStreams; i++)
            {
                streams[i] = nullptr;
            }

            return STATUS_SUCCESS;
        };

        Common::Threadpool::Post([&]()
        {
            status = createAllThenDestruct();
            taskComplete.SetEvent();
        });

        bool completed = taskComplete.WaitUntilSet(stageTimeoutInMs);
        VERIFY_IS_TRUE(completed); // Likely deadlocked if false
        VERIFY_STATUS_SUCCESS("OpenManyStreamsTest_OpenAllThenDestruct", status);
    }

    // Close and reopen the log
    status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
    VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
    status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
    VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);

    // One pass of the randomized scenario, shared by the two stages below.
    // While creating the streams, each one is either kept for later (3 in 6)
    // or released right away via CloseAsync, Dispose or by dropping the
    // reference (1 in 6 each). Every stream that was kept is then released
    // with one of those three methods chosen at random. The generator is
    // constructed from NumStreams on every pass, exactly as before.
    //
    // Note: this actually passes with the original bug, as only the streams array 'append' codepath is exercised
    // NOTE(review): the note above reads like a copy of the "create all"
    // stages; this pass also releases streams while others are still being
    // created - confirm that it still applies here.
    auto randomCleanupPass = [&]() -> NTSTATUS
    {
        Common::Random random(NumStreams);
        ILogicalLogReadStream::SPtr streams[NumStreams];

        for (int i = 0; i < NumStreams; i++)
        {
            NTSTATUS const createStatus = logicalLog->CreateReadStream(streams[i], 0);
            if (!NT_SUCCESS(createStatus))
            {
                return createStatus;
            }

            switch (random.Next() % 6)
            {
            case 0:
            case 1:
            case 2:
                // let it be cleaned up later
                break;

            case 3:
            {
                // Close now
                NTSTATUS const closeStatus = SyncAwait(streams[i]->CloseAsync());
                if (!NT_SUCCESS(closeStatus))
                {
                    return closeStatus;
                }
                streams[i] = nullptr;
                break;
            }

            case 4:
                // Dispose now
                streams[i]->Dispose();
                streams[i] = nullptr;
                break;

            case 5:
                // Destruct now
                streams[i] = nullptr;
                break;
            }
        }

        for (int i = 0; i < NumStreams; i++)
        {
            if (streams[i] == nullptr)
            {
                // Already released during the creation loop.
                continue;
            }

            switch (random.Next() % 3)
            {
            case 0:
            {
                // Close
                NTSTATUS const closeStatus = SyncAwait(streams[i]->CloseAsync());
                if (!NT_SUCCESS(closeStatus))
                {
                    return closeStatus;
                }
                streams[i] = nullptr;
                break;
            }

            case 1:
                // Dispose
                streams[i]->Dispose();
                streams[i] = nullptr;
                break;

            case 2:
                // Destruct
                streams[i] = nullptr;
                break;
            }
        }

        return STATUS_SUCCESS;
    };

    // Test with random choice of when to cleanup stream
    {
        Common::Threadpool::Post([&]()
        {
            status = randomCleanupPass();
            taskComplete.SetEvent();
        });

        bool completed = taskComplete.WaitUntilSet(stageTimeoutInMs);
        VERIFY_IS_TRUE(completed); // Likely deadlocked if false
        VERIFY_STATUS_SUCCESS("OpenManyStreamsTest_RandomCleanup", status);
    }

    // Do the random cleanup test again without reopening the log
    {
        Common::Threadpool::Post([&]()
        {
            status = randomCleanupPass();
            taskComplete.SetEvent();
        });

        bool completed = taskComplete.WaitUntilSet(stageTimeoutInMs);
        VERIFY_IS_TRUE(completed); // Likely deadlocked if false
        VERIFY_STATUS_SUCCESS("OpenManyStreamsTest_RandomCleanup", status);
    }

    // Cleanup
    status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
    VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
    logicalLog = nullptr;

    status = SyncAwait(physicalLog->CloseAsync(CancellationToken::None));
    VERIFY_STATUS_SUCCESS("PhysicalLog::CloseAsync", status);
    physicalLog = nullptr;

    status = SyncAwait(logManager->DeletePhysicalLogAsync(CancellationToken::None));
    VERIFY_STATUS_SUCCESS("LogManager::DeletePhysicalLogAsync", status);

    status = SyncAwait(logManager->CloseAsync(CancellationToken::None));
    VERIFY_STATUS_SUCCESS("LogManager::CloseAsync", status);
    logManager = nullptr;
}