func()

in pulsar/crypto/default_message_crypto.go [140:207]


func (d *DefaultMessageCrypto) Encrypt(encKeys []string,
	keyReader KeyReader,
	msgMetadata MessageMetadataSupplier,
	payload []byte) ([]byte, error) {
	d.encryptLock.Lock()
	defer d.encryptLock.Unlock()

	if len(encKeys) == 0 {
		return payload, nil
	}

	for _, keyName := range encKeys {
		// if key is not already loaded, load it
		if _, ok := d.encryptedDataKeyMap.Load(keyName); !ok {
			if err := d.addPublicKeyCipher(keyName, keyReader); err != nil {
				d.logger.Error(err)
			}
		}

		// add key to the message metadata
		if k, ok := d.encryptedDataKeyMap.Load(keyName); ok {
			keyInfo, keyInfoOk := k.(*EncryptionKeyInfo)

			if keyInfoOk {
				msgMetadata.UpsertEncryptionKey(*keyInfo)
			} else {
				d.logger.Error("failed to get EncryptionKeyInfo for key %v", keyName)
			}
		} else {
			// we should never reach here
			msg := fmt.Sprintf("%v Failed to find encrypted Data key for key %v", d.logCtx, keyName)
			d.logger.Errorf(msg)
			return nil, fmt.Errorf(msg)
		}

	}

	// generate a new AES cipher with data key
	c, err := aes.NewCipher(d.dataKey)

	if err != nil {
		d.logger.Error("failed to create AES cipher")
		return nil, err
	}

	// gcm
	gcm, err := cipher.NewGCM(c)

	if err != nil {
		d.logger.Error("failed to create gcm")
		return nil, err
	}

	// create gcm param
	nonce := make([]byte, gcm.NonceSize())
	_, err = rand.Read(nonce)

	if err != nil {
		d.logger.Error(err)
		return nil, err
	}

	// Update message metadata with encryption param
	msgMetadata.SetEncryptionParam(nonce)

	// encrypt payload using seal function
	return gcm.Seal(nil, nonce, payload, nil), nil
}