bool MessageCrypto::encrypt()

in lib/MessageCrypto.cc [225:340]


bool MessageCrypto::encrypt(const std::set<std::string>& encKeys, const CryptoKeyReaderPtr keyReader,
                            proto::MessageMetadata& msgMetadata, SharedBuffer& payload,
                            SharedBuffer& encryptedPayload) {
    if (!encKeys.size()) {
        return false;
    }

    Lock lock(mutex_);

    // Update message metadata with encrypted data key
    for (auto it = encKeys.begin(); it != encKeys.end(); it++) {
        const std::string& keyName = *it;
        auto keyInfoIter = encryptedDataKeyMap_.find(keyName);

        if (keyInfoIter == encryptedDataKeyMap_.end()) {
            // Attempt to load the key. This will allow us to load keys as soon as
            // a new key is added to producer config
            Result result = addPublicKeyCipher(keyName, keyReader);
            if (result != ResultOk) {
                return false;
            }

            keyInfoIter = encryptedDataKeyMap_.find(keyName);

            if (keyInfoIter == encryptedDataKeyMap_.end()) {
                LOG_ERROR(logCtx_ << "Unable to find encrypted data key for " << keyName);
                return false;
            }
        }
        EncryptionKeyInfo* keyInfo = keyInfoIter->second.get();

        proto::EncryptionKeys* encKeys = proto::EncryptionKeys().New();
        encKeys->set_key(keyName);
        encKeys->set_value(keyInfo->getKey());
        if (PULSAR_UNLIKELY(logger()->isEnabled(Logger::LEVEL_DEBUG))) {
            std::string strHex = stringToHex(keyInfo->getKey(), keyInfo->getKey().size());
            LOG_DEBUG(logCtx_ << " Encrypted data key added for key " << keyName << ". Encrypted key size = "
                              << keyInfo->getKey().size() << ", value = " << strHex);
        }

        if (keyInfo->getMetadata().size()) {
            for (auto metaIter = keyInfo->getMetadata().begin(); metaIter != keyInfo->getMetadata().end();
                 metaIter++) {
                proto::KeyValue* keyValue = proto::KeyValue().New();
                keyValue->set_key(metaIter->first);
                keyValue->set_value(metaIter->second);
                encKeys->mutable_metadata()->AddAllocated(keyValue);
                LOG_DEBUG(logCtx_ << " Adding metadata for key " << keyName << ". Metadata key = "
                                  << metaIter->first << ", value =" << metaIter->second);
            }
        }

        msgMetadata.mutable_encryption_keys()->AddAllocated(encKeys);
    }

    // TODO: Replace random with counter and periodic refreshing based on timer/counter value
    RAND_bytes(iv_.get(), ivLen_);
    msgMetadata.set_encryption_param(reinterpret_cast<char*>(iv_.get()), ivLen_);

    EVP_CIPHER_CTX* cipherCtx = NULL;
    encryptedPayload = SharedBuffer::allocate(payload.readableBytes() + EVP_MAX_BLOCK_LENGTH + tagLen_);
    int encLen = 0;

    if (!(cipherCtx = EVP_CIPHER_CTX_new())) {
        LOG_ERROR(logCtx_ << " Failed to cipher ctx.");
        return false;
    }

    if (EVP_EncryptInit_ex(cipherCtx, EVP_aes_256_gcm(), NULL, dataKey_.get(), iv_.get()) != 1) {
        LOG_ERROR(logCtx_ << " Failed to init cipher ctx.");
        EVP_CIPHER_CTX_free(cipherCtx);
        return false;
    }

    if (EVP_CIPHER_CTX_set_padding(cipherCtx, EVP_CIPH_NO_PADDING) != 1) {
        LOG_ERROR(logCtx_ << " Failed to set cipher padding.");
        EVP_CIPHER_CTX_free(cipherCtx);
        return false;
    }

    if (EVP_EncryptUpdate(cipherCtx, reinterpret_cast<unsigned char*>(encryptedPayload.mutableData()),
                          &encLen, reinterpret_cast<unsigned const char*>(payload.data()),
                          payload.readableBytes()) != 1) {
        LOG_ERROR(logCtx_ << " Failed to encrypt payload.");
        EVP_CIPHER_CTX_free(cipherCtx);
        return false;
    }
    encryptedPayload.bytesWritten(encLen);
    encLen = 0;

    if (EVP_EncryptFinal_ex(cipherCtx, reinterpret_cast<unsigned char*>(encryptedPayload.mutableData()),
                            &encLen) != 1) {
        LOG_ERROR(logCtx_ << " Failed to finalize encryption.");
        EVP_CIPHER_CTX_free(cipherCtx);
        return false;
    }
    encryptedPayload.bytesWritten(encLen);

    if (EVP_CIPHER_CTX_ctrl(cipherCtx, EVP_CTRL_GCM_GET_TAG, tagLen_, encryptedPayload.mutableData()) != 1) {
        LOG_ERROR(logCtx_ << " Failed to get cipher tag info.");
        EVP_CIPHER_CTX_free(cipherCtx);
        return false;
    }
    encryptedPayload.bytesWritten(tagLen_);
    if (PULSAR_UNLIKELY(logger()->isEnabled(Logger::LEVEL_DEBUG))) {
        std::string strPayloadHex = stringToHex(payload.data(), payload.readableBytes());
        std::string strHex = stringToHex(encryptedPayload.data(), encryptedPayload.readableBytes());
        LOG_DEBUG(logCtx_ << " Original size = " << payload.readableBytes() << ", value = " << strPayloadHex
                          << ". Encrypted size " << encryptedPayload.readableBytes()
                          << ", value =" << strHex);
    }

    EVP_CIPHER_CTX_free(cipherCtx);

    return true;
}