# core/maxframe/learn/contrib/xgboost/core.py
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import tempfile
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np

try:
    import xgboost
except ImportError:
    xgboost = None

from ....core import OutputType
from ...core import Model, ModelData
from ..models import ModelApplyChunk, to_remote_model
from .dmatrix import DMatrix


class BoosterData(ModelData):
__slots__ = ("_evals_result",)
_evals_result: Dict
def __init__(self, *args, evals_result=None, **kwargs):
super().__init__(*args, **kwargs)
self._evals_result = evals_result if evals_result is not None else dict()
@staticmethod
def _get_booster_score(bst, fmap=None, importance_type="weight"):
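        # xgboost's Booster.get_score() expects ``fmap`` to be a path to a
        # feature-map file, so the serialized bytes shipped from the client
        # are written to a temporary file that is removed afterwards.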
if not fmap:
tmp_file_name = ""
else:
tmp_file = tempfile.NamedTemporaryFile(delete=False)
tmp_file.write(fmap)
tmp_file.close()
tmp_file_name = tmp_file.name
try:
return bst.get_score(fmap=tmp_file_name, importance_type=importance_type)
finally:
if tmp_file_name:
os.unlink(tmp_file_name)
def get_score(self, fmap="", importance_type="weight"):
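        """
        Get feature importance scores from the remote booster, mirroring
        ``xgboost.Booster.get_score``. The ``fmap`` file, if given, is read
        locally and its contents are shipped to where the booster lives.

        A rough usage sketch (illustrative; ``booster`` stands for a trained
        remote booster)::

            scores = booster.get_score(importance_type="gain")
            scores.execute()
        """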
op = ModelApplyChunk(
func=self._get_booster_score, output_types=[OutputType.object]
)
if not fmap:
fmap_data = None
else:
with open(fmap, "rb") as fmap_file:
fmap_data = fmap_file.read()
return op(self, [{}], fmap=fmap_data, importance_type=importance_type)[0]
def execute(self, session=None, **kw):
# The evals_result should be fetched when BoosterData.execute() is called.
result = super().execute(session=session, **kw)
if self.op.has_evals_result and self.key == self.op.outputs[0].key:
self._evals_result.update(self.op.outputs[1].fetch(session=session))
return result
def predict(
self,
data,
output_margin=False,
pred_leaf=False,
pred_contribs=False,
approx_contribs=False,
pred_interactions=False,
validate_features=True,
training=False,
iteration_range=None,
strict_shape=False,
):
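        """
        Run prediction with this booster on ``data``.

        This simply delegates to the ``predict`` function shipped with this
        package, so the result is a lazy maxframe object rather than a local
        numpy array.

        A rough usage sketch (illustrative)::

            pred = booster.predict(X_test)
            pred.execute()
        """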
from .predict import predict
return predict(
self,
data,
output_margin=output_margin,
pred_leaf=pred_leaf,
pred_contribs=pred_contribs,
approx_contribs=approx_contribs,
pred_interactions=pred_interactions,
validate_features=validate_features,
training=training,
iteration_range=iteration_range,
strict_shape=strict_shape,
)
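

# ``Booster`` is the user-facing wrapper type around ``BoosterData``; the
# behaviour lives on the data class above, following the Model/ModelData
# split used elsewhere in maxframe.learn.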
class Booster(Model):
    pass


if not xgboost:
XGBScikitLearnBase = None
else:
class XGBScikitLearnBase(xgboost.XGBModel):
"""
Base class for implementing scikit-learn interface
"""
def _set_model(
self, xgb_model: Union[xgboost.XGBModel, xgboost.Booster] = None
):
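            # Accept either a locally trained sklearn-style model or a raw
            # xgboost.Booster and store it as a remote maxframe Booster.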
booster = None
if isinstance(xgb_model, xgboost.XGBModel):
booster = xgb_model.get_booster()
elif isinstance(xgb_model, xgboost.Booster):
booster = xgb_model
if booster is not None:
self._Booster = to_remote_model(booster, model_cls=Booster)
@classmethod
def _get_param_names(cls):
            # make sure `xgb_model` is not treated as a model param
names = super()._get_param_names()
if names:
names = [p for p in names if p != "xgb_model"]
return names
def __repr__(self):
local_model = self.fetch()
return repr(local_model)
def fit(
self,
X,
y,
sample_weights=None,
eval_set=None,
sample_weight_eval_set=None,
**kw,
):
"""
            Fit the regressor. Note that fit() is an eager-execution API;
            the call blocks until training finishes.

Parameters
----------
X : array_like
Feature matrix
y : array_like
Labels
            sample_weights : array_like
                Instance weights.
eval_set : list, optional
A list of (X, y) tuple pairs to use as validation sets, for which
metrics will be computed.
Validation metrics will help us track the performance of the model.
sample_weight_eval_set : list, optional
A list of the form [L_1, L_2, ..., L_n], where each L_i is a list
of group weights on the i-th validation set.
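
            Examples
            --------
            A rough sketch (illustrative; concrete subclasses such as
            ``XGBRegressor`` implement this method)::

                model = XGBRegressor(n_estimators=10)
                model.fit(X_train, y_train, eval_set=[(X_val, y_val)])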
"""
raise NotImplementedError
def predict(self, data, **kw):
"""
            Predict with `data`.

            Parameters
            ----------
            data : array_like
                Data used to perform the prediction.

Returns
-------
prediction : maxframe.tensor.Tensor
"""
raise NotImplementedError
def evals_result(self, **kw) -> Dict:
"""Return the evaluation results.
If **eval_set** is passed to the :py:meth:`fit` function, you can call
``evals_result()`` to get evaluation results for all passed **eval_sets**. When
**eval_metric** is also passed to the :py:meth:`fit` function, the
**evals_result** will contain the **eval_metrics** passed to the :py:meth:`fit`
function.
            The returned evaluation result is a dictionary:

            .. code-block:: python

                {'validation_0': {'logloss': ['0.604835', '0.531479']},
                 'validation_1': {'logloss': ['0.41965', '0.17686']}}

            Note that evals_result() blocks until training is finished.

Returns
-------
evals_result
"""
result = super().evals_result()
if not self._Booster.op.has_evals_result or len(result) != 0:
return result
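            # Otherwise trigger (and wait for) training so that the evaluation
            # results get populated, then read them again.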
session = kw.pop("session", None)
run_kwargs = kw.pop("run_kwargs", dict())
self._Booster.execute(session=session, **run_kwargs)
return super().evals_result()
def execute(self, session=None, run_kwargs=None):
self._Booster.execute(session=session, run_kwargs=run_kwargs)
return self
def fetch(self, session=None, run_kwargs=None):
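            """
            Fetch the remote booster and wrap it into a local, plain xgboost
            scikit-learn model of the same class name (illustrative sketch:
            ``local_model = model.fetch()``).
            """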
from xgboost import sklearn as xgb_sklearn
booster = self._Booster.fetch(session=session, run_kwargs=run_kwargs)
remote_bst, self._Booster = self._Booster, booster
try:
local_cls = getattr(xgb_sklearn, type(self).__name__)
local_model = local_cls(**self.get_params(deep=True))
local_model._Booster = booster
return local_model
finally:
self._Booster = remote_bst
@staticmethod
def _calc_feature_importance(bst, importance_type, n_features):
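            # Pick xgboost's default importance type for the booster at hand
            # ("weight" for gblinear, otherwise "gain") and normalize the raw
            # scores so they sum to 1, mirroring xgboost's
            # ``feature_importances_`` behaviour.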
config = json.loads(bst.save_config())
bst_type = config["learner"]["gradient_booster"]["name"]
dft = "weight" if bst_type == "gblinear" else "gain"
importance_type = importance_type or dft
score = bst.get_score(importance_type=importance_type)
if bst.feature_names is None:
feature_names = [f"f{i}" for i in range(n_features)]
else:
feature_names = bst.feature_names
# gblinear returns all features so the `get` in next line is only for gbtree.
all_features = [score.get(f, 0.0) for f in feature_names]
all_features_arr = np.array(all_features, dtype=np.float32)
total = all_features_arr.sum()
if total == 0:
return all_features_arr
return all_features_arr / total
@property
def feature_importances_(self):
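            """
            Feature importances computed lazily on the remote booster and
            returned as a maxframe tensor; scores are normalized to sum to 1,
            as in xgboost.
            """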
op = ModelApplyChunk(
func=self._calc_feature_importance, output_types=[OutputType.tensor]
)
params = {"shape": (self._n_features_in,), "dtype": np.dtype(np.float32)}
return op(
self.get_booster(),
[params],
importance_type=self.importance_type,
n_features=self._n_features_in,
)[0]


def wrap_evaluation_matrices(
missing: float,
X: Any,
y: Any,
sample_weight: Optional[Any],
base_margin: Optional[Any],
eval_set: Optional[List[Tuple[Any, Any]]],
sample_weight_eval_set: Optional[List[Any]],
base_margin_eval_set: Optional[List[Any]],
label_transform: Callable = lambda x: x,
) -> Tuple[Any, Optional[List[Tuple[Any, str]]]]:
"""
Convert array_like evaluation matrices into DMatrix.
Perform validation on the way.
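
    A rough call sketch (illustrative)::

        dtrain, evals = wrap_evaluation_matrices(
            missing=np.nan,
            X=X, y=y,
            sample_weight=None,
            base_margin=None,
            eval_set=[(X_valid, y_valid)],
            sample_weight_eval_set=None,
            base_margin_eval_set=None,
        )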
"""
train_dmatrix = DMatrix(
data=X,
label=label_transform(y),
weight=sample_weight,
base_margin=base_margin,
missing=missing,
)
n_validation = 0 if eval_set is None else len(eval_set)
def validate_or_none(meta: Optional[List], name: str) -> List:
if meta is None:
return [None] * n_validation
if len(meta) != n_validation:
raise ValueError(
f"{name}'s length does not equal `eval_set`'s length, "
+ f"expecting {n_validation}, got {len(meta)}"
)
return meta
if eval_set is not None:
sample_weight_eval_set = validate_or_none(
sample_weight_eval_set, "sample_weight_eval_set"
)
base_margin_eval_set = validate_or_none(
base_margin_eval_set, "base_margin_eval_set"
)
evals = []
for i, (valid_X, valid_y) in enumerate(eval_set):
# Skip the duplicated entry.
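            # (i.e. when a validation entry is the training data itself, reuse
            # the already-built ``train_dmatrix`` instead of constructing a
            # second DMatrix over the same inputs)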
if all(
(
valid_X is X,
valid_y is y,
sample_weight_eval_set[i] is sample_weight,
base_margin_eval_set[i] is base_margin,
)
):
evals.append(train_dmatrix)
else:
m = DMatrix(
data=valid_X,
label=label_transform(valid_y),
weight=sample_weight_eval_set[i],
base_margin=base_margin_eval_set[i],
missing=missing,
)
evals.append(m)
nevals = len(evals)
eval_names = [f"validation_{i}" for i in range(nevals)]
evals = list(zip(evals, eval_names))
else:
if any(
meta is not None
for meta in [
sample_weight_eval_set,
base_margin_eval_set,
]
):
raise ValueError(
"`eval_set` is not set but one of the other evaluation meta info is "
"not None."
)
evals = []
return train_dmatrix, evals