host/AzureRecoveryLib/resthelper/CloudBlockBlob.cpp (260 lines of code) (raw):
#include "CloudBlockBlob.h"
#include "../common/Trace.h"
#include "../common/AzureRecoveryException.h"
#include "securityutils.h"
#include <boost/assert.hpp>
#include <boost/thread.hpp>
#include <boost/filesystem.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/exception/all.hpp>
#include <boost/algorithm/string.hpp>
namespace AzureStorageRest
{
/*
Method : CloudBlockBlob::CloudBlockBlob
Description : CloudBlockBlob constructor
Parameters :
[in] blobSas: Azure block blob's SAS
[in] enableSSLAuthentication : flag to check if SSL authentication be verified
[in] azureBlockBlobParallelUploadChunkSize : max data upload size per thread in bytes
[in] azureBlockBlobMaxWriteSize : max data size in blob in bytes
[in] maxParallelUploadThreads : max number of threads to split and upload data in parallel
Return Code : None
*/
CloudBlockBlob::CloudBlockBlob(const std::string& blobSas,
const std::vector<boost::shared_ptr<HttpClient> >& vPtrHttpClient,
const uint8_t numberOfParallelUploads,
const uint64_t azureBlockBlobParallelUploadSize,
const uint64_t azureBlockBlobMaxWriteSize,
const uint32_t maxParallelUploadThreads) :
m_blob_sas(blobSas),
m_vPtrHttpClient(vPtrHttpClient),
m_NumberOfParallelUploads(numberOfParallelUploads),
m_azureBlockBlobParallelUploadSize(azureBlockBlobParallelUploadSize),
m_azureBlockBlobMaxWriteSize(azureBlockBlobMaxWriteSize),
m_maxParallelUploadThreads(maxParallelUploadThreads),
m_totalWriteSize(0),
m_timeout(0),
m_last_error(0),
m_http_status_code(0),
m_bWriteBlockFailed(false),
m_xmldocBlockIds(boost::make_shared<AzureRecovery::XmlDoccument>("BlockList")),
m_BlockIdsCount(0)
{
}
/*
Method : CloudBlockBlob::WriteBlock
Description : Write/upload block data along with corresponding block ID to Azure blok blob
Parameters :
[in] blockId: block ID for current block data
[in] out_buff : buffer holding block data
[in] length : buffer length
[in] httpClient : a shared_ptr to the HttpClient that should be used to upload block
Return Code : None
*/
void CloudBlockBlob::WriteBlock(const std::string& blockId,
const char* sData,
const blob_size_t length,
boost::shared_ptr<HttpClient> httpClient)
{
TRACE_FUNC_BEGIN;
BOOST_ASSERT(!m_blob_sas.empty());
try
{
pbyte_t out_buff = (pbyte_t)sData;
Uri blobMetadaUri(m_blob_sas);
blobMetadaUri.AddQueryParam(Blob::QueryParamComp, Blob::QueryValueBlock);
blobMetadaUri.AddQueryParam(Blob::QueryValueBlockID, blockId);
HttpRequest request(blobMetadaUri.ToString());
if (m_timeout)
{
request.SetTimeout(m_timeout);
}
request.AddHeader(RestHeader::X_MS_Version, HttpRestUtil::Get_X_MS_Version());
request.AddHeader(RestHeader::Content_Length, boost::lexical_cast<std::string>(length));
// Set Http Method
request.SetHttpMethod(AzureStorageRest::HTTP_PUT);
// Set upload/write buffer stream
request.SetRequestBody(out_buff, length);
HttpResponse response;
if (!httpClient->GetResponse(request, response))
{
boost::unique_lock<boost::mutex> lock(m_mutexLastError);
m_last_error = response.GetErrorCode();
m_http_response_data = response.GetResponseData();
m_http_status_code = response.GetResponseCode();
m_bWriteBlockFailed = true;
TRACE_ERROR("%s: Could not initiate put-blob rest operation, error code %lu.\n",
FUNCTION_NAME, m_last_error);
}
else if (response.GetResponseCode() != HttpErrorCode::CREATED)
{
boost::unique_lock<boost::mutex> lock(m_mutexLastError);
m_last_error = response.GetResponseCode();
m_http_response_data = response.GetResponseData();
m_http_status_code = response.GetResponseCode();
m_bWriteBlockFailed = true;
//Response body will have more meaningful message about failure.
response.PrintHttpErrorMsg();
TRACE_ERROR("%s: Error: put-blob rest api failed, http status code %lu.\n",
FUNCTION_NAME, response.GetResponseCode());
}
}
catch (const std::exception& e)
{
TRACE_ERROR("%s: Exception: %s\n", FUNCTION_NAME, e.what());
}
catch (...)
{
TRACE_ERROR("%s: Unknown Exception\n", FUNCTION_NAME);
}
TRACE_FUNC_END;
}
/*
Method : CloudBlockBlob::WriteBlockBlobMaxWriteSize
Description : Wrapper to WriteBlock to write/upload max write size to Azure block blob
Parameters :
[in] out_buff : buffer holding block data
[in] out_length : buffer length
Return Code : true on success otherwise false
*/
bool CloudBlockBlob::WriteBlockBlobMaxWriteSize(const char* sData, const blob_size_t out_length)
{
TRACE_FUNC_BEGIN;
boost::thread_group threadGroup;
uint8_t threadIndex = 0;
blob_size_t length = out_length;
assert(length <= m_azureBlockBlobMaxWriteSize);
assert(m_vPtrHttpClient.size() == m_NumberOfParallelUploads);
for (uint8_t threadIndex = 0; threadIndex < m_NumberOfParallelUploads; threadIndex++)
{
std::stringstream ssBlockId;
ssBlockId << std::setw(5) << std::setfill('0') << m_BlockIdsCount++;
const std::string &blockId(securitylib::base64Encode(ssBlockId.str().c_str(), ssBlockId.str().length()));
m_xmldocBlockIds->xAddChild("Latest", blockId);
if (length > m_azureBlockBlobParallelUploadSize)
{
try
{
boost::thread *pJobThread = new boost::thread(boost::bind(&CloudBlockBlob::WriteBlock, this, blockId, sData, m_azureBlockBlobParallelUploadSize, m_vPtrHttpClient[threadIndex]));
threadGroup.add_thread(pJobThread);
}
catch (const std::exception& e)
{
TRACE_ERROR("%s: failed to create thread to prcess write %lld bytes.\n",
FUNCTION_NAME, m_azureBlockBlobParallelUploadSize);
m_bWriteBlockFailed = true;
break;
}
}
else
{
WriteBlock(blockId, sData, length, m_vPtrHttpClient[threadIndex]);
break;
}
sData += m_azureBlockBlobParallelUploadSize;
length -= m_azureBlockBlobParallelUploadSize;
if (threadGroup.size() && m_maxParallelUploadThreads && threadIndex >= m_maxParallelUploadThreads)
{
threadGroup.join_all();
}
}
threadGroup.join_all();
/// Cleanup all outstanding BlockIds if WriteBlock failed. This way when we successfully upload data in same blob in next atempt.
/// New set of BlockIds will be committed and old uncommitted BlockIds will be discarded.
if (m_bWriteBlockFailed)
{
m_xmldocBlockIds.reset();
m_xmldocBlockIds = boost::make_shared<AzureRecovery::XmlDoccument>("BlockList");
}
else
{
m_totalWriteSize += out_length;
}
TRACE_FUNC_END;
return !m_bWriteBlockFailed;
}
/*
Method : CloudBlockBlob::Write
Description : Write/upload blob data to Azure block blob
Parameters :
[in] out_buff : buffer holding block data
[in] out_length : buffer length
Return Code : true on success otherwise false
*/
bool CloudBlockBlob::Write(const char* sData, const blob_size_t out_length)
{
TRACE_FUNC_BEGIN;
blob_size_t nLength = out_length;
while (nLength > m_azureBlockBlobMaxWriteSize)
{
if (!WriteBlockBlobMaxWriteSize(sData, m_azureBlockBlobMaxWriteSize))
{
TRACE_ERROR("%s: failed to write %lu bytes\n", FUNCTION_NAME, m_azureBlockBlobMaxWriteSize);
return false;
}
sData += m_azureBlockBlobMaxWriteSize;
nLength -= m_azureBlockBlobMaxWriteSize;
}
if (nLength)
{
if (!WriteBlockBlobMaxWriteSize(sData, nLength))
{
TRACE_ERROR("%s: failed to write %lu bytes\n", FUNCTION_NAME, nLength);
return false;
}
}
TRACE_FUNC_END;
return true;
}
/*
Method : CloudBlockBlob::AddMetadataHeadersToRequest
Description : Adds metadata headers to request
Parameters :
[out] request : HttpRequest to update with metadata
[in] metadata : set of metadata
Return Code : None
*/
void CloudBlockBlob::AddMetadataHeadersToRequest(HttpRequest& request, const metadata_t& metadata)
{
TRACE_FUNC_BEGIN;
metadata_const_iter_t iterBegin = metadata.begin(),
iterEnd = metadata.end();
for (; iterBegin != iterEnd; iterBegin++)
{
std::string key = iterBegin->first;
std::string value = iterBegin->second;
boost::trim(key);
boost::trim(value);
if (!key.empty() && !value.empty())
{
key = RestHeader::X_MS_Meta_Prefix + key;
request.AddHeader(key, value);
}
else
{
TRACE_WARNING("%s: Empty metadata key (or) values found. Hence ignoring [%s] -> [%s]\n",
FUNCTION_NAME, iterBegin->first.c_str(), iterBegin->second.c_str());
}
}
TRACE_FUNC_END;
}
/*
Method : CloudBlockBlob::SetMetadata
Description : Commits the metadata along with outstanding block IDs to remote block blob
Parameters :
[in] metadata : set of metadata
Return Code : true on success otherwise false
*/
bool CloudBlockBlob::SetMetadata(const metadata_t& metadata)
{
TRACE_FUNC_BEGIN;
bool bRet = true;
BOOST_ASSERT(!m_blob_sas.empty());
assert(m_vPtrHttpClient.size() > 0);
try
{
do
{
Uri blobMetadaUri(m_blob_sas);
// Set metadata query parameter comp=metadata
blobMetadaUri.AddQueryParam(Blob::QueryParamComp, Blob::QueryValueBlockList);
HttpRequest request(blobMetadaUri.ToString());
if (m_timeout)
request.SetTimeout(m_timeout);
request.AddHeader(RestHeader::X_MS_Version, HttpRestUtil::Get_X_MS_Version());
// Set block Ids in XML format in request body
xmlChar* docstr;
int len;
m_xmldocBlockIds->xGetXmlDoc(docstr, len);
// Set upload/write buffer stream
request.SetRequestBody((pbyte_t)docstr, len);
request.AddHeader(RestHeader::Content_Length, boost::lexical_cast<std::string>(len));
AddMetadataHeadersToRequest(request, metadata);
request.SetHttpMethod(AzureStorageRest::HTTP_PUT);
HttpResponse response;
if (!m_vPtrHttpClient[0]->GetResponse(request, response))
{
bRet = false;
m_last_error = response.GetErrorCode();
TRACE_ERROR("%s: Could not initiate set-blob-metadata rest operation, error code %lu.\n",
FUNCTION_NAME, m_last_error);
}
else if (response.GetResponseCode() != HttpErrorCode::CREATED)
{
bRet = false;
TRACE_ERROR("%s: Error: set-blob-metadata rest api failed, http status code %lu.\n",
FUNCTION_NAME, response.GetResponseCode());
m_last_error = response.GetResponseCode();
// Response body will have more meaningful message about failure.
response.PrintHttpErrorMsg();
}
m_http_status_code = response.GetResponseCode();
} while (false);
}
catch (const std::exception& e)
{
TRACE_ERROR("%s: Exception: %s\n", FUNCTION_NAME, e.what());
bRet = false;
}
catch (...)
{
TRACE_ERROR("%s: Unknown Exception\n", FUNCTION_NAME);
bRet = false;
}
TRACE_FUNC_END;
return bRet;
}
/*
Method : CloudBlockBlob::ClearBlockIds
Description : Clears all current outstanding block IDs
Parameters :
Return Code : true on success otherwise false
*/
bool CloudBlockBlob::ClearBlockIds()
{
// We need not physically delete the blob on Azure storage but to clear current set of block IDs.
// This will allow fresh blocks to be uploaded on retry even if new block ID match to existing one and existing the block data wil be replaced with new data.
// Also uncommitted blocks will be discarded one week after the last successful block upload by storage service.
m_xmldocBlockIds.reset();
m_xmldocBlockIds = boost::make_shared<AzureRecovery::XmlDoccument>("BlockList");
return true;
}
/*
Method : CloudPageBlob::SetHttpProxy
Description : configures the proxy settings to be used for accessing Azure blob
Parameters : [in] proxy : HttpProxy settings.
Return Code : none.
*/
void CloudBlockBlob::SetHttpProxy(const HttpProxy& proxy)
{
for (int i = 0; i < m_vPtrHttpClient.size(); i++)
{
m_vPtrHttpClient[i]->SetProxy(proxy);
}
}
}