oracle/controllers/databasecontroller/database_resources.go (255 lines of code) (raw):
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package databasecontroller
import (
"context"
"fmt"
"os"
"strings"
"time"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/integer"
commonv1alpha1 "github.com/GoogleCloudPlatform/elcarro-oracle-operator/common/api/v1alpha1"
v1alpha1 "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/api/v1alpha1"
"github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/controllers"
"github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/agents/common/sql"
k8s "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/k8s"
)
const (
gsmResourceVersionString = "projects/%s/secrets/%s/versions/%s"
pdbAdminUserName = "GPDB_ADMIN"
)
var (
dialTimeout = 10 * time.Minute
)
// NewDatabase attempts to create a new PDB if it doesn't exist yet.
// The first return value of NewDatabase is "bail out or not?".
// If a PDB is new, just created now, NewDatabase returns bail=false.
// If it's an existing PDB, NewDatabase returns bail=true (so that the rest
// of the workflow, e.g. creating users step, is not attempted).
func NewDatabase(ctx context.Context, r *DatabaseReconciler, db *v1alpha1.Database, dbDomain, cdbName string, log logr.Logger) (bool, error) {
r.Recorder.Eventf(db, corev1.EventTypeNormal, k8s.CreatingDatabase, fmt.Sprintf("Creating new database %q", db.Spec.Name))
ctx, cancel := context.WithTimeout(ctx, dialTimeout)
defer cancel()
req := &controllers.CreateDatabaseRequest{
Name: db.Spec.Name,
CdbName: cdbName,
DbDomain: dbDomain,
}
userVerStr := ""
// database_controller.validateSpec has validated the spec earlier;
// So no duplicated validation here.
if db.Spec.AdminPassword != "" {
userVerStr = db.Spec.AdminPassword
req.Password = db.Spec.AdminPassword
if lastPwd, ok := db.Status.UserResourceVersions[pdbAdminUserName]; ok {
req.LastPassword = lastPwd
}
}
if db.Spec.AdminPasswordGsmSecretRef != nil {
userVerStr = fmt.Sprintf(gsmResourceVersionString, db.Spec.AdminPasswordGsmSecretRef.ProjectId, db.Spec.AdminPasswordGsmSecretRef.SecretId, db.Spec.AdminPasswordGsmSecretRef.Version)
ref := &controllers.GsmSecretReference{
ProjectId: db.Spec.AdminPasswordGsmSecretRef.ProjectId,
SecretId: db.Spec.AdminPasswordGsmSecretRef.SecretId,
Version: db.Spec.AdminPasswordGsmSecretRef.Version,
}
if lastVer, ok := db.Status.UserResourceVersions[pdbAdminUserName]; ok {
ref.LastVersion = lastVer
}
req.AdminPasswordGsmSecretRef = ref
}
cdOut, err := controllers.CreateDatabase(ctx, r, r.DatabaseClientFactory, db.Namespace, db.Spec.Instance, *req)
if err != nil {
return false, fmt.Errorf("resource/NewDatabase: failed on CreateDatabase gRPC call: %v", err)
}
log.Info("resource/NewDatabase: CreateDatabase DONE with this output", "out", cdOut)
// "AdminUserSyncCompleted" status indicates PDB existed
// and admin user sync completed.
if cdOut == "AdminUserSyncCompleted" {
r.Recorder.Eventf(db, corev1.EventTypeWarning, k8s.DatabaseAlreadyExists, fmt.Sprintf("Database %q already exists, sync admin user performed", db.Spec.Name))
// Update user version status map after newly synced database admin user.
// The caller will update the status by r.Status().Update.
if db.Status.UserResourceVersions == nil {
db.Status.UserResourceVersions = make(map[string]string)
}
db.Status.UserResourceVersions[pdbAdminUserName] = userVerStr
// Return true indicating PDB already existed and return
// PDB admin userVerMap which need to by synced by caller.
// The caller will trigger syncUser instead of createUser later.
return true, nil
}
// Indicated underlying database exists and admin user is in sync with the config.
if cdOut == "AlreadyExists" {
r.Recorder.Eventf(db, corev1.EventTypeWarning, k8s.DatabaseAlreadyExists, fmt.Sprintf("Database %q already exists", db.Spec.Name))
return true, nil
}
hostname, err := os.Hostname()
if err != nil {
log.Error(err, "resources/NewDatabase: failed to get a hostname")
}
log.V(1).Info("resources/NewDatabase: new database requested: DONE", "hostname", hostname)
// Update user version status map after newly created database.
// The caller will update the status by r.Status().Update.
if db.Status.UserResourceVersions == nil {
db.Status.UserResourceVersions = make(map[string]string)
}
db.Status.UserResourceVersions[pdbAdminUserName] = userVerStr
return false, nil
}
// NewUsers attempts to create a new user.
func NewUsers(ctx context.Context, r *DatabaseReconciler, db *v1alpha1.Database, dbDomain, cdbName string, log logr.Logger) error {
log.Info("resources/NewUsers: new database users requested", "dbName", db.Spec.Name, "requestedUsers", db.Spec.Users)
var usernames, usersCmds, grantsCmds []string
var userSpecs []*controllers.User
userVerMap := make(map[string]string)
// Copy pdb admin user version into local map to sync later.
if v, ok := db.Status.UserResourceVersions[pdbAdminUserName]; ok {
userVerMap[pdbAdminUserName] = v
}
for k, u := range db.Spec.Users {
log.Info("create user", "user#", k, "username", u.Name)
if len(usernames) < 3 {
usernames = append(usernames, u.Name)
} else if len(usernames) == 3 {
usernames = append(usernames, "...")
}
// database_controller.validateSpec has validated the spec earlier;
// So no duplicated validation here.
if u.Password != "" {
usersCmds = append(usersCmds, sql.QueryCreateUser(u.Name, u.Password))
userVerMap[u.Name] = u.Password
}
if u.GsmSecretRef != nil {
userSpecs = append(userSpecs, &controllers.User{
Name: u.Name,
PasswordGsmSecretRef: &controllers.GsmSecretReference{
ProjectId: u.GsmSecretRef.ProjectId,
SecretId: u.GsmSecretRef.SecretId,
Version: u.GsmSecretRef.Version,
}})
userVerMap[u.Name] = fmt.Sprintf(gsmResourceVersionString, u.GsmSecretRef.ProjectId, u.GsmSecretRef.SecretId, u.GsmSecretRef.Version)
}
for _, p := range u.Privileges {
grantsCmds = append(grantsCmds, sql.QueryGrantPrivileges(string(p), u.Name))
}
}
r.Recorder.Eventf(db, corev1.EventTypeNormal, k8s.CreatingUser, "Creating new users %v", usernames)
ctx, cancel := context.WithTimeout(ctx, dialTimeout)
defer cancel()
req := &controllers.CreateUsersRequest{
CdbName: cdbName,
PdbName: db.Spec.Name,
GrantPrivsCmd: grantsCmds,
DbDomain: dbDomain,
}
if usersCmds != nil {
req.CreateUsersCmd = usersCmds
}
if userSpecs != nil {
req.User = userSpecs
}
cdOut, err := controllers.CreateUsers(ctx, r, r.DatabaseClientFactory, db.Namespace, db.Spec.Instance, *req)
if err != nil {
log.Error(err, "resources/NewUsers: failed on CreateUsers gRPC call")
}
log.Info("resources/NewUsers: CreateUsers succeeded with this output", "output", cdOut)
hostname, err := os.Hostname()
if err != nil {
log.Error(err, "resources/NewUsers: failed to get a hostname")
}
log.V(1).Info("resources/NewUsers: new database users requested: DONE", "hostname", hostname)
r.Recorder.Eventf(db, corev1.EventTypeNormal, k8s.CreatedUser, "Created new users %v", usernames)
db.Status.Conditions = k8s.Upsert(db.Status.Conditions, k8s.UserReady, v1.ConditionTrue, k8s.CreateComplete, "")
db.Status.UserNames = usernames
db.Status.UserResourceVersions = userVerMap
r.updateIsChangeApplied(ctx, db)
if err := r.Status().Update(ctx, db); err != nil {
return err
}
return nil
}
// SyncUsers attempts to update PDB users.
func SyncUsers(ctx context.Context, r *DatabaseReconciler, db *v1alpha1.Database, cdbName string, log logr.Logger) error {
log.Info("resources/syncUsers: sync database users requested", "db", db)
r.Recorder.Eventf(db, corev1.EventTypeNormal, k8s.SyncingUser, fmt.Sprintf("Syncing users for database %q", db.Spec.Name))
var userSpecs []*controllers.User
var usernames []string
userVerMap := make(map[string]string)
// Copy pdb admin user version into local map to sync later.
if v, ok := db.Status.UserResourceVersions[pdbAdminUserName]; ok {
userVerMap[pdbAdminUserName] = v
}
for _, user := range db.Spec.Users {
var privs []string
usernames = append(usernames, user.Name)
for _, specPriv := range user.Privileges {
privs = append(privs, string(specPriv))
}
userSpec := &controllers.User{
Name: user.Name,
Privileges: privs,
}
// database_controller.validateSpec has validated the spec earlier;
// So no duplicated validation here.
if user.Password != "" {
userVerMap[user.Name] = user.Password
userSpec.Password = user.Password
lastPwd, ok := db.Status.UserResourceVersions[user.Name]
if ok {
userSpec.LastPassword = lastPwd
}
}
if user.GsmSecretRef != nil {
userVerMap[user.Name] = fmt.Sprintf(gsmResourceVersionString, user.GsmSecretRef.ProjectId, user.GsmSecretRef.SecretId, user.GsmSecretRef.Version)
ref := &controllers.GsmSecretReference{
ProjectId: user.GsmSecretRef.ProjectId,
SecretId: user.GsmSecretRef.SecretId,
Version: user.GsmSecretRef.Version,
}
if lastVer, ok := db.Status.UserResourceVersions[user.Name]; ok {
ref.LastVersion = lastVer
}
userSpec.PasswordGsmSecretRef = ref
}
userSpecs = append(userSpecs, userSpec)
}
req := &controllers.UsersChangedRequest{
PdbName: db.Spec.Name,
UserSpecs: userSpecs,
}
resp, err := controllers.UsersChanged(ctx, r, r.DatabaseClientFactory, db.GetNamespace(), db.Spec.Instance, *req)
if err != nil {
log.Error(err, "resources/syncUsers: failed on UsersChanged gRPC call")
return err
}
if resp.Changed {
db.Status.Phase = commonv1alpha1.DatabaseUpdating
db.Status.Conditions = k8s.Upsert(db.Status.Conditions, k8s.UserReady, v1.ConditionFalse, k8s.SyncInProgress, "")
if err := r.Status().Update(ctx, db); err != nil {
return err
}
log.Info("resources/syncUsers: update database users requested", "CDB", cdbName, "PDB", db.Spec.Name)
req := &controllers.UpdateUsersRequest{
PdbName: db.Spec.Name,
UserSpecs: userSpecs,
}
if err := controllers.UpdateUsers(ctx, r, r.DatabaseClientFactory, db.GetNamespace(), db.Spec.Instance, *req); err != nil {
log.Error(err, "resources/syncUsers: failed on UpdateUser gRPC call")
return err
}
log.Info("resources/syncUsers: update database users done", "CDB", cdbName, "PDB", db.Spec.Name)
}
log.Info("resources/syncUsers: sync database users done", "CDB", cdbName, "PDB", db.Spec.Name)
userReady := &v1.Condition{
Type: k8s.UserReady,
Status: v1.ConditionTrue,
Reason: k8s.SyncComplete,
Message: "",
}
if len(resp.Suppressed) != 0 {
userReady.Status = v1.ConditionFalse
userReady.Reason = k8s.UserOutOfSync
var msg []string
for _, u := range resp.Suppressed {
if u.SuppressType == controllers.UsersChangedResponse_DELETE {
msg = append(msg, fmt.Sprintf("User %q not defined in database spec, "+
"supposed to be deleted. suppressed SQL %q. Fix by deleting the user in DB or updating DB spec to include the user", u.UserName, u.Sql))
} else if u.SuppressType == controllers.UsersChangedResponse_CREATE {
msg = append(msg, fmt.Sprintf("User %q cannot be created, "+
"password is not provided. Fix by creating the user in DB or updating DB spec to include password", u.UserName))
}
}
userReady.Message = strings.Join(msg, ".")
}
if k8s.ConditionStatusEquals(userReady, v1.ConditionTrue) {
r.Recorder.Eventf(db, corev1.EventTypeNormal, k8s.SyncedUser, fmt.Sprintf("Synced users for database %q", db.Spec.Name))
} else {
r.Recorder.Eventf(db, corev1.EventTypeWarning, k8s.FailedToSyncUser, fmt.Sprintf("Failed to sync users for database %q, %s", db.Spec.Name, userReady.Message))
}
db.Status.Conditions = k8s.Upsert(db.Status.Conditions, userReady.Type, userReady.Status, userReady.Reason, userReady.Message)
db.Status.UserResourceVersions = userVerMap
db.Status.UserNames = usernames[0:integer.IntMin(3, len(usernames))]
if len(usernames) > 3 {
db.Status.UserNames = append(db.Status.UserNames, "...")
}
r.updateIsChangeApplied(ctx, db)
if err := r.Status().Update(ctx, db); err != nil {
return err
}
return nil
}