in src/prod/src/Management/FileStoreService/RequestManager.cpp [864:1406]
// Entry point for a client request delivered over IPC.
//
// Flow:
//   1. Validate the message; invalid messages are rejected with InvalidMessage.
//   2. When the request manager is not Active, only InternalList / GetStoreLocation(s)
//      are served; everything else is rejected with FileStoreServiceNotReady.
//   3. Dispatch on the message action. Share-location queries are answered inline.
//      Every other supported action deserializes its body and starts the matching
//      Process*AsyncOperation, into which receiverContext is moved.
//   4. Unknown actions are rejected with InvalidOperation.
//
// A request whose body cannot be deserialized is traced and answered with a failure
// so that the sender does not have to wait for its timeout to expire.
//
// request         - the incoming message (headers and body).
// receiverContext - IPC context used to reply; moved into the async operation on dispatch.
void RequestManager::OnProcessRequest(
    MessageUPtr && request,
    IpcReceiverContextUPtr && receiverContext)
{
    wstring rejectReason;
    if(!ValidateClientMessage(request, rejectReason))
    {
        WriteWarning(
            TraceComponent,
            TraceId,
            "ValidateClientMessage failed. Reason:{0}, MessageId:{1}",
            rejectReason,
            request->MessageId);
        routingAgentProxy_.OnIpcFailure(ErrorCodeValue::InvalidMessage, *receiverContext);
        return;
    }

    auto timeout = TimeoutHeader::FromMessage(*request).Timeout;
    auto activityId = FabricActivityHeader::FromMessage(*request).ActivityId;
    auto messageId = request->MessageId;
    auto action = request->Action;

    // Shared failure path for actions that have no dedicated parse-failure trace:
    // trace the deserialization failure and reply to the sender with an error.
    auto rejectUnparsableBody = [&](wstring const & requestTypeName)
    {
        auto parseError = ErrorCode::FromNtStatus(request->Status);
        WriteWarning(
            TraceComponent,
            TraceId,
            "Parsing {0} failed. Error:{1}, MessageId:{2}",
            requestTypeName,
            parseError,
            messageId);

        if (parseError.IsSuccess())
        {
            // The body could not be read although the message status is clean;
            // never reply to a failed request with a success code.
            routingAgentProxy_.OnIpcFailure(ErrorCodeValue::InvalidMessage, *receiverContext, activityId);
        }
        else
        {
            routingAgentProxy_.OnIpcFailure(parseError, *receiverContext, activityId);
        }
    };

    // Actions for which IsChunkAction() is true skip this generic trace
    // (presumably the upload-session actions, which emit their own detailed trace below).
    if (!IsChunkAction(action))
    {
        WriteInfo(
            TraceComponent,
            TraceId,
            "ProcessRequest: Action:{0}, ActivityId:{1}, MessageId:{2}, Timeout:{3}, RequestManagerState:{4}",
            action,
            activityId,
            messageId,
            timeout,
            this->GetState());
    }

    if(this->GetState() != RequestManager::Active)
    {
        // Allow read operations from secondary. Reject the rest
        if (action != FileStoreServiceTcpMessage::InternalListAction &&
            action != FileStoreServiceTcpMessage::GetStoreLocationAction &&
            action != FileStoreServiceTcpMessage::GetStoreLocationsAction)
        {
            routingAgentProxy_.OnIpcFailure(ErrorCodeValue::FileStoreServiceNotReady, *receiverContext, activityId);
            return;
        }
    }

    if(action == FileStoreServiceTcpMessage::GetStagingLocationAction)
    {
        // Answered inline with the staging share location.
        ShareLocationReply shareLocationReply(stagingShareLocation_);
        auto replyMessage = FileStoreServiceMessage::GetClientOperationSuccess(shareLocationReply, activityId);
        routingAgentProxy_.SendIpcReply(move(replyMessage), *receiverContext);
        return;
    }
    else if(action == FileStoreServiceTcpMessage::GetStoreLocationAction || action == FileStoreServiceTcpMessage::GetStoreLocationsAction)
    {
        // Both the singular and plural action are answered with the store share location.
        ShareLocationReply shareLocationReply(storeShareLocation_);
        auto replyMessage = FileStoreServiceMessage::GetClientOperationSuccess(shareLocationReply, activityId);
        routingAgentProxy_.SendIpcReply(move(replyMessage), *receiverContext);
        return;
    }
    else if(action == FileStoreServiceTcpMessage::UploadAction)
    {
        UploadRequest uploadRequest;
        if(!request->GetBody<UploadRequest>(uploadRequest))
        {
            rejectUnparsableBody(L"UploadRequest");
            return;
        }

#if defined(PLATFORM_UNIX)
        // Normalize Windows-style separators in the store-relative path.
        wstring storeRelativePath = uploadRequest.StoreRelativePath;
        std::replace(storeRelativePath.begin(),storeRelativePath.end(), '\\', '/');
        uploadRequest.StoreRelativePath = storeRelativePath;
#endif

        this->ReplicaObj.FileStoreServiceCounters->OnUploadActionRequest();

        AsyncOperation::CreateAndStart<ProcessUploadRequestAsyncOperation>(
            *this,
            move(uploadRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this](AsyncOperationSPtr const & asyncOperation)
            {
                this->OnProcessRequestComplete(asyncOperation);
            },
            this->CreateAsyncOperationRoot());
    }
    else if (action == FileStoreServiceTcpMessage::CopyAction)
    {
        CopyRequest copyRequest;
        if (!request->GetBody<CopyRequest>(copyRequest))
        {
            rejectUnparsableBody(L"CopyRequest");
            return;
        }

        this->ReplicaObj.FileStoreServiceCounters->OnCopyActionRequest();

        AsyncOperation::CreateAndStart<ProcessCopyRequestAsyncOperation>(
            *this,
            move(copyRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this](AsyncOperationSPtr const & asyncOperation)
            {
                this->OnProcessRequestComplete(asyncOperation);
            },
            this->CreateAsyncOperationRoot());
    }
    else if (action == FileStoreServiceTcpMessage::CheckExistenceAction)
    {
        ImageStoreBaseRequest checkExistenceRequest;
        if (!request->GetBody<ImageStoreBaseRequest>(checkExistenceRequest))
        {
            auto error = ErrorCode::FromNtStatus(request->Status);
            if (!error.IsSuccess())
            {
                WriteError(
                    TraceComponent,
                    TraceId,
                    "Parsing CheckExistenceRequest failed. Error:{0}, MessageId:{1}",
                    error,
                    request->MessageId);
                // Tell the sender instead of leaving it to time out.
                routingAgentProxy_.OnIpcFailure(error, *receiverContext, activityId);
                return;
            }
        }

        AsyncOperation::CreateAndStart<ProcessCheckExistenceRequestAsyncOperation>(
            *this,
            move(checkExistenceRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this](AsyncOperationSPtr const & asyncOperation)
            {
                this->OnProcessRequestComplete(asyncOperation);
            },
            this->CreateAsyncOperationRoot());
    }
    else if(action == FileStoreServiceTcpMessage::ListAction)
    {
        ListRequest listRequest;
        if(!request->GetBody<ListRequest>(listRequest))
        {
            rejectUnparsableBody(L"ListRequest");
            return;
        }

        AsyncOperation::CreateAndStart<ProcessListRequestAsyncOperation>(
            *this,
            move(listRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this](AsyncOperationSPtr const & asyncOperation)
            {
                this->OnProcessRequestComplete(asyncOperation);
            },
            this->CreateAsyncOperationRoot());
    }
    else if(action == FileStoreServiceTcpMessage::InternalListAction)
    {
        ListRequest listRequest;
        if(!request->GetBody<ListRequest>(listRequest))
        {
            rejectUnparsableBody(L"ListRequest");
            return;
        }

        this->ReplicaObj.FileStoreServiceCounters->OnInternalListRequest();

        AsyncOperation::CreateAndStart<ProcessInternalListRequestAsyncOperation>(
            *this,
            move(listRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this](AsyncOperationSPtr const & asyncOperation)
            {
                this->OnProcessRequestComplete(asyncOperation);
            },
            this->CreateAsyncOperationRoot());
    }
    else if(action == FileStoreServiceTcpMessage::DeleteAction)
    {
        ImageStoreBaseRequest deleteRequest;
        if(!request->GetBody<ImageStoreBaseRequest>(deleteRequest))
        {
            rejectUnparsableBody(L"ImageStoreBaseRequest");
            return;
        }

        this->ReplicaObj.FileStoreServiceCounters->OnDeleteRequest();

        AsyncOperation::CreateAndStart<ProcessDeleteRequestAsyncOperation>(
            *this,
            move(deleteRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this](AsyncOperationSPtr const & asyncOperation)
            {
                this->OnProcessRequestComplete(asyncOperation);
            },
            this->CreateAsyncOperationRoot());
    }
    else if (action == FileStoreServiceTcpMessage::ListUploadSessionAction)
    {
        UploadSessionRequest uploadSessionRequest;
        if (!request->GetBody<UploadSessionRequest>(uploadSessionRequest))
        {
            auto error = ErrorCode::FromNtStatus(request->Status);
            if (!error.IsSuccess())
            {
                WriteError(
                    TraceComponent,
                    TraceId,
                    "Parsing UploadSessionRequest failed. Error:{0}, MessageId:{1}",
                    error,
                    request->MessageId);
                // Tell the sender instead of leaving it to time out.
                routingAgentProxy_.OnIpcFailure(error, *receiverContext, activityId);
                return;
            }
        }

        WriteInfo(
            TraceComponent,
            TraceId,
            "ProcessRequest: Action:{0}, SessionId:{1} StoreRelativePath:{2} Timeout:{3} ActivityId:{4} MessageId:{5} State:{6}",
            action,
            uploadSessionRequest.SessionId,
            uploadSessionRequest.StoreRelativePath,
            timeout,
            activityId,
            messageId,
            this->GetState());

        // Copy the values needed by the completion callback before the request is moved.
        wstring storeRelativePath = uploadSessionRequest.StoreRelativePath;
        Guid sessionId = uploadSessionRequest.SessionId;

        AsyncOperation::CreateAndStart<ProcessListUploadSessionRequestAsyncOperation>(
            *this,
            move(uploadSessionRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this, storeRelativePath, sessionId](AsyncOperationSPtr const & asyncOperation)
            {
                this->OnProcessChunkRequestComplete(asyncOperation, sessionId, storeRelativePath, 0);
            },
            this->CreateAsyncOperationRoot());
    }
    else if(action == FileStoreServiceTcpMessage::DeleteUploadSessionAction)
    {
        DeleteUploadSessionRequest deleteUploadSessionRequest;
        if (!request->GetBody<DeleteUploadSessionRequest>(deleteUploadSessionRequest))
        {
            auto error = ErrorCode::FromNtStatus(request->Status);
            if (!error.IsSuccess())
            {
                WriteError(
                    TraceComponent,
                    TraceId,
                    "Parsing DeleteUploadSessionRequest failed. Error:{0}, MessageId:{1}",
                    error,
                    request->MessageId);
                // Tell the sender instead of leaving it to time out.
                routingAgentProxy_.OnIpcFailure(error, *receiverContext, activityId);
                return;
            }
        }

        WriteInfo(
            TraceComponent,
            TraceId,
            "ProcessRequest: Action:{0}, SessionId:{1} Timeout:{2} ActivityId:{3} MessageId:{4} State:{5}",
            action,
            deleteUploadSessionRequest.SessionId,
            timeout,
            activityId,
            messageId,
            this->GetState());

        this->ReplicaObj.FileStoreServiceCounters->OnDeleteUploadSessionRequest();

        // Copy the session id for the completion callback before the request is moved.
        Guid sessionId = deleteUploadSessionRequest.SessionId;

        AsyncOperation::CreateAndStart<ProcessDeleteUploadSessionRequestAsyncOperation>(
            *this,
            move(deleteUploadSessionRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this, sessionId](AsyncOperationSPtr const & asyncOperation)
            {
                this->OnProcessChunkRequestComplete(asyncOperation, sessionId, L"", 0);
            },
            this->CreateAsyncOperationRoot());
    }
    else if(action == FileStoreServiceTcpMessage::CreateUploadSessionAction)
    {
        CreateUploadSessionRequest createUploadSessionRequest;
        if (!request->GetBody<CreateUploadSessionRequest>(createUploadSessionRequest))
        {
            auto error = ErrorCode::FromNtStatus(request->Status);
            if (!error.IsSuccess())
            {
                WriteError(
                    TraceComponent,
                    TraceId,
                    "Parsing CreateUploadSessionRequest failed. Error:{0}, MessageId:{1}",
                    error,
                    request->MessageId);
                // Tell the sender instead of leaving it to time out.
                routingAgentProxy_.OnIpcFailure(error, *receiverContext, activityId);
                return;
            }
        }

        if (FileStoreServiceConfig::GetConfig().EnableChaosDuringFileUpload)
        {
            // Chaos mode: every 7th create-session request is dropped on purpose,
            // without a reply (presumably to exercise the sender's retry path).
            ++createUploadChunkCount_;
            if (createUploadChunkCount_.load() % 7 == 0)
            {
                WriteWarning(
                    TraceComponent,
                    TraceId,
                    "{0}: Dropping incoming message SessionId {1} StorePath : {2}",
                    action,
                    createUploadSessionRequest.SessionId,
                    createUploadSessionRequest.StoreRelativePath
                    );
                return;
            }
        }

        WriteInfo(
            TraceComponent,
            TraceId,
            "ProcessRequest: Action:{0}, SessionId:{1} StoreRelativePath:{2} Timeout:{3} ActivityId:{4} MessageId:{5} State:{6}",
            action,
            createUploadSessionRequest.SessionId,
            createUploadSessionRequest.StoreRelativePath,
            timeout,
            activityId,
            messageId,
            this->GetState());

        this->ReplicaObj.FileStoreServiceCounters->OnCreateUploadSessionRequest();

        // Copy the values needed by the completion callback before the request is moved.
        wstring storeRelativePath = createUploadSessionRequest.StoreRelativePath;
        Guid sessionId = createUploadSessionRequest.SessionId;

        AsyncOperation::CreateAndStart<ProcessCreateUploadSessionRequestAsyncOperation>(
            *this,
            move(createUploadSessionRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this, sessionId, storeRelativePath](AsyncOperationSPtr const & asyncOperation)
            {
                this->OnProcessChunkRequestComplete(asyncOperation, sessionId, storeRelativePath, 0);
            },
            this->CreateAsyncOperationRoot());
    }
    else if(action == FileStoreServiceTcpMessage::UploadChunkAction)
    {
        UploadChunkRequest uploadChunkRequest;
        if (!request->GetBody<UploadChunkRequest>(uploadChunkRequest))
        {
            auto error = ErrorCode::FromNtStatus(request->Status);
            if (!error.IsSuccess())
            {
                WriteError(
                    TraceComponent,
                    TraceId,
                    "Parsing UploadChunkRequest failed. Error:{0}, MessageId:{1}",
                    error,
                    request->MessageId);
                // Tell the sender instead of leaving it to time out.
                routingAgentProxy_.OnIpcFailure(error, *receiverContext, activityId);
                return;
            }
        }

        if (FileStoreServiceConfig::GetConfig().EnableChaosDuringFileUpload)
        {
            // Chaos mode: every 11th chunk request is dropped on purpose,
            // without a reply (presumably to exercise the sender's retry path).
            ++uploadChunkCount_;
            if (uploadChunkCount_.load() % 11 == 0)
            {
                WriteWarning(
                    TraceComponent,
                    TraceId,
                    "{0}: Dropping incoming message SessionId {1} start/end : {2}/{3}",
                    action,
                    uploadChunkRequest.SessionId,
                    uploadChunkRequest.StartPosition,
                    uploadChunkRequest.EndPosition
                    );
                return;
            }
        }

        WriteInfo(
            TraceComponent,
            TraceId,
            "ProcessRequest: Action:{0}, SessionId:{1} StartPosition:{2} EndPosition:{3} Timeout:{4} ActivityId:{5} MessageId:{6} State:{7}",
            action,
            uploadChunkRequest.SessionId,
            uploadChunkRequest.StartPosition,
            uploadChunkRequest.EndPosition,
            timeout,
            activityId,
            messageId,
            this->GetState());

        // Copy the session id for the completion callback before the request is moved.
        Guid sessionId = uploadChunkRequest.SessionId;

        AsyncOperation::CreateAndStart<ProcessUploadChunkRequestAsyncOperation>(
            *this,
            move(uploadChunkRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this, sessionId](AsyncOperationSPtr const & asyncOperation)
            {
                this->OnProcessChunkRequestComplete(asyncOperation, sessionId, L"", 0);
            },
            this->CreateAsyncOperationRoot());
    }
    else if (action == FileStoreServiceTcpMessage::UploadChunkContentAction)
    {
        UploadChunkContentRequest uploadChunkContentRequest;
        if (!request->GetBody<UploadChunkContentRequest>(uploadChunkContentRequest))
        {
            auto error = ErrorCode::FromNtStatus(request->Status);
            if (!error.IsSuccess())
            {
                WriteError(
                    TraceComponent,
                    TraceId,
                    "Parsing UploadChunkContentRequest failed. Error:{0}, MessageId:{1}",
                    error,
                    request->MessageId);
                // Tell the sender instead of leaving it to time out.
                routingAgentProxy_.OnIpcFailure(error, *receiverContext, activityId);
                return;
            }
        }

        WriteInfo(
            TraceComponent,
            TraceId,
            "ProcessRequest: Action:{0}, SessionId:{1} StartPosition:{2} EndPosition:{3} Timeout:{4} ActivityId:{5} MessageId:{6} State:{7}",
            action,
            uploadChunkContentRequest.SessionId,
            uploadChunkContentRequest.StartPosition,
            uploadChunkContentRequest.EndPosition,
            timeout,
            activityId,
            messageId,
            this->GetState());

        // Copy the session id for the completion callback before the request is moved.
        Guid sessionId = uploadChunkContentRequest.SessionId;

        AsyncOperation::CreateAndStart<ProcessUploadChunkContentRequestAsyncOperation>(
            *this,
            move(uploadChunkContentRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this, sessionId](AsyncOperationSPtr const & asyncOperation)
            {
                this->OnProcessChunkRequestComplete(asyncOperation, sessionId, L"", 0);
            },
            this->CreateAsyncOperationRoot());
    }
    else if(action == FileStoreServiceTcpMessage::CommitUploadSessionAction)
    {
        UploadSessionRequest uploadSessionRequest;
        if (!request->GetBody<UploadSessionRequest>(uploadSessionRequest))
        {
            auto error = ErrorCode::FromNtStatus(request->Status);
            if (!error.IsSuccess())
            {
                WriteError(
                    TraceComponent,
                    TraceId,
                    "Parsing UploadSessionRequest failed. Error:{0}, MessageId:{1}",
                    error,
                    request->MessageId);
                // Tell the sender instead of leaving it to time out.
                routingAgentProxy_.OnIpcFailure(error, *receiverContext, activityId);
                return;
            }
        }

        if (FileStoreServiceConfig::GetConfig().EnableChaosDuringFileUpload)
        {
            // Chaos mode: every 9th commit request is dropped on purpose,
            // without a reply (presumably to exercise the sender's retry path).
            ++commitUploadChunkCount_;
            if (commitUploadChunkCount_.load() % 9 == 0)
            {
                WriteWarning(
                    TraceComponent,
                    TraceId,
                    "{0} : Dropping incoming message SessionId {1} StorePath : {2}",
                    action,
                    uploadSessionRequest.SessionId,
                    uploadSessionRequest.StoreRelativePath
                    );
                return;
            }
        }

        WriteInfo(
            TraceComponent,
            TraceId,
            "ProcessRequest: Action:{0}, SessionId:{1} StoreRelativePath:{2} Timeout:{3} ActivityId:{4} MessageId:{5} State:{6}",
            action,
            uploadSessionRequest.SessionId,
            uploadSessionRequest.StoreRelativePath,
            timeout,
            activityId,
            messageId,
            this->GetState());

        this->ReplicaObj.FileStoreServiceCounters->OnCommitUploadSessionRequest();

        // Copy the values needed by the completion callback before the request is moved.
        wstring storeRelativePath = uploadSessionRequest.StoreRelativePath;
        Guid sessionId = uploadSessionRequest.SessionId;

        // Timestamp taken before the commit starts; handed to the completion callback
        // (the other chunk actions pass 0 in this position).
        int64 startTime(Stopwatch::GetTimestamp());

        AsyncOperation::CreateAndStart<ProcessCommitUploadSessionRequestAsyncOperation>(
            *this,
            move(uploadSessionRequest),
            move(receiverContext),
            activityId,
            timeout,
            [this, storeRelativePath, sessionId, startTime](AsyncOperationSPtr const & asyncOperation)
            {
                this->OnProcessChunkRequestComplete(asyncOperation,
                    sessionId,
                    storeRelativePath,
                    startTime);
            },
            this->CreateAsyncOperationRoot());
    }
    else
    {
        // Unknown or unsupported action.
        routingAgentProxy_.OnIpcFailure(ErrorCodeValue::InvalidOperation, *receiverContext, activityId);
    }
}