void RequestManager::OnProcessRequest()

in src/prod/src/Management/FileStoreService/RequestManager.cpp [864:1406]


void RequestManager::OnProcessRequest(
    MessageUPtr && request,
    IpcReceiverContextUPtr && receiverContext)
{    
    wstring rejectReason;
    if(!ValidateClientMessage(request, rejectReason))
    {
        WriteWarning(
            TraceComponent,
            TraceId,
            "ValidateClientMessage failed. Reason:{0}, MessageId:{1}",
            rejectReason,
            request->MessageId);

        routingAgentProxy_.OnIpcFailure(ErrorCodeValue::InvalidMessage, *receiverContext);
        return;
    }

    auto timeout = TimeoutHeader::FromMessage(*request).Timeout;
    auto activityId = FabricActivityHeader::FromMessage(*request).ActivityId;
    auto messageId = request->MessageId;
    auto action = request->Action;

    if (!IsChunkAction(action))
    {
        WriteInfo(
            TraceComponent,
            TraceId,
            "ProcessRequest: Action:{0}, ActivityId:{1}, MessageId:{2}, Timeout:{3}, RequestManagerState:{4}",
            action,
            activityId,
            messageId,
            timeout,
            this->GetState());
    }

    if(this->GetState() != RequestManager::Active)
    {
        // Allow read operations from secondary. Reject the rest
        if (action != FileStoreServiceTcpMessage::InternalListAction &&
            action != FileStoreServiceTcpMessage::GetStoreLocationAction &&
            action != FileStoreServiceTcpMessage::GetStoreLocationsAction)
        {
            routingAgentProxy_.OnIpcFailure(ErrorCodeValue::FileStoreServiceNotReady, *receiverContext, activityId);
            return;
        }
    }

    if(action == FileStoreServiceTcpMessage::GetStagingLocationAction)
    {
        ShareLocationReply shareLocationReply(stagingShareLocation_);
        auto replyMessage = move(FileStoreServiceMessage::GetClientOperationSuccess(shareLocationReply, activityId));
        routingAgentProxy_.SendIpcReply(move(replyMessage), *receiverContext);
        return;
    }
    else if(action == FileStoreServiceTcpMessage::GetStoreLocationAction || action == FileStoreServiceTcpMessage::GetStoreLocationsAction)
    {
        ShareLocationReply shareLocationReply(storeShareLocation_);
        auto replyMessage = move(FileStoreServiceMessage::GetClientOperationSuccess(shareLocationReply, activityId));
        routingAgentProxy_.SendIpcReply(move(replyMessage), *receiverContext);
        return;
    }
    else if(action == FileStoreServiceTcpMessage::UploadAction)
    {
        UploadRequest uploadRequest;
        if(!request->GetBody<UploadRequest>(uploadRequest))
        {
            auto error = ErrorCode::FromNtStatus(request->Status);
            return;
        }

#if defined(PLATFORM_UNIX)
        wstring storeRelativePath = uploadRequest.StoreRelativePath;
        std::replace(storeRelativePath.begin(),storeRelativePath.end(), '\\', '/');
        uploadRequest.StoreRelativePath = storeRelativePath;
#endif
        this->ReplicaObj.FileStoreServiceCounters->OnUploadActionRequest();
        AsyncOperation::CreateAndStart<ProcessUploadRequestAsyncOperation>(
            *this,
            move(uploadRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this](AsyncOperationSPtr const & asyncOperation)
        {
            this->OnProcessRequestComplete(asyncOperation);
        },
            this->CreateAsyncOperationRoot());
    }
    else if (action == FileStoreServiceTcpMessage::CopyAction)
    {
        CopyRequest copyRequest;
        if (!request->GetBody<CopyRequest>(copyRequest))
        {
            auto error = ErrorCode::FromNtStatus(request->Status);
            return;
        }

        this->ReplicaObj.FileStoreServiceCounters->OnCopyActionRequest();
        AsyncOperation::CreateAndStart<ProcessCopyRequestAsyncOperation>(
            *this,
            move(copyRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this](AsyncOperationSPtr const & asyncOperation)
        {
            this->OnProcessRequestComplete(asyncOperation);
        },
            this->CreateAsyncOperationRoot());
    }
    else if (action == FileStoreServiceTcpMessage::CheckExistenceAction)
    {
        ImageStoreBaseRequest checkExistenceRequest;
        if (!request->GetBody<ImageStoreBaseRequest>(checkExistenceRequest))
        {
            auto error = ErrorCode::FromNtStatus(request->Status);
            if (!error.IsSuccess())
            {
                WriteError(
                    TraceComponent,
                    TraceId,
                    "Parsing CheckExistenceRequest failed. Error:{0}, MessageId:{1}",
                    error,
                    request->MessageId);

                return;
            }
        }

        AsyncOperation::CreateAndStart<ProcessCheckExistenceRequestAsyncOperation>(
            *this,
            move(checkExistenceRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this](AsyncOperationSPtr const & asyncOperation)
        {
            this->OnProcessRequestComplete(asyncOperation);
        },
            this->CreateAsyncOperationRoot());
    }
    else if(action == FileStoreServiceTcpMessage::ListAction)
    {
        ListRequest listRequest;
        if(!request->GetBody<ListRequest>(listRequest))
        {
            auto error = ErrorCode::FromNtStatus(request->Status);
            return;
        }

        AsyncOperation::CreateAndStart<ProcessListRequestAsyncOperation>(
            *this,
            move(listRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this](AsyncOperationSPtr const & asyncOperation)
        {
            this->OnProcessRequestComplete(asyncOperation);
        },
            this->CreateAsyncOperationRoot());
    }
    else if(action == FileStoreServiceTcpMessage::InternalListAction)
    {
        ListRequest listRequest;
        if(!request->GetBody<ListRequest>(listRequest))
        {
            auto error = ErrorCode::FromNtStatus(request->Status);
            return;
        }

        this->ReplicaObj.FileStoreServiceCounters->OnInternalListRequest();
        AsyncOperation::CreateAndStart<ProcessInternalListRequestAsyncOperation>(
            *this,
            move(listRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this](AsyncOperationSPtr const & asyncOperation)
        {
            this->OnProcessRequestComplete(asyncOperation);
        },
            this->CreateAsyncOperationRoot());
    }
    else if(action == FileStoreServiceTcpMessage::DeleteAction)
    {
        ImageStoreBaseRequest deleteRequest;
        if(!request->GetBody<ImageStoreBaseRequest>(deleteRequest))
        {
            auto error = ErrorCode::FromNtStatus(request->Status);
            return;
        }

        this->ReplicaObj.FileStoreServiceCounters->OnDeleteRequest();
        AsyncOperation::CreateAndStart<ProcessDeleteRequestAsyncOperation>(
            *this,
            move(deleteRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this](AsyncOperationSPtr const & asyncOperation)
        {
            this->OnProcessRequestComplete(asyncOperation);
        },
            this->CreateAsyncOperationRoot());
    }
    else if (action == FileStoreServiceTcpMessage::ListUploadSessionAction)
    {
        UploadSessionRequest uploadSessionRequest;
        if (!request->GetBody<UploadSessionRequest>(uploadSessionRequest))
        {
            auto error = ErrorCode::FromNtStatus(request->Status);
            if (!error.IsSuccess())
            {
                WriteError(
                    TraceComponent,
                    TraceId,
                    "Parsing UploadSessionRequest failed. Error:{0}, MessageId:{1}",
                    error,
                    request->MessageId);

                return;
            }
        }

        WriteInfo(
            TraceComponent,
            TraceId,
            "ProcessRequest: Action:{0}, SessionId:{1} StoreRelativePath:{2} Timeout:{3}  ActivityId:{4} MessageId:{5} State:{6}",
            action,
            uploadSessionRequest.SessionId,
            uploadSessionRequest.StoreRelativePath,
            timeout,
            activityId,
            messageId,
            this->GetState());

        wstring storeRelativePath = uploadSessionRequest.StoreRelativePath;
        Guid sessionId = uploadSessionRequest.SessionId;
        AsyncOperation::CreateAndStart<ProcessListUploadSessionRequestAsyncOperation>(
            *this,
            move(uploadSessionRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this, storeRelativePath, sessionId](AsyncOperationSPtr const & asyncOperation)
        {
            this->OnProcessChunkRequestComplete(asyncOperation, sessionId, storeRelativePath, 0);
        },
            this->CreateAsyncOperationRoot());
    }
    else if(action == FileStoreServiceTcpMessage::DeleteUploadSessionAction)
    {
        DeleteUploadSessionRequest deleteUploadSessionRequest;
        if (!request->GetBody<DeleteUploadSessionRequest>(deleteUploadSessionRequest))
        {
            auto error = ErrorCode::FromNtStatus(request->Status);
            if (!error.IsSuccess())
            {
                WriteError(
                    TraceComponent,
                    TraceId,
                    "Parsing DeleteUploadSessionRequest failed. Error:{0}, MessageId:{1}",
                    error,
                    request->MessageId);

                return;
            }
        }

        WriteInfo(
            TraceComponent,
            TraceId,
            "ProcessRequest: Action:{0}, SessionId:{1} Timeout:{2} ActivityId:{3} MessageId:{4} State:{5}",
            action,
            deleteUploadSessionRequest.SessionId,
            timeout,
            activityId,
            messageId,
            this->GetState());

        this->ReplicaObj.FileStoreServiceCounters->OnDeleteUploadSessionRequest();
        Guid sessionId = deleteUploadSessionRequest.SessionId;
        AsyncOperation::CreateAndStart<ProcessDeleteUploadSessionRequestAsyncOperation>(
            *this,
            move(deleteUploadSessionRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this, sessionId](AsyncOperationSPtr const & asyncOperation)
        {
            this->OnProcessChunkRequestComplete(asyncOperation, sessionId, L"", 0);
        },
            this->CreateAsyncOperationRoot());
    }
    else if(action == FileStoreServiceTcpMessage::CreateUploadSessionAction)
    {
        CreateUploadSessionRequest createUploadSessionRequest;
        if (!request->GetBody<CreateUploadSessionRequest>(createUploadSessionRequest))
        {
            auto error = ErrorCode::FromNtStatus(request->Status);
            if (!error.IsSuccess())
            {
                WriteError(
                    TraceComponent,
                    TraceId,
                    "Parsing CreateUploadSessionRequest failed. Error:{0}, MessageId:{1}",
                    error,
                    request->MessageId);

                return;
            }
        }

        if (FileStoreServiceConfig::GetConfig().EnableChaosDuringFileUpload)
        {
            ++createUploadChunkCount_;
            if (createUploadChunkCount_.load() % 7 == 0)
            {
                WriteWarning(
                    TraceComponent,
                    TraceId,
                    "{0}: Dropping incoming message SessionId {1} StorePath : {2}",
                    action,
                    createUploadSessionRequest.SessionId,
                    createUploadSessionRequest.StoreRelativePath
                );

                return;
            }
        }

        WriteInfo(
            TraceComponent,
            TraceId,
            "ProcessRequest: Action:{0}, SessionId:{1} StoreRelativePath:{2} Timeout:{3} ActivityId:{4} MessageId:{5} State:{6}",
            action,
            createUploadSessionRequest.SessionId,
            createUploadSessionRequest.StoreRelativePath,
            timeout,
            activityId,
            messageId,
            this->GetState());

        this->ReplicaObj.FileStoreServiceCounters->OnCreateUploadSessionRequest();

        wstring storeRelativePath = createUploadSessionRequest.StoreRelativePath;
        Guid sessionId = createUploadSessionRequest.SessionId;
        AsyncOperation::CreateAndStart<ProcessCreateUploadSessionRequestAsyncOperation>(
            *this,
            move(createUploadSessionRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this, sessionId, storeRelativePath](AsyncOperationSPtr const & asyncOperation)
        {
            this->OnProcessChunkRequestComplete(asyncOperation, sessionId, storeRelativePath, 0);
        },
            this->CreateAsyncOperationRoot());
    }
    else if(action == FileStoreServiceTcpMessage::UploadChunkAction)
    {
        UploadChunkRequest uploadChunkRequest;
        if (!request->GetBody<UploadChunkRequest>(uploadChunkRequest))
        {
            auto error = ErrorCode::FromNtStatus(request->Status);
            if (!error.IsSuccess())
            {
                WriteError(
                    TraceComponent,
                    TraceId,
                    "Parsing UploadChunkRequest failed. Error:{0}, MessageId:{1}",
                    error,
                    request->MessageId);

                return;
            }
        }

        if (FileStoreServiceConfig::GetConfig().EnableChaosDuringFileUpload)
        {
            ++uploadChunkCount_;
            if (uploadChunkCount_.load() % 11 == 0)
            {
                WriteWarning(
                    TraceComponent,
                    TraceId,
                    "{0}: Dropping incoming message SessionId {1} start/end : {2}/{3}",
                    action,
                    uploadChunkRequest.SessionId,
                    uploadChunkRequest.StartPosition,
                    uploadChunkRequest.EndPosition
                );

                return;
            }
        }

        WriteInfo(
            TraceComponent,
            TraceId,
            "ProcessRequest: Action:{0}, SessionId:{1} StartPosition:{2} EndPosition:{3} Timeout:{4} ActivityId:{5} MessageId:{6} State:{7}",
            action,
            uploadChunkRequest.SessionId,
            uploadChunkRequest.StartPosition,
            uploadChunkRequest.EndPosition,
            timeout,
            activityId,
            messageId,
            this->GetState());

        Guid sessionId = uploadChunkRequest.SessionId;
        AsyncOperation::CreateAndStart<ProcessUploadChunkRequestAsyncOperation>(
            *this,
            move(uploadChunkRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this, sessionId](AsyncOperationSPtr const & asyncOperation)
        {
            this->OnProcessChunkRequestComplete(asyncOperation, sessionId, L"", 0);
        },
            this->CreateAsyncOperationRoot());
    }
    else if (action == FileStoreServiceTcpMessage::UploadChunkContentAction)
    {
        UploadChunkContentRequest uploadChunkContentRequest;
        if (!request->GetBody<UploadChunkContentRequest>(uploadChunkContentRequest))
        {
            auto error = ErrorCode::FromNtStatus(request->Status);
            if (!error.IsSuccess())
            {
                WriteError(
                    TraceComponent,
                    TraceId,
                    "Parsing UploadChunkContentRequest failed. Error:{0}, MessageId:{1}",
                    error,
                    request->MessageId);

                return;
            }
        }

        WriteInfo(
            TraceComponent,
            TraceId,
            "ProcessRequest: Action:{0}, SessionId:{1} StartPosition:{2} EndPosition:{3} Timeout:{4} ActivityId:{5} MessageId:{6} State:{7}",
            action,
            uploadChunkContentRequest.SessionId,
            uploadChunkContentRequest.StartPosition,
            uploadChunkContentRequest.EndPosition,
            timeout,
            activityId,
            messageId,
            this->GetState());

        Guid sessionId = uploadChunkContentRequest.SessionId;
        AsyncOperation::CreateAndStart<ProcessUploadChunkContentRequestAsyncOperation>(
            *this,
            move(uploadChunkContentRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this, sessionId](AsyncOperationSPtr const & asyncOperation)
        {
            this->OnProcessChunkRequestComplete(asyncOperation, sessionId, L"", 0);
        },
            this->CreateAsyncOperationRoot());
    }
    else if(action == FileStoreServiceTcpMessage::CommitUploadSessionAction)
    {
        UploadSessionRequest uploadSessionRequest;
        if (!request->GetBody<UploadSessionRequest>(uploadSessionRequest))
        {
            auto error = ErrorCode::FromNtStatus(request->Status);
            if (!error.IsSuccess())
            {
                WriteError(
                    TraceComponent,
                    TraceId,
                    "Parsing UploadSessionRequest failed. Error:{0}, MessageId:{1}",
                    error,
                    request->MessageId);

                return;
            }
        }

        if (FileStoreServiceConfig::GetConfig().EnableChaosDuringFileUpload)
        {
            ++commitUploadChunkCount_;
            if (commitUploadChunkCount_.load() % 9 == 0)
            {
                WriteWarning(
                    TraceComponent,
                    TraceId,
                    "{0} : Dropping incoming message SessionId {1} StorePath : {2}",
                    action,
                    uploadSessionRequest.SessionId,
                    uploadSessionRequest.StoreRelativePath
                );

                return;
            }
        }

        WriteInfo(
            TraceComponent,
            TraceId,
            "ProcessRequest: Action:{0}, SessionId:{1} StoreRelativePath:{2} Timeout:{3} ActivityId:{4} MessageId:{5} State:{6}",
            action,
            uploadSessionRequest.SessionId,
            uploadSessionRequest.StoreRelativePath,
            timeout,
            activityId,
            messageId,
            this->GetState());

        this->ReplicaObj.FileStoreServiceCounters->OnCommitUploadSessionRequest();
        wstring storeRelativePath = uploadSessionRequest.StoreRelativePath;
        Guid sessionId = uploadSessionRequest.SessionId;
        int64 startTime(Stopwatch::GetTimestamp());
        AsyncOperation::CreateAndStart<ProcessCommitUploadSessionRequestAsyncOperation>(
            *this,
            move(uploadSessionRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this, storeRelativePath, sessionId, startTime](AsyncOperationSPtr const & asyncOperation)
        {
            this->OnProcessChunkRequestComplete(asyncOperation,
                sessionId,
                storeRelativePath,
                startTime);
        },
            this->CreateAsyncOperationRoot());
    }
    else
    {
        routingAgentProxy_.OnIpcFailure(ErrorCodeValue::InvalidOperation, *receiverContext, activityId);
    }
}