in shardingsphere-operator/pkg/controllers/storage_node_controller.go [622:675]
// registerStorageUnit creates the logic database named by the node's
// annotations and then registers the node's datasource as a storage unit on
// the server returned by getShardingsphereServer.
//
// It returns nil without contacting the server when the registration
// annotation is not "true", when node.Status.Registered is already set, or
// when the node has not reached the Ready phase (a warning event is recorded
// in that last case). On success node.Status.Registered is set to true; only
// the in-memory object is modified here.
func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
	// Registration is opt-in through an annotation.
	if enabled := node.Annotations[AnnotationKeyRegisterStorageUnitEnabled]; enabled != "true" {
		return nil
	}

	// Validation runs before the status checks, so invalid annotations are
	// reported even for a node that is already registered.
	if err := r.validateComputeNodeAnnotations(node); err != nil {
		return err
	}

	switch {
	case node.Status.Registered:
		// Nothing left to do.
		return nil
	case node.Status.Phase != v1alpha1.StorageNodePhaseReady:
		// Not an error: record that registration is pending and return.
		r.Recorder.Eventf(node, corev1.EventTypeWarning, "RegisterWaiting", "Waiting to register storage unit for node %s/%s: node is not ready", node.GetNamespace(), node.GetName())
		return nil
	}

	instanceDB := node.Annotations[v1alpha1.AnnotationsInstanceDBName]
	logicDB := node.Annotations[AnnotationKeyLogicDatabaseName]

	server, err := r.getShardingsphereServer(ctx, node)
	if err != nil {
		return fmt.Errorf("getShardingsphereServer failed: %w", err)
	}
	defer server.Close()

	if err := server.CreateDatabase(logicDB); err != nil {
		return fmt.Errorf("create database failed: %w", err)
	}
	r.Recorder.Eventf(node, corev1.EventTypeNormal, "LogicDatabaseCreated", "LogicDatabase %s is created", logicDB)

	// Connection details come from the cluster when a cluster status is
	// present, otherwise from the single instance.
	var (
		host       string
		port       int32
		user, pass string
	)
	if node.Status.Cluster.Status != "" {
		host, port, user, pass = getDatasourceInfoFromCluster(node, storageProvider)
	} else {
		host, port, user, pass = getDatasourceInfoFromInstance(node, storageProvider)
	}

	unitName := getDSName(node)
	if err := server.RegisterStorageUnit(logicDB, unitName, host, uint(port), instanceDB, user, pass); err != nil {
		return fmt.Errorf("register storage node failed: %w", err)
	}
	r.Recorder.Eventf(node, corev1.EventTypeNormal, "StorageUnitRegistered", "StorageUnit %s:%d/%s is registered", host, port, instanceDB)

	node.Status.Registered = true
	return nil
}