client/testutil/testutil.go (198 lines of code) (raw):
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package testutil contains utilities for unit tests.
package testutil
import (
	"context"
	"errors"
	"hash/crc32"
	"os"
	"path/filepath"
	"testing"

	"cloud.google.com/go/kms/apiv1"
	ekmpb "cloud.google.com/go/kms/apiv1/kmspb"
	kmsrpb "cloud.google.com/go/kms/apiv1/kmspb"
	kmsspb "cloud.google.com/go/kms/apiv1/kmspb"
	"github.com/GoogleCloudPlatform/stet/client/securesession"
	"github.com/googleapis/gax-go/v2"
	wrapperspb "google.golang.org/protobuf/types/known/wrapperspb"
)
var (
// gcpKMSPrefix is the URI scheme prefix that KEK.URI prepends to a key name.
gcpKMSPrefix = "gcp-kms://"
// CryptoKeyVerSuffix is the suffix for the cryptoKeyVersion in a GCP key identifier.
// It is appended to a CryptoKey name to form the name of the fake primary version.
CryptoKeyVerSuffix = "/cryptoKeyVersions/test"
// ExternalEKMURI is the external URI corresponding to ExternalKEK. It is set as the
// ExternalKeyUri of fake CryptoKeys created with the External protection level.
ExternalEKMURI = "https://my-kms.io/external-key"
// ExternalVPCBackend represents the ekmConnection for an External_VPC KEK. It is set as
// the CryptoKeyBackend of fake CryptoKeys created with the External_VPC protection level.
ExternalVPCBackend = "projects/test/locations/test/ekmConnection/testConn"
// ExternalVPCHostname represents the external URI hostname for an External_VPC KEK.
ExternalVPCHostname = "testvpchost"
// ExternalVPCKeyPath represents the keyPath for an External_VPC KEK. It is set as the
// EkmConnectionKeyPath of fake CryptoKeys created with the External_VPC protection level.
ExternalVPCKeyPath = "api/v1/cckm/ekm/endpoints/testpath"
)
// newKEK builds a test KEK with the given protection level. The KEK's Name is a
// fixed fake key-ring path with nameSuffix as the final CryptoKey segment, i.e.
// "projects/test/locations/test/keyRings/test/cryptoKeys/<nameSuffix>".
func newKEK(nameSuffix string, protectionLevel kmsrpb.ProtectionLevel) *KEK {
	// The trailing slash keeps nameSuffix as its own path segment so that Name
	// matches the "projects/*/locations/*/keyRings/*/cryptoKeys/*" format.
	const cryptoKeysPrefix = "projects/test/locations/test/keyRings/test/cryptoKeys/"
	return &KEK{
		Name:            cryptoKeysPrefix + nameSuffix,
		ProtectionLevel: protectionLevel,
	}
}
// KEK contains basic information about test KEKs.
type KEK struct {
// Name is a fake CryptoKey name, of the format "projects/*/locations/*/keyRings/*/cryptoKeys/*".
Name string
// ProtectionLevel is the Cloud KMS protection level associated with this test KEK.
ProtectionLevel kmsrpb.ProtectionLevel
}
// URI reports the KEK's CloudKMS URI, formed by putting the GCP KMS prefix in
// front of the key name.
func (k *KEK) URI() string {
	uri := gcpKMSPrefix
	uri += k.Name
	return uri
}
// ResourceName reports the relative resource name of the KEK: the key name
// followed by the cryptoKeyVersion suffix.
func (k *KEK) ResourceName() string {
	resource := k.Name
	resource += CryptoKeyVerSuffix
	return resource
}
// Predefined test KEKs, one for each protection level listed below. Each is
// built by newKEK from a distinct name suffix.
var (
// SoftwareKEK represents a test KEK with the Software protection level.
SoftwareKEK = newKEK("testSoftware", kmsrpb.ProtectionLevel_SOFTWARE)
// HSMKEK represents a test KEK with the HSM protection level.
HSMKEK = newKEK("testHsm", kmsrpb.ProtectionLevel_HSM)
// ExternalKEK represents a test KEK with the External protection level.
ExternalKEK = newKEK("testExternal", kmsrpb.ProtectionLevel_EXTERNAL)
// VPCKEK represents a test KEK with the External_VPC protection level.
VPCKEK = newKEK("testExternalVPC", kmsrpb.ProtectionLevel_EXTERNAL_VPC)
)
// defaultKEKs maps each supported protection level to the default test KEK for
// that level. CreateEnabledCryptoKey uses it to pick a default name, and
// protectionLevelFromName uses it in reverse to recover a level from a name, so
// every key must be paired with the KEK created for that same level.
var defaultKEKs = map[kmsrpb.ProtectionLevel]*KEK{
	kmsrpb.ProtectionLevel_SOFTWARE:     SoftwareKEK,
	kmsrpb.ProtectionLevel_HSM:          HSMKEK,
	kmsrpb.ProtectionLevel_EXTERNAL:     ExternalKEK,
	kmsrpb.ProtectionLevel_EXTERNAL_VPC: VPCKEK,
}
// CreateTempTokenFile creates a temp directory/file as a stand-in for the attestation token.
//
// It returns the path of a file named "test_token" whose contents are the
// literal bytes "test token". The file lives in a directory obtained from
// t.TempDir, which the testing package removes when the test completes. If the
// file cannot be written, the test is aborted with t.Fatalf.
func CreateTempTokenFile(t *testing.T) string {
	// Attribute failures to the calling test rather than to this helper.
	t.Helper()

	// filepath.Join uses the OS-specific separator instead of a hard-coded "/".
	tokenFile := filepath.Join(t.TempDir(), "test_token")

	// The token is plain data, so it is written without execute permission bits.
	if err := os.WriteFile(tokenFile, []byte("test token"), 0644); err != nil {
		t.Fatalf("Error creating token file at %v: %v", tokenFile, err)
	}
	return tokenFile
}
// CRC32C computes the CRC-32 checksum of data using the Castagnoli polynomial.
func CRC32C(data []byte) uint32 {
	// Updating a zero CRC with the whole input is exactly what crc32.Checksum does.
	return crc32.Update(0, crc32.MakeTable(crc32.Castagnoli), data)
}
// CreateEnabledCryptoKey creates a fake CryptoKey with the given protection level and name of the
// format "projects/*/locations/*/keyRings/*/cryptoKeys/*".
//
// If name is empty, the name of the default test KEK for protectionLevel is
// used; when no default exists for that level the name is left empty. The
// returned key has an ENABLED primary version named name + CryptoKeyVerSuffix.
// For the External level the primary version carries ExternalEKMURI; for the
// External_VPC level the key carries ExternalVPCBackend and the primary version
// carries ExternalVPCKeyPath.
func CreateEnabledCryptoKey(protectionLevel kmsrpb.ProtectionLevel, name string) *kmsrpb.CryptoKey {
	// If a custom name was specified, use it. Otherwise, use the default test name for that protection level.
	if name == "" {
		// Levels without a default KEK produce a nil map entry; guard the lookup
		// so such calls do not dereference nil.
		if kek, ok := defaultKEKs[protectionLevel]; ok && kek != nil {
			name = kek.Name
		}
	}

	ck := &kmsrpb.CryptoKey{
		Name: name,
		Primary: &kmsrpb.CryptoKeyVersion{
			Name:            name + CryptoKeyVerSuffix,
			State:           kmsrpb.CryptoKeyVersion_ENABLED,
			ProtectionLevel: protectionLevel,
		},
	}

	// External protection levels additionally carry ExternalProtectionLevelOptions.
	switch protectionLevel {
	case kmsrpb.ProtectionLevel_EXTERNAL:
		ck.Primary.ExternalProtectionLevelOptions = &kmsrpb.ExternalProtectionLevelOptions{
			ExternalKeyUri: ExternalEKMURI,
		}
	case kmsrpb.ProtectionLevel_EXTERNAL_VPC:
		ck.CryptoKeyBackend = ExternalVPCBackend
		ck.Primary.ExternalProtectionLevelOptions = &kmsrpb.ExternalProtectionLevelOptions{
			EkmConnectionKeyPath: ExternalVPCKeyPath,
		}
	}
	return ck
}
// FakeKeyManagementClient is a fake version of Cloud KMS Key Management client.
// It embeds kms.KeyManagementClient and defines its own GetCryptoKey, Encrypt,
// Decrypt and Close methods.
type FakeKeyManagementClient struct {
kms.KeyManagementClient
// GetCryptoKeyFunc, if non-nil, handles GetCryptoKey calls in place of the default fake behavior.
GetCryptoKeyFunc func(context.Context, *kmsspb.GetCryptoKeyRequest, ...gax.CallOption) (*kmsrpb.CryptoKey, error)
// EncryptFunc, if non-nil, handles Encrypt calls in place of the default fake response.
EncryptFunc func(context.Context, *kmsspb.EncryptRequest, ...gax.CallOption) (*kmsspb.EncryptResponse, error)
// DecryptFunc, if non-nil, handles Decrypt calls in place of the default fake response.
DecryptFunc func(context.Context, *kmsspb.DecryptRequest, ...gax.CallOption) (*kmsspb.DecryptResponse, error)
}
// protectionLevelFromName finds the protection level whose default test KEK
// has the given name. It reports PROTECTION_LEVEL_UNSPECIFIED when no default
// KEK matches.
func protectionLevelFromName(name string) kmsrpb.ProtectionLevel {
	level := kmsrpb.ProtectionLevel_PROTECTION_LEVEL_UNSPECIFIED
	for candidate, kek := range defaultKEKs {
		if kek.Name == name {
			level = candidate
			break
		}
	}
	return level
}
// GetCryptoKey calls GetCryptoKeyFunc if applicable. Otherwise it returns a fake
// enabled CryptoKey named after the request, with the protection level derived
// from that name.
func (f *FakeKeyManagementClient) GetCryptoKey(ctx context.Context, req *kmsspb.GetCryptoKeyRequest, opts ...gax.CallOption) (*kmsrpb.CryptoKey, error) {
	if override := f.GetCryptoKeyFunc; override != nil {
		return override(ctx, req, opts...)
	}
	keyName := req.GetName()
	level := protectionLevelFromName(keyName)
	return CreateEnabledCryptoKey(level, keyName), nil
}
// FakeKMSWrap returns a fake wrapped share.
//
// The result is a copy of unwrapped followed by one marker byte chosen from
// name: 'H' for HSMKEK's name, 'S' for SoftwareKEK's name, and 'U' for any
// other name. The input slice is never modified.
func FakeKMSWrap(unwrapped []byte, name string) []byte {
	var marker byte
	switch name {
	case HSMKEK.Name:
		marker = 'H'
	case SoftwareKEK.Name:
		marker = 'S'
	default:
		marker = 'U'
	}

	// Build the result in a fresh buffer: appending directly to unwrapped could
	// write into the caller's backing array when it has spare capacity.
	wrapped := make([]byte, 0, len(unwrapped)+1)
	wrapped = append(wrapped, unwrapped...)
	return append(wrapped, marker)
}
// ValidEncryptResponse builds a fake successful response for CloudKMS Encrypt.
// The ciphertext is the request plaintext passed through FakeKMSWrap, the
// checksum is the CRC32C of that ciphertext, and the plaintext checksum is
// reported as verified.
func ValidEncryptResponse(req *kmsspb.EncryptRequest) *kmsspb.EncryptResponse {
	ciphertext := FakeKMSWrap(req.GetPlaintext(), req.GetName())
	checksum := int64(CRC32C(ciphertext))

	resp := new(kmsspb.EncryptResponse)
	resp.Name = req.GetName()
	resp.Ciphertext = ciphertext
	resp.CiphertextCrc32C = wrapperspb.Int64(checksum)
	resp.VerifiedPlaintextCrc32C = true
	return resp
}
// Encrypt delegates to EncryptFunc when one is set. Otherwise it answers with
// the fake response built by ValidEncryptResponse and a nil error.
func (f *FakeKeyManagementClient) Encrypt(ctx context.Context, req *kmsspb.EncryptRequest, opts ...gax.CallOption) (*kmsspb.EncryptResponse, error) {
	if f.EncryptFunc == nil {
		return ValidEncryptResponse(req), nil
	}
	return f.EncryptFunc(ctx, req, opts...)
}
// FakeKMSUnwrap returns a fake unwrapped share.
//
// It expects wrapped to end with the marker byte matching name: 'H' for
// HSMKEK's name, 'S' for SoftwareKEK's name, and 'U' for any other name. On a
// match it returns wrapped without that final byte (sharing wrapped's backing
// array). If wrapped is empty or ends with a different byte, it returns the
// placeholder bytes "nonsenseee".
func FakeKMSUnwrap(wrapped []byte, name string) []byte {
	var final byte
	switch name {
	case HSMKEK.Name:
		final = 'H'
	case SoftwareKEK.Name:
		final = 'S'
	default:
		final = 'U'
	}

	// An empty input cannot carry a marker; treat it as a mismatch rather than
	// indexing out of range.
	last := len(wrapped) - 1
	if last < 0 || wrapped[last] != final {
		return []byte("nonsenseee")
	}
	return wrapped[:last]
}
// ValidDecryptResponse builds a fake successful response for CloudKMS Decrypt.
// The plaintext is the request ciphertext passed through FakeKMSUnwrap, and the
// checksum is the CRC32C of that plaintext.
func ValidDecryptResponse(req *kmsspb.DecryptRequest) *kmsspb.DecryptResponse {
	plaintext := FakeKMSUnwrap(req.GetCiphertext(), req.GetName())
	checksum := int64(CRC32C(plaintext))

	resp := new(kmsspb.DecryptResponse)
	resp.Plaintext = plaintext
	resp.PlaintextCrc32C = wrapperspb.Int64(checksum)
	return resp
}
// Decrypt delegates to DecryptFunc when one is set. Otherwise it answers with
// the fake response built by ValidDecryptResponse and a nil error.
func (f *FakeKeyManagementClient) Decrypt(ctx context.Context, req *kmsspb.DecryptRequest, opts ...gax.CallOption) (*kmsspb.DecryptResponse, error) {
	if f.DecryptFunc == nil {
		return ValidDecryptResponse(req), nil
	}
	return f.DecryptFunc(ctx, req, opts...)
}
// Close does nothing and always reports success. It exists so the fake
// implements the KMS Client interface.
func (f *FakeKeyManagementClient) Close() error { return nil }
// FakeSecureSessionClient is a test version of a secure session client, used to communicate with
// external EKM. Each error field, when non-nil, makes the corresponding method
// return that error.
type FakeSecureSessionClient struct {
securesession.SecureSessionClient
// WrapErr, if non-nil, is returned by ConfidentialWrap.
WrapErr error
// UnwrapErr, if non-nil, is returned by ConfidentialUnwrap.
UnwrapErr error
// EndSessionErr, if non-nil, is returned by EndSession.
EndSessionErr error
}
// ConfidentialWrap simulates wrapping a share by appending a single byte ('E') to the end of the
// plaintext to indicate external protection level.
//
// The context and both string arguments are ignored. If WrapErr is set, it is
// returned with a nil result. The returned slice is a fresh copy; plaintext is
// never modified.
func (f *FakeSecureSessionClient) ConfidentialWrap(_ context.Context, _, _ string, plaintext []byte) ([]byte, error) {
	// Return configured error if one was set.
	if f.WrapErr != nil {
		return nil, f.WrapErr
	}

	// Build the result in a fresh buffer: appending directly to plaintext could
	// write into the caller's backing array when it has spare capacity.
	wrapped := make([]byte, 0, len(plaintext)+1)
	wrapped = append(wrapped, plaintext...)
	return append(wrapped, 'E'), nil
}
// ConfidentialUnwrap removes the last byte of the wrapped share (mirroring ConfidentialWrap).
//
// The context and both string arguments are ignored. If UnwrapErr is set, it is
// returned with a nil result. An empty wrappedBlob has no trailing byte to
// remove and yields an error instead of a panic. The value of the removed byte
// is not checked. The returned slice shares wrappedBlob's backing array.
func (f *FakeSecureSessionClient) ConfidentialUnwrap(_ context.Context, _, _ string, wrappedBlob []byte) ([]byte, error) {
	// Return configured error if one was set.
	if f.UnwrapErr != nil {
		return nil, f.UnwrapErr
	}
	if len(wrappedBlob) == 0 {
		return nil, errors.New("wrapped blob is empty")
	}
	return wrappedBlob[:len(wrappedBlob)-1], nil
}
// EndSession exists to implement the SecureSessionClient interface. It reports
// EndSessionErr, which is nil unless a test configured an error.
func (f *FakeSecureSessionClient) EndSession(ctx context.Context) error {
	return f.EndSessionErr
}
// FakeCloudEKMClient is a fake implementation of the GCP EKM client.
// It embeds kms.EkmClient and defines its own GetEkmConnection and Close methods.
type FakeCloudEKMClient struct {
kms.EkmClient
// GetEkmConnectionFunc, if non-nil, handles GetEkmConnection calls. When it is
// nil, GetEkmConnection returns an error.
GetEkmConnectionFunc func(context.Context, *ekmpb.GetEkmConnectionRequest, ...gax.CallOption) (*ekmpb.EkmConnection, error)
}
// GetEkmConnection delegates to GetEkmConnectionFunc when one is set. With no
// function configured it returns a nil connection and an error.
func (f *FakeCloudEKMClient) GetEkmConnection(ctx context.Context, req *ekmpb.GetEkmConnectionRequest, opts ...gax.CallOption) (*ekmpb.EkmConnection, error) {
	if f.GetEkmConnectionFunc == nil {
		return nil, errors.New("unimplemented fake")
	}
	return f.GetEkmConnectionFunc(ctx, req, opts...)
}
// Close does nothing and always reports success. It exists so the fake
// implements the EKM Client interface.
func (f *FakeCloudEKMClient) Close() error {
	return nil
}