in pulsar/crypto/default_message_crypto.go [140:207]
// Encrypt encrypts payload with AES-GCM using d.dataKey and records the
// encryption details on msgMetadata.
//
// For each name in encKeys, the matching entry in d.encryptedDataKeyMap is
// upserted into msgMetadata; a key that is not in the map yet is first loaded
// through addPublicKeyCipher using keyReader. The nonce used for sealing is
// stored on msgMetadata via SetEncryptionParam.
//
// When encKeys is empty the payload is returned unchanged and msgMetadata is
// not touched. An error is returned when a key is still missing from the map
// after the load attempt, when the AES cipher or GCM wrapper cannot be
// created, or when the nonce cannot be generated.
//
// The whole call runs under d.encryptLock.
func (d *DefaultMessageCrypto) Encrypt(encKeys []string,
	keyReader KeyReader,
	msgMetadata MessageMetadataSupplier,
	payload []byte) ([]byte, error) {
	d.encryptLock.Lock()
	defer d.encryptLock.Unlock()

	if len(encKeys) == 0 {
		return payload, nil
	}

	for _, keyName := range encKeys {
		// if key is not already loaded, load it
		if _, ok := d.encryptedDataKeyMap.Load(keyName); !ok {
			if err := d.addPublicKeyCipher(keyName, keyReader); err != nil {
				// Only logged here: if the key is still absent afterwards, the
				// lookup below turns that into an error for the caller.
				d.logger.Error(err)
			}
		}

		k, ok := d.encryptedDataKeyMap.Load(keyName)
		if !ok {
			// Reached when the key was not loaded before and the load attempt
			// above did not put it into the map.
			msg := fmt.Sprintf("%v Failed to find encrypted Data key for key %v", d.logCtx, keyName)
			// msg is already formatted; log it verbatim so that a '%' in the
			// key name or log context is not interpreted as a format verb.
			d.logger.Error(msg)
			return nil, errors.New(msg)
		}

		keyInfo, keyInfoOk := k.(*EncryptionKeyInfo)
		if !keyInfoOk {
			// NOTE(review): the entry has an unexpected type; encryption still
			// proceeds, but this key is not added to the message metadata.
			d.logger.Errorf("failed to get EncryptionKeyInfo for key %v", keyName)
			continue
		}

		// add key to the message metadata
		msgMetadata.UpsertEncryptionKey(*keyInfo)
	}

	// generate a new AES cipher with data key
	c, err := aes.NewCipher(d.dataKey)
	if err != nil {
		d.logger.Error("failed to create AES cipher")
		return nil, err
	}

	// wrap the block cipher in GCM mode
	gcm, err := cipher.NewGCM(c)
	if err != nil {
		d.logger.Error("failed to create gcm")
		return nil, err
	}

	// fresh nonce of the size GCM expects, filled by rand.Read
	nonce := make([]byte, gcm.NonceSize())
	if _, err = rand.Read(nonce); err != nil {
		d.logger.Error(err)
		return nil, err
	}

	// Update message metadata with encryption param
	msgMetadata.SetEncryptionParam(nonce)

	// encrypt payload using seal function (no additional authenticated data)
	return gcm.Seal(nil, nonce, payload, nil), nil
}