pkg/appmgmt/sparkoperator/spark.go
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sparkoperator

import (
	"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
	"go.uber.org/zap"
	k8sCache "k8s.io/client-go/tools/cache"

	"github.com/apache/yunikorn-k8shim/pkg/appmgmt/interfaces"
	"github.com/apache/yunikorn-k8shim/pkg/client"
	"github.com/apache/yunikorn-k8shim/pkg/log"
	crcClientSet "github.com/apache/yunikorn-k8shim/pkg/sparkclient/clientset/versioned"
	crInformers "github.com/apache/yunikorn-k8shim/pkg/sparkclient/informers/externalversions"
)

// Manager implements interfaces#Recoverable, interfaces#AppManager
type Manager struct {
	amProtocol         interfaces.ApplicationManagementProtocol
	apiProvider        client.APIProvider
	crdInformer        k8sCache.SharedIndexInformer
	crdInformerFactory crInformers.SharedInformerFactory
	stopCh             chan struct{}
}

// NewManager creates a Manager that forwards SparkApplication CRD events to the
// application management protocol.
func NewManager(amProtocol interfaces.ApplicationManagementProtocol, apiProvider client.APIProvider) *Manager {
	return &Manager{
		amProtocol:  amProtocol,
		apiProvider: apiProvider,
		stopCh:      make(chan struct{}),
	}
}

// ServiceInit implements the AppManagementService interface.
/*
It watches for changes to SparkApplication CRD objects.
Two event handlers are registered so the manager reacts when a SparkApplication is
updated or deleted. There is no need for an AddFunc handler: when a SparkApplication
object is first created, its application ID has not been generated yet. The ID only
becomes available after the driver pod starts, at which point the Spark K8s backend
assigns a string starting with "spark-" as the app ID.
*/
func (os *Manager) ServiceInit() error {
	crClient, err := crcClientSet.NewForConfig(
		os.apiProvider.GetAPIs().KubeClient.GetConfigs())
	if err != nil {
		return err
	}

	var factoryOpts []crInformers.SharedInformerOption
	os.crdInformerFactory = crInformers.NewSharedInformerFactoryWithOptions(
		crClient, 0, factoryOpts...)
	os.crdInformer = os.crdInformerFactory.Sparkoperator().V1beta2().SparkApplications().Informer()
	os.crdInformer.AddEventHandler(k8sCache.ResourceEventHandlerFuncs{
		UpdateFunc: os.updateApplication,
		DeleteFunc: os.deleteApplication,
	})

	log.Log(log.ShimAppMgmtSparkOperator).Info("Spark operator AppMgmt service initialized")
	return nil
}

// Name returns the identifier of this app management service.
func (os *Manager) Name() string {
	return "spark-k8s-operator"
}

// Start launches the shared informer factory so that SparkApplication events start flowing.
func (os *Manager) Start() error {
	if os.crdInformerFactory != nil {
		log.Log(log.ShimAppMgmtSparkOperator).Info("starting", zap.String("Name", os.Name()))
		go os.crdInformerFactory.Start(os.stopCh)
	}
	return nil
}
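
// The method below is an illustrative sketch, not part of the upstream file: it shows
// how a caller could block until the SparkApplications informer cache has synced after
// Start() returns, using the standard client-go cache helper. The method name
// waitForSparkCRDSync is an assumption made for this example.
func (os *Manager) waitForSparkCRDSync() bool {
	if os.crdInformer == nil {
		// ServiceInit has not been called yet, nothing to wait for
		return false
	}
	// WaitForCacheSync blocks until HasSynced reports true or the stop channel unblocks
	return k8sCache.WaitForCacheSync(os.stopCh, os.crdInformer.HasSynced)
}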

// Stop signals the informer factory to shut down by pushing a value onto the stop channel.
func (os *Manager) Stop() {
	log.Log(log.ShimAppMgmtSparkOperator).Info("stopping", zap.String("Name", os.Name()))
	os.stopCh <- struct{}{}
}

/*
updateApplication handles updates to a SparkApplication. When the new state is
FailedState or CompletedState, it sends an ApplicationFail or ApplicationComplete
message, respectively, through the app management protocol (an illustrative helper
capturing this terminal-state check is sketched after the function).
*/
func (os *Manager) updateApplication(old, new interface{}) {
	appOld, okOld := old.(*v1beta2.SparkApplication)
	appNew, okNew := new.(*v1beta2.SparkApplication)
	if !okOld || !okNew {
		log.Log(log.ShimAppMgmtSparkOperator).Warn("received unexpected object type in update handler")
		return
	}
	currState := appNew.Status.AppState.State
	log.Log(log.ShimAppMgmtSparkOperator).Debug("spark app updated",
		zap.Any("old", appOld),
		zap.Any("new", appNew),
		zap.String("new state", string(currState)))
	if currState == v1beta2.FailedState {
		log.Log(log.ShimAppMgmtSparkOperator).Debug("SparkApp has failed. Ready to initiate app cleanup")
		os.amProtocol.NotifyApplicationFail(appNew.Status.SparkApplicationID)
	} else if currState == v1beta2.CompletedState {
		log.Log(log.ShimAppMgmtSparkOperator).Debug("SparkApp has completed. Ready to initiate app cleanup")
		os.amProtocol.NotifyApplicationComplete(appNew.Status.SparkApplicationID)
	}
}
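
// isTerminalSparkState is an illustrative helper, not part of the upstream file: it
// captures the terminal-state check that updateApplication performs inline, i.e. only
// FailedState and CompletedState trigger a cleanup notification. The helper name and
// its use of v1beta2.ApplicationStateType are assumptions made for this sketch.
func isTerminalSparkState(state v1beta2.ApplicationStateType) bool {
	return state == v1beta2.FailedState || state == v1beta2.CompletedState
}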

/*
deleteApplication handles a deleted SparkApplication by sending an ApplicationComplete
message through the app management protocol.
*/
func (os *Manager) deleteApplication(obj interface{}) {
	app, ok := obj.(*v1beta2.SparkApplication)
	if !ok {
		log.Log(log.ShimAppMgmtSparkOperator).Warn("received unexpected object type in delete handler")
		return
	}
	log.Log(log.ShimAppMgmtSparkOperator).Info("spark app deleted", zap.Any("SparkApplication", app))
	os.amProtocol.NotifyApplicationComplete(app.Status.SparkApplicationID)
}
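
// The function below is an illustrative sketch, not part of the upstream file: it shows
// how the Manager lifecycle defined above is typically driven by a caller that already
// has an ApplicationManagementProtocol and an APIProvider from the shim. The helper name
// newStartedManager is an assumption made for this example.
func newStartedManager(amProtocol interfaces.ApplicationManagementProtocol, apiProvider client.APIProvider) (*Manager, error) {
	mgr := NewManager(amProtocol, apiProvider)
	// register the SparkApplication informer and its update/delete event handlers
	if err := mgr.ServiceInit(); err != nil {
		return nil, err
	}
	// start the informer factory; events will now flow to updateApplication and deleteApplication
	if err := mgr.Start(); err != nil {
		return nil, err
	}
	return mgr, nil
}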