in lib/MessageCrypto.cc [225:340]
bool MessageCrypto::encrypt(const std::set<std::string>& encKeys, const CryptoKeyReaderPtr keyReader,
proto::MessageMetadata& msgMetadata, SharedBuffer& payload,
SharedBuffer& encryptedPayload) {
if (!encKeys.size()) {
return false;
}
Lock lock(mutex_);
// Update message metadata with encrypted data key
for (auto it = encKeys.begin(); it != encKeys.end(); it++) {
const std::string& keyName = *it;
auto keyInfoIter = encryptedDataKeyMap_.find(keyName);
if (keyInfoIter == encryptedDataKeyMap_.end()) {
// Attempt to load the key. This will allow us to load keys as soon as
// a new key is added to producer config
Result result = addPublicKeyCipher(keyName, keyReader);
if (result != ResultOk) {
return false;
}
keyInfoIter = encryptedDataKeyMap_.find(keyName);
if (keyInfoIter == encryptedDataKeyMap_.end()) {
LOG_ERROR(logCtx_ << "Unable to find encrypted data key for " << keyName);
return false;
}
}
EncryptionKeyInfo* keyInfo = keyInfoIter->second.get();
proto::EncryptionKeys* encKeys = proto::EncryptionKeys().New();
encKeys->set_key(keyName);
encKeys->set_value(keyInfo->getKey());
if (PULSAR_UNLIKELY(logger()->isEnabled(Logger::LEVEL_DEBUG))) {
std::string strHex = stringToHex(keyInfo->getKey(), keyInfo->getKey().size());
LOG_DEBUG(logCtx_ << " Encrypted data key added for key " << keyName << ". Encrypted key size = "
<< keyInfo->getKey().size() << ", value = " << strHex);
}
if (keyInfo->getMetadata().size()) {
for (auto metaIter = keyInfo->getMetadata().begin(); metaIter != keyInfo->getMetadata().end();
metaIter++) {
proto::KeyValue* keyValue = proto::KeyValue().New();
keyValue->set_key(metaIter->first);
keyValue->set_value(metaIter->second);
encKeys->mutable_metadata()->AddAllocated(keyValue);
LOG_DEBUG(logCtx_ << " Adding metadata for key " << keyName << ". Metadata key = "
<< metaIter->first << ", value =" << metaIter->second);
}
}
msgMetadata.mutable_encryption_keys()->AddAllocated(encKeys);
}
// TODO: Replace random with counter and periodic refreshing based on timer/counter value
RAND_bytes(iv_.get(), ivLen_);
msgMetadata.set_encryption_param(reinterpret_cast<char*>(iv_.get()), ivLen_);
EVP_CIPHER_CTX* cipherCtx = NULL;
encryptedPayload = SharedBuffer::allocate(payload.readableBytes() + EVP_MAX_BLOCK_LENGTH + tagLen_);
int encLen = 0;
if (!(cipherCtx = EVP_CIPHER_CTX_new())) {
LOG_ERROR(logCtx_ << " Failed to cipher ctx.");
return false;
}
if (EVP_EncryptInit_ex(cipherCtx, EVP_aes_256_gcm(), NULL, dataKey_.get(), iv_.get()) != 1) {
LOG_ERROR(logCtx_ << " Failed to init cipher ctx.");
EVP_CIPHER_CTX_free(cipherCtx);
return false;
}
if (EVP_CIPHER_CTX_set_padding(cipherCtx, EVP_CIPH_NO_PADDING) != 1) {
LOG_ERROR(logCtx_ << " Failed to set cipher padding.");
EVP_CIPHER_CTX_free(cipherCtx);
return false;
}
if (EVP_EncryptUpdate(cipherCtx, reinterpret_cast<unsigned char*>(encryptedPayload.mutableData()),
&encLen, reinterpret_cast<unsigned const char*>(payload.data()),
payload.readableBytes()) != 1) {
LOG_ERROR(logCtx_ << " Failed to encrypt payload.");
EVP_CIPHER_CTX_free(cipherCtx);
return false;
}
encryptedPayload.bytesWritten(encLen);
encLen = 0;
if (EVP_EncryptFinal_ex(cipherCtx, reinterpret_cast<unsigned char*>(encryptedPayload.mutableData()),
&encLen) != 1) {
LOG_ERROR(logCtx_ << " Failed to finalize encryption.");
EVP_CIPHER_CTX_free(cipherCtx);
return false;
}
encryptedPayload.bytesWritten(encLen);
if (EVP_CIPHER_CTX_ctrl(cipherCtx, EVP_CTRL_GCM_GET_TAG, tagLen_, encryptedPayload.mutableData()) != 1) {
LOG_ERROR(logCtx_ << " Failed to get cipher tag info.");
EVP_CIPHER_CTX_free(cipherCtx);
return false;
}
encryptedPayload.bytesWritten(tagLen_);
if (PULSAR_UNLIKELY(logger()->isEnabled(Logger::LEVEL_DEBUG))) {
std::string strPayloadHex = stringToHex(payload.data(), payload.readableBytes());
std::string strHex = stringToHex(encryptedPayload.data(), encryptedPayload.readableBytes());
LOG_DEBUG(logCtx_ << " Original size = " << payload.readableBytes() << ", value = " << strPayloadHex
<< ". Encrypted size " << encryptedPayload.readableBytes()
<< ", value =" << strHex);
}
EVP_CIPHER_CTX_free(cipherCtx);
return true;
}