VOID LogTestBase::OpenManyStreamsTest()

in src/prod/src/data/logicallog/LogTestBase.cpp [197:658]


    VOID LogTestBase::OpenManyStreamsTest(__in KtlLoggerMode)
    {
        NTSTATUS status;

        ILogManagerHandle::SPtr logManager;
        status = CreateAndOpenLogManager(logManager);
        VERIFY_STATUS_SUCCESS("CreateAndOpenLogManager", status);

        // Don't care if this fails
        SyncAwait(logManager->DeletePhysicalLogAsync(CancellationToken::None));

        IPhysicalLogHandle::SPtr physicalLog;
        status = CreateDefaultPhysicalLog(*logManager, physicalLog);
        VERIFY_STATUS_SUCCESS("CreatePhysicalLog", status);

        KString::SPtr logicalLogName;
        GenerateUniqueFilename(logicalLogName);

        KGuid logicalLogId;
        logicalLogId.CreateNew();

        ILogicalLog::SPtr logicalLog;
        status = CreateLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
        VERIFY_STATUS_SUCCESS("CreateLogicalLog", status);

        KEvent taskComplete(FALSE, FALSE);  // auto reset

        // The original bug causes these tests to deadlock, so post them on the 
        // threadpool to check for failure (rather than waiting for test timeout)

        // Test creating and immediately closing streams
        {
            Common::Threadpool::Post([&]()
            {
                for (int i = 0; i < NumStreams; i++)
                {
                    ILogicalLogReadStream::SPtr readStream;

                    status = logicalLog->CreateReadStream(readStream, 0);
                    if (!NT_SUCCESS(status))
                    {
                        taskComplete.SetEvent();
                        return;
                    }

                    status = SyncAwait(readStream->CloseAsync());
                    if (!NT_SUCCESS(status))
                    {
                        taskComplete.SetEvent();
                        return;
                    }

                    readStream = nullptr;
                }

                status = STATUS_SUCCESS;
                taskComplete.SetEvent();
            });

            bool completed = taskComplete.WaitUntilSet(15000);
            VERIFY_IS_TRUE(completed); // Likely deadlocked if false
            VERIFY_STATUS_SUCCESS("OpenManyStreamsTest_OpenAndImmediatelyClose", status);
        }

        // Close and reopen the log
        status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
        VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);

        status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
        VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);

        // Test creating and immediately disposing streams
        {
            Common::Threadpool::Post([&]()
            {
                for (int i = 0; i < NumStreams; i++)
                {
                    ILogicalLogReadStream::SPtr readStream;

                    status = logicalLog->CreateReadStream(readStream, 0);
                    if (!NT_SUCCESS(status))
                    {
                        taskComplete.SetEvent();
                        return;
                    }

                    readStream->Dispose();
                    if (!NT_SUCCESS(status))
                    {
                        taskComplete.SetEvent();
                        return;
                    }

                    readStream = nullptr;
                }

                status = STATUS_SUCCESS;
                taskComplete.SetEvent();
            });

            bool completed = taskComplete.WaitUntilSet(15000);
            VERIFY_IS_TRUE(completed); // Likely deadlocked if false
            VERIFY_STATUS_SUCCESS("OpenManyStreamsTest_OpenAndImmediatelyDispose", status);
        }

        // Close and reopen the log
        status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
        VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);

        status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
        VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);

        // Test creating and immediately destructing streams
        {
            Common::Threadpool::Post([&]()
            {
                for (int i = 0; i < NumStreams; i++)
                {
                    ILogicalLogReadStream::SPtr readStream;

                    status = logicalLog->CreateReadStream(readStream, 0);
                    if (!NT_SUCCESS(status))
                    {
                        taskComplete.SetEvent();
                        return;
                    }

                    readStream = nullptr;
                }

                status = STATUS_SUCCESS;
                taskComplete.SetEvent();
            });

            bool completed = taskComplete.WaitUntilSet(15000);
            VERIFY_IS_TRUE(completed); // Likely deadlocked if false
            VERIFY_STATUS_SUCCESS("OpenManyStreamsTest_OpenAndImmediatelyDestruct", status);
        }

        // Close and reopen the log
        status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
        VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);

        status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
        VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);

        // Test creating all streams then closing
        {
            // Note: this actually passes with the original bug, as only the streams array 'append' codepath is exercised
            Common::Threadpool::Post([&]()
            {
                ILogicalLogReadStream::SPtr streams[NumStreams];

                for (int i = 0; i < NumStreams; i++)
                {
                    status = logicalLog->CreateReadStream(streams[i], 0);
                    if (!NT_SUCCESS(status))
                    {
                        taskComplete.SetEvent();
                        return;
                    }
                }

                for (int i = 0; i < NumStreams; i++)
                {
                    status = SyncAwait(streams[i]->CloseAsync());
                    streams[i] = nullptr;

                    if (!NT_SUCCESS(status))
                    {
                        taskComplete.SetEvent();
                        return;
                    }
                }

                status = STATUS_SUCCESS;
                taskComplete.SetEvent();
            });

            bool completed = taskComplete.WaitUntilSet(15000);
            VERIFY_IS_TRUE(completed); // Likely deadlocked if false
            VERIFY_STATUS_SUCCESS("OpenManyStreamsTest_OpenAllThenClose", status);
        }

        // Close and reopen the log
        status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
        VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);

        status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
        VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);

        // Test creating all streams then disposing
        {
            // Note: this actually passes with the original bug, as only the streams array 'append' codepath is exercised
            Common::Threadpool::Post([&]()
            {
                ILogicalLogReadStream::SPtr streams[NumStreams];

                for (int i = 0; i < NumStreams; i++)
                {
                    status = logicalLog->CreateReadStream(streams[i], 0);
                    if (!NT_SUCCESS(status))
                    {
                        taskComplete.SetEvent();
                        return;
                    }
                }

                for (int i = 0; i < NumStreams; i++)
                {
                    streams[i]->Dispose();
                    streams[i] = nullptr;
                }

                status = STATUS_SUCCESS;
                taskComplete.SetEvent();
            });

            bool completed = taskComplete.WaitUntilSet(15000);
            VERIFY_IS_TRUE(completed); // Likely deadlocked if false
            VERIFY_STATUS_SUCCESS("OpenManyStreamsTest_OpenAllThenDispose", status);
        }

        // Close and reopen the log
        status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
        VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);

        status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
        VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);

        // Test creating all streams then destructing
        {
            // Note: this actually passes with the original bug, as only the streams array 'append' codepath is exercised
            Common::Threadpool::Post([&]()
            {
                ILogicalLogReadStream::SPtr streams[NumStreams];

                for (int i = 0; i < NumStreams; i++)
                {
                    status = logicalLog->CreateReadStream(streams[i], 0);
                    if (!NT_SUCCESS(status))
                    {
                        taskComplete.SetEvent();
                        return;
                    }
                }

                for (int i = 0; i < NumStreams; i++)
                {
                    streams[i] = nullptr;
                }

                status = STATUS_SUCCESS;
                taskComplete.SetEvent();
            });

            bool completed = taskComplete.WaitUntilSet(15000);
            VERIFY_IS_TRUE(completed); // Likely deadlocked if false
            VERIFY_STATUS_SUCCESS("OpenManyStreamsTest_OpenAllThenDestruct", status);
        }

        // Close and reopen the log
        status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
        VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);

        status = OpenLogicalLog(*physicalLog, logicalLogId, *logicalLogName, logicalLog);
        VERIFY_STATUS_SUCCESS("OpenLogicalLog", status);

        // Test with random choice of when to cleanup stream
        {
            // Note: this actually passes with the original bug, as only the streams array 'append' codepath is exercised
            Common::Threadpool::Post([&]()
            {
                Common::Random random(NumStreams);
                ILogicalLogReadStream::SPtr streams[NumStreams];

                for (int i = 0; i < NumStreams; i++)
                {
                    status = logicalLog->CreateReadStream(streams[i], 0);
                    if (!NT_SUCCESS(status))
                    {
                        taskComplete.SetEvent();
                        return;
                    }

                    switch (random.Next() % 6)
                    {
                    case 0:
                    case 1:
                    case 2:
                        // let it be cleaned up later
                        break;

                    case 3:
                        // Close now
                        status = SyncAwait(streams[i]->CloseAsync());
                        if (!NT_SUCCESS(status))
                        {
                            taskComplete.SetEvent();
                            return;
                        }
                        streams[i] = nullptr;
                        break;

                    case 4:
                        // Dispose now
                        streams[i]->Dispose();
                        streams[i] = nullptr;
                        break;

                    case 5:
                        // Destruct now
                        streams[i] = nullptr;
                        break;
                    }
                }

                for (int i = 0; i < NumStreams; i++)
                {
                    if (streams[i] != nullptr)
                    {
                        switch (random.Next() % 3)
                        {
                        case 0:
                            // Close
                            status = SyncAwait(streams[i]->CloseAsync());
                            if (!NT_SUCCESS(status))
                            {
                                taskComplete.SetEvent();
                                return;
                            }
                            streams[i] = nullptr;
                            break;

                        case 1:
                            // Dispose
                            streams[i]->Dispose();
                            streams[i] = nullptr;
                            break;

                        case 2:
                            // Destruct
                            streams[i] = nullptr;
                            break;
                        }
                    }
                }

                status = STATUS_SUCCESS;
                taskComplete.SetEvent();
            });

            bool completed = taskComplete.WaitUntilSet(15000);
            VERIFY_IS_TRUE(completed); // Likely deadlocked if false
            VERIFY_STATUS_SUCCESS("OpenManyStreamsTest_RandomCleanup", status);
        }

        // Do the random cleanup test again without reopening the log
        {
            // Note: this actually passes with the original bug, as only the streams array 'append' codepath is exercised
            Common::Threadpool::Post([&]()
            {
                Common::Random random(NumStreams);
                ILogicalLogReadStream::SPtr streams[NumStreams];

                for (int i = 0; i < NumStreams; i++)
                {
                    status = logicalLog->CreateReadStream(streams[i], 0);
                    if (!NT_SUCCESS(status))
                    {
                        taskComplete.SetEvent();
                        return;
                    }

                    switch (random.Next() % 6)
                    {
                    case 0:
                    case 1:
                    case 2:
                        // let it be cleaned up later
                        break;

                    case 3:
                        // Close now
                        status = SyncAwait(streams[i]->CloseAsync());
                        if (!NT_SUCCESS(status))
                        {
                            taskComplete.SetEvent();
                            return;
                        }
                        streams[i] = nullptr;
                        break;

                    case 4:
                        // Dispose now
                        streams[i]->Dispose();
                        streams[i] = nullptr;
                        break;

                    case 5:
                        // Destruct now
                        streams[i] = nullptr;
                        break;
                    }
                }

                for (int i = 0; i < NumStreams; i++)
                {
                    if (streams[i] != nullptr)
                    {
                        switch (random.Next() % 3)
                        {
                        case 0:
                            // Close
                            status = SyncAwait(streams[i]->CloseAsync());
                            if (!NT_SUCCESS(status))
                            {
                                taskComplete.SetEvent();
                                return;
                            }
                            streams[i] = nullptr;
                            break;

                        case 1:
                            // Dispose
                            streams[i]->Dispose();
                            streams[i] = nullptr;
                            break;

                        case 2:
                            // Destruct
                            streams[i] = nullptr;
                            break;
                        }
                    }
                }

                status = STATUS_SUCCESS;
                taskComplete.SetEvent();
            });

            bool completed = taskComplete.WaitUntilSet(15000);
            VERIFY_IS_TRUE(completed); // Likely deadlocked if false
            VERIFY_STATUS_SUCCESS("OpenManyStreamsTest_RandomCleanup", status);
        }

        // Cleanup
        status = SyncAwait(logicalLog->CloseAsync(CancellationToken::None));
        VERIFY_STATUS_SUCCESS("LogicalLog::CloseAsync", status);
        logicalLog = nullptr;

        status = SyncAwait(physicalLog->CloseAsync(CancellationToken::None));
        VERIFY_STATUS_SUCCESS("PhysicalLog::CloseAsync", status);
        physicalLog = nullptr;

        status = SyncAwait(logManager->DeletePhysicalLogAsync(CancellationToken::None));
        VERIFY_STATUS_SUCCESS("LogManager::DeletePhysicalLogAsync", status);

        status = SyncAwait(logManager->CloseAsync(CancellationToken::None));
        VERIFY_STATUS_SUCCESS("LogManager::CloseAsync", status);
        logManager = nullptr;
    }