void KtlLoggerNode::OpenAsyncOperation::OnStart()

in src/prod/src/ktllogger/KtlLoggerNode.cpp [78:538]


void KtlLoggerNode::OpenAsyncOperation::OnStart(AsyncOperationSPtr const & thisSPtr)
{
    NTSTATUS status;
    KtlLoggerInitializationContext::SPtr ktlLoggerInitContext;
    BOOL b;
    
#if !defined(PLATFORM_UNIX)          // TODO: Determine size of memory
    MEMORYSTATUSEX memoryStatusEx;

    memoryStatusEx.dwLength = sizeof(memoryStatusEx);
    b = GlobalMemoryStatusEx(&memoryStatusEx);
    if (!b)
    {
        Common::ErrorCode error = Common::ErrorCode::FromWin32Error(GetLastError());
        WriteError("KtlLoggerNode", "Line: {0}, Error {1}", __LINE__, error);
        TryComplete(thisSPtr, error);
        return;
    }
#endif

    if (!owner_.useGlobalKtlSystem_)
    {
        owner_.ktlSystem_->Activate(static_cast<ULONG>(timeoutHelper_.GetRemainingTime().TotalPositiveMilliseconds()));
        status = owner_.ktlSystem_->Status();
        if (! NT_SUCCESS(status))
        {
            WriteError("KtlLoggerNode", "Line: {0}, Status {1}", __LINE__, status);
            TryComplete(thisSPtr, ErrorCode::FromNtStatus(status));
            return;
        }

        // NOTE: This is needed for high performance of v2 replicator stack as more threads 
        // are used to perform completions of awaitables
        owner_.ktlSystem_->SetDefaultSystemThreadPoolUsage(FALSE);
    }

#if defined(UDRIVER)
    //
    // For UDRIVER, need to perform work done in PNP Device Add
    //
    status = FileObjectTable::CreateAndRegisterOverlayManager(owner_.GetAllocator(), KTL_TAG_TEST);
    if (!NT_SUCCESS(status))
    {
        Common::ErrorCode error = Common::ErrorCode::FromNtStatus(status);

        TryComplete(thisSPtr, error);
        return;
    }

    owner_.isRegistered_ = true;
#else
    // For kernel, we assume driver already installed by InstallForCITs
#endif  

    status = KtlLoggerInitializationContext::Create(
        owner_.GetAllocator(),
        KTL_TAG_TEST,
        &owner_,
        ktlLoggerInitContext);

    if (!NT_SUCCESS(status))
    {
        Common::ErrorCode error = Common::ErrorCode::FromNtStatus(status);

        WriteError("KtlLoggerNode", "Line: {0}, Status {1}", __LINE__, status);
        TryComplete(thisSPtr, error);
        return;
    }

    //
    // Configure SharedLogSettings
    //
    BOOLEAN automaticMemoryConfiguration;
    KtlLoggerConfig& config = KtlLoggerConfig::GetConfig();
    ServiceModel::ServiceModelConfig& smConfig = ServiceModel::ServiceModelConfig::GetConfig();
    KtlLogManager::MemoryThrottleLimits memoryLimits;
    KtlLogManager::AcceleratedFlushLimits accelerateFlushLimits;

    static const LONGLONG oneKB = 1024;
    static const LONG oneKBL = 1024;
    LONGLONG value;
    LONG valueL;

    applicationSharedLogSettings_ = make_unique<KtlLogManager::SharedLogContainerSettings>();
    systemServicesSharedLogSettings_ = make_unique<KtlLogManager::SharedLogContainerSettings>();

    const LONGLONG oneMB = 1024 * 1024;
    ULONG valueU;
    bool sharedLogIdOk = true;
    Guid g;

    std::wstring sharedLogPath;
    std::wstring sharedLogId;
    LONGLONG sharedLogSizeInMB;

    //
    // NodeType settings take precidence over global settings
    //
    sharedLogPath = fabricNodeConfig_->SharedLogFilePath;
    if (sharedLogPath == L"")
    {
        sharedLogPath = smConfig.SharedLogPath;
    }

    sharedLogId = fabricNodeConfig_->SharedLogFileId;
    if (sharedLogId == L"")
    {
        sharedLogId = smConfig.SharedLogId;
    }

    sharedLogSizeInMB = fabricNodeConfig_->SharedLogFileSizeInMB;
    if (sharedLogSizeInMB == 0)
    {
        sharedLogSizeInMB = smConfig.SharedLogSizeInMB;
    }

    if (sharedLogId != L"")
    {
        HRESULT hr;
        LPCWSTR wstr = sharedLogId.c_str();
        size_t len;
        hr = StringCchLength(wstr, MAX_PATH, &len);
        if (SUCCEEDED(hr))
        {
            //
            // get rid of { and } if present
            //
            if (wstr[len - 1] == L'}')
            {
                sharedLogId.erase(len - 1, 1);
            }

            if (wstr[0] == L'{')
            {
                sharedLogId.erase(0, 1);
            }

            sharedLogIdOk = Guid::TryParse(sharedLogId, g);
        }
        else {
            // todo?: debug assert
            sharedLogIdOk = false;
        }

        if (!sharedLogIdOk)
        {
            // todo?: debug assert
            sharedLogId = Constants::NullGuidString;
        }
    }
    else {
        sharedLogId = Constants::NullGuidString;
    }
        
    Guid::TryParse(sharedLogId, g);

    if (!sharedLogIdOk)
    {
        sharedLogPath = L"";
    }

    if (sharedLogId == L"" || sharedLogId == Constants::NullGuidString)
    {
        g = Constants::DefaultApplicationSharedLogId;
    }

    if (sharedLogPath == L"")
    {
#if !defined(PLATFORM_UNIX) 
        Path::CombineInPlace(sharedLogPath, fabricDataRoot_);
#else
        // In Linux, applications will use a logger daemon, so it's okay for
        // multiple application hosts on a node to share the same shared log.
        //
        Path::CombineInPlace(sharedLogPath, fabricNodeConfig_->WorkingDir);
#endif
        Path::CombineInPlace(sharedLogPath, Constants::DefaultApplicationSharedLogSubdirectory);
        Path::CombineInPlace(sharedLogPath, Constants::DefaultApplicationSharedLogName);
    }

#if !defined(PLATFORM_UNIX) 
    if (sharedLogPath.find(KtlLoggerSharedLogSettings::WindowsPathPrefix) != 0)
    {
        sharedLogPath = wformatString("{0}{1}", *KtlLoggerSharedLogSettings::WindowsPathPrefix, sharedLogPath);
    }
#endif

    StringCchCopyW(applicationSharedLogSettings_->Path, (sizeof(applicationSharedLogSettings_->Path) / 2), sharedLogPath.c_str());

    ::GUID const& gg = g.AsGUID();
    KGuid kg = gg;
    applicationSharedLogSettings_->LogContainerId = kg;

    value = sharedLogSizeInMB;
    applicationSharedLogSettings_->LogSize = value * oneMB;

    valueL = smConfig.SharedLogNumberStreams;
    applicationSharedLogSettings_->MaximumNumberStreams = valueL;

    valueL = smConfig.SharedLogMaximumRecordSizeInKB;
    applicationSharedLogSettings_->MaximumRecordSize = (ULONG)(valueL * oneKBL);

    valueU = smConfig.SharedLogCreateFlags;
    applicationSharedLogSettings_->Flags = valueU;

    static const KGuid guidNull(0x00000000, 0x0000, 0x0000, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00);
    applicationSharedLogSettings_->DiskId = guidNull;

    {
        auto systemServicesSharedLogPath = KtlLoggerConfig::GetConfig().SystemSharedLogPath;

        if (systemServicesSharedLogPath.empty())
        {
#if !defined(PLATFORM_UNIX) 
            Path::CombineInPlace(systemServicesSharedLogPath, fabricDataRoot_);
#else
            // In Linux, system services will use an in-proc logger that has the restriction
            // that each process must have it's own shared log, so the shared log must be located
            // under the node working directory. More specifically, one in-proc logger per KTL
            // system.
            //
            // In Windows, there is a single global logger at the kernel level.
            //
            Path::CombineInPlace(systemServicesSharedLogPath, fabricNodeConfig_->WorkingDir);
#endif
            Path::CombineInPlace(systemServicesSharedLogPath, Constants::DefaultSystemServicesSharedLogSubdirectory);
            Path::CombineInPlace(systemServicesSharedLogPath, Constants::DefaultSystemServicesSharedLogName);

#if !defined(PLATFORM_UNIX) 
            if (systemServicesSharedLogPath.find(KtlLoggerSharedLogSettings::WindowsPathPrefix) != 0)
            {
                systemServicesSharedLogPath = wformatString("{0}{1}", KtlLoggerSharedLogSettings::WindowsPathPrefix, systemServicesSharedLogPath);
            }
#endif

            StringCchCopyW(systemServicesSharedLogSettings_->Path, (sizeof(systemServicesSharedLogSettings_->Path) / 2), systemServicesSharedLogPath.c_str());
        }

        // pick a default log container id
        if (KtlLoggerConfig::GetConfig().EnableHashSystemSharedLogIdFromPath)
        {
            auto guid = Guid::Test_FromStringHashCode(systemServicesSharedLogSettings_->Path);
            systemServicesSharedLogSettings_->LogContainerId = guid.ToKGuid();
        }
        else
        {
            KGuid ssKg = Constants::DefaultSystemServicesSharedLogId.AsGUID();
            systemServicesSharedLogSettings_->LogContainerId = ssKg;
        }

        // Grab all the various settings from existing configuration/defaults.
        systemServicesSharedLogSettings_->LogSize = KtlLoggerConfig::GetConfig().SystemSharedLogSizeInMB * oneMB;
        systemServicesSharedLogSettings_->MaximumNumberStreams = KtlLoggerConfig::GetConfig().SystemSharedLogNumberStreams;
        systemServicesSharedLogSettings_->MaximumRecordSize = KtlLoggerConfig::GetConfig().SystemSharedLogMaximumRecordSizeInKB * oneKB;
        systemServicesSharedLogSettings_->Flags = KtlLoggerConfig::GetConfig().SystemSharedLogCreateFlags;
        systemServicesSharedLogSettings_->DiskId = applicationSharedLogSettings_->DiskId;
    }

    //
    // Configure memory limits
    //
    value = config.AutomaticMemoryConfiguration;
    if (value == 0)
    {
        automaticMemoryConfiguration = FALSE;
    }
    else
    {
        automaticMemoryConfiguration = TRUE;
    }

#if !defined(PLATFORM_UNIX)
    if (automaticMemoryConfiguration)
    {
        LONGLONG memorySizeInBytes = memoryStatusEx.ullTotalPhys;

        memoryLimits.WriteBufferMemoryPoolMin = memorySizeInBytes / 8;
        if (memoryLimits.WriteBufferMemoryPoolMin < KtlLogManager::MemoryThrottleLimits::_DefaultWriteBufferMemoryPoolMinMin)
        {
            memoryLimits.WriteBufferMemoryPoolMin = KtlLogManager::MemoryThrottleLimits::_DefaultWriteBufferMemoryPoolMinMin;
        }

        memoryLimits.WriteBufferMemoryPoolMax = memorySizeInBytes / 6;
        if (memoryLimits.WriteBufferMemoryPoolMax < memoryLimits.WriteBufferMemoryPoolMin)
        {
            memoryLimits.WriteBufferMemoryPoolMax = memoryLimits.WriteBufferMemoryPoolMin;
        }       

        memoryLimits.WriteBufferMemoryPoolPerStream = oneKBL * oneKBL;

        //
        // Do not allow the WriteBufferMemoryPool to grow to a point
        // where it is larger than the shared log space. In this case
        // the shared log may become full before throttling on the
        // incoming writes occurs; shared log full recovery creates a
        // longer latency than throttling. Note that the usable shared
        // log size is discounted by an eighth to account for padding.
        //
        LONGLONG usablesharedlogsizeinbytes = (sharedLogSizeInMB * oneMB) - (sharedLogSizeInMB * (128 * oneKB));

        if (memoryLimits.WriteBufferMemoryPoolMin > usablesharedlogsizeinbytes)
        {
            memoryLimits.WriteBufferMemoryPoolMin = usablesharedlogsizeinbytes;
        }

        if (memoryLimits.WriteBufferMemoryPoolMax > usablesharedlogsizeinbytes)
        {
            memoryLimits.WriteBufferMemoryPoolMax = usablesharedlogsizeinbytes;
        }
        
        //
        // Allow unlimited pinned memory per node as the kernel will
        // push back when too much memory is in use
        //
        memoryLimits.PinnedMemoryLimit = KtlLogManager::MemoryThrottleLimits::_NoLimit;
    }
    else
#endif
    {
        value = config.WriteBufferMemoryPoolMaximumInKB;
        if (value == 0)
        {
            memoryLimits.WriteBufferMemoryPoolMax = KtlLogManager::MemoryThrottleLimits::_NoLimit;
        }
        else
        {
            memoryLimits.WriteBufferMemoryPoolMax = value * oneKB;
        }

        value = config.WriteBufferMemoryPoolMinimumInKB;
        if (value == 0)
        {
            memoryLimits.WriteBufferMemoryPoolMin = KtlLogManager::MemoryThrottleLimits::_NoLimit;
        }
        else
        {
            memoryLimits.WriteBufferMemoryPoolMin = value * oneKB;
        }

        value = config.PinnedMemoryLimitInKB;
        if (value == 0)
        {
            memoryLimits.PinnedMemoryLimit = KtlLogManager::MemoryThrottleLimits::_NoLimit;
        }
        else
        {
            memoryLimits.PinnedMemoryLimit = value * oneKB;
        }

        valueL = config.WriteBufferMemoryPoolPerStreamInKB;
        if (valueL == -1)
        {
            memoryLimits.WriteBufferMemoryPoolPerStream = oneKBL * oneKBL;
        }
        else
        {
            memoryLimits.WriteBufferMemoryPoolPerStream = valueL * oneKBL;
        }
    }

    value = config.MaximumDestagingWriteOutstandingInKB;
    if (value == 0)
    {
        memoryLimits.MaximumDestagingWriteOutstanding = KtlLogManager::MemoryThrottleLimits::_DefaultMaximumDestagingWriteOutstanding;
    }
    else
    {
        memoryLimits.MaximumDestagingWriteOutstanding = value * oneKB;
    }

    int64 valueI64;

    valueI64 = config.PeriodicFlushTime.TotalSeconds();
    memoryLimits.PeriodicFlushTimeInSec = (ULONG)valueI64;

    valueI64 = config.PeriodicTimerInterval.TotalSeconds();
    memoryLimits.PeriodicTimerIntervalInSec = (ULONG)valueI64;

    valueI64 = config.AllocationTimeout.TotalMilliseconds();
    memoryLimits.AllocationTimeoutInMs = (ULONG)valueI64;

    value = config.SharedLogThrottleLimitInPercentUsed;
    if (value == 0)
    {
        memoryLimits.SharedLogThrottleLimit = KtlLogManager::MemoryThrottleLimits::_DefaultSharedLogThrottleLimit;
    }
    else if (value == 100) {
        memoryLimits.SharedLogThrottleLimit = KtlLogManager::MemoryThrottleLimits::_NoSharedLogThrottleLimit;
    }
    else if ((value < 0) || (value > 100))
    {
        memoryLimits.SharedLogThrottleLimit = KtlLogManager::MemoryThrottleLimits::_DefaultSharedLogThrottleLimit;
    }
    else 
    {
        memoryLimits.SharedLogThrottleLimit = (ULONG)value;
    }


    value = config.AccelerateFlushActiveTimerInMs;
    if ((value != KtlLogManager::AcceleratedFlushLimits::AccelerateFlushActiveTimerInMsNoAction) &&
        ((value < KtlLogManager::AcceleratedFlushLimits::AccelerateFlushActiveTimerInMsMin) ||
         (value > KtlLogManager::AcceleratedFlushLimits::AccelerateFlushActiveTimerInMsMax)))
    {
        accelerateFlushLimits.AccelerateFlushActiveTimerInMs = KtlLogManager::AcceleratedFlushLimits::DefaultAccelerateFlushActiveTimerInMs;
    } else {
        accelerateFlushLimits.AccelerateFlushActiveTimerInMs = (ULONG)value;
    }
    
    value = config.AccelerateFlushPassiveTimerInMs;
    if ((value < KtlLogManager::AcceleratedFlushLimits::AccelerateFlushPassiveTimerInMsMin) ||
        (value > KtlLogManager::AcceleratedFlushLimits::AccelerateFlushPassiveTimerInMsMax))
    {
        accelerateFlushLimits.AccelerateFlushPassiveTimerInMs = KtlLogManager::AcceleratedFlushLimits::DefaultAccelerateFlushPassiveTimerInMs;
    } else {
        accelerateFlushLimits.AccelerateFlushPassiveTimerInMs = (ULONG)value;
    }
    
    value = config.AccelerateFlushActivePercent;
    if ((value < KtlLogManager::AcceleratedFlushLimits::AccelerateFlushActivePercentMin) ||
        (value > KtlLogManager::AcceleratedFlushLimits::AccelerateFlushActivePercentMax))
    {
        accelerateFlushLimits.AccelerateFlushActivePercent = KtlLogManager::AcceleratedFlushLimits::DefaultAccelerateFlushActivePercent;
    } else {
        accelerateFlushLimits.AccelerateFlushActivePercent = (ULONG)value;
    }
    
    value = config.AccelerateFlushPassivePercent;
    if ((value < KtlLogManager::AcceleratedFlushLimits::AccelerateFlushPassivePercentMin) ||
        (value > KtlLogManager::AcceleratedFlushLimits::AccelerateFlushPassivePercentMax))
    {
        accelerateFlushLimits.AccelerateFlushPassivePercent = KtlLogManager::AcceleratedFlushLimits::DefaultAccelerateFlushPassivePercent;
    } else {
        accelerateFlushLimits.AccelerateFlushPassivePercent = (ULONG)value;
    }   
    
    auto operation = shared_ptr<InitializeKtlLoggerAsyncOperation>(
        new(owner_.GetAllocator()) InitializeKtlLoggerAsyncOperation(
            *ktlLoggerInitContext,
#if defined(PLATFORM_UNIX)
            TRUE,
#else
            FALSE,
#endif
            memoryLimits,
            accelerateFlushLimits,
            *applicationSharedLogSettings_,
            fabricDataRoot_,
            [this](AsyncOperationSPtr const& operation)
            {
                this->OnInitializeKtlLoggerCompleted(operation);
            },
            thisSPtr));

    operation->Start(operation);

    if (operation->CompletedSynchronously)
    {
        FinishInitializeKtlLogger(operation);
    }
}