# tzrec/tools/feature_selection.py
# Copyright (c) 2024, Alibaba Group;
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
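# Tool for selecting the most important features from a model trained with
# group variational dropout. Example invocation (flags are defined in the
# argparse section at the bottom of this file; paths are placeholders and
# assume tzrec is importable on PYTHONPATH):
#
#   python -m tzrec.tools.feature_selection \
#       --pipeline_config_path pipeline.config \
#       --model_dir experiments/ckpts \
#       --output_dir experiments/feature_selection \
#       --topk 100 \
#       --visualize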
import argparse
import os
from collections import OrderedDict
from typing import Dict, Optional
try:
    # pyre-ignore [21]
    import matplotlib.pyplot as plt
except Exception:
    plt = None
    print("please install matplotlib to enable visualization.")
import pandas as pd
from torch.distributed.checkpoint import load
from tzrec.main import _create_features, _create_model, init_process_group
from tzrec.models.model import ScriptWrapper
from tzrec.protos.model_pb2 import FeatureGroupType
from tzrec.utils import checkpoint_util, config_util
from tzrec.utils.logging_util import logger
class VariationalDropoutFS:
"""Feature selection for group variational dropout.
Args:
config_path (str): feature selection model config path.
model_dir (str): feature selection model checkpoint path.
output_dir (str): feature selection result directory.
topk (int): select topk importance features for each feature group.
fg_path (str): fg config path.
visualize (boolean): visualization feature selection result or not.
"""
def __init__(
self,
config_path: str,
model_dir: str,
output_dir: str,
topk: int,
fg_path: Optional[str] = None,
        visualize: bool = False,
clear_variational_dropout: bool = True,
) -> None:
self._config_path = config_path
self._model_dir = model_dir
self._output_dir = output_dir
self._topk = topk
self._fg_path = fg_path
self._visualize = visualize
self._clear_variational_dropout = clear_variational_dropout
def _feature_dim_dropout_ratio(self) -> Dict[str, Dict[str, float]]:
"""Get dropout ratio of feature_groups feature-wise."""
pipeline_config = config_util.load_pipeline_config(self._config_path)
data_config = pipeline_config.data_config
# Build feature
features = _create_features(list(pipeline_config.feature_configs), data_config)
model = _create_model(
pipeline_config.model_config,
features,
list(data_config.label_fields),
)
model = ScriptWrapper(model)
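        # Locate the latest checkpoint directory under model_dir.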
checkpoint_path, _ = checkpoint_util.latest_checkpoint(self._model_dir)
if checkpoint_path:
model_ckpt_path = os.path.join(checkpoint_path, "model")
logger.info(
f"Restoring model feature dropout ratio from {model_ckpt_path}..."
)
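            # Load only the variational-dropout tensors from the checkpoint;
            # the remaining model weights are not needed for selection.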
state_dict = model.state_dict()
new_state_dict = {
k: v for k, v in state_dict.items() if "group_variational_dropouts" in k
}
load(
new_state_dict,
checkpoint_id=model_ckpt_path,
)
else:
raise ValueError("checkpoint path should be specified.")
group_feature_importance = {}
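        # Collect the per-feature drop probability from every feature group
        # that has a variational dropout module attached.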
for name, sub_model in model.named_modules():
if "group_variational_dropouts" == name.split(".")[-1]:
for variational_dropout in sub_model.values():
group_name = variational_dropout.group_name
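                    # feature_p stores learned logits; sigmoid maps them to
                    # per-feature drop probabilities in (0, 1).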
values = variational_dropout.feature_p.sigmoid().tolist()
feature_names = variational_dropout.features_dimension.keys()
                    group_feature_p = dict(zip(feature_names, values))
group_feature_importance[group_name] = group_feature_p
        if len(group_feature_importance) == 0:
            raise ValueError(
                "variational dropout is not configured, "
                "or no feature group supports variational dropout."
            )
return group_feature_importance
def _dump_to_csv(
self, feature_importance: Dict[str, float], group_name: str
) -> None:
"""Dump feature importance data to a csv file."""
csv_path = os.path.join(
self._output_dir, "feature_dropout_ratio_%s.csv" % group_name
)
df = pd.DataFrame(
# pyre-fixme [6]
columns=["feature_name", "mean_drop_p"],
data=[list(kv) for kv in feature_importance.items()],
)
        df.to_csv(csv_path, index=False)
def _visualize_feature_importance(
self, feature_importance: Dict[str, float], group_name: str
) -> None:
"""Draw feature importance histogram."""
df = pd.DataFrame(
# pyre-fixme [6]
columns=["feature_name", "mean_drop_p"],
data=[list(kv) for kv in feature_importance.items()],
)
df["color"] = ["red" if x < 0.5 else "green" for x in df["mean_drop_p"]]
df.sort_values("mean_drop_p", inplace=True, ascending=False)
df.reset_index(inplace=True)
# Draw plot
plt.figure(figsize=(90, 200), dpi=100)
        plt.hlines(y=df.index, xmin=0, xmax=df.mean_drop_p, color=df.color)
        for x, y, tex in zip(df.mean_drop_p, df.index, df.mean_drop_p):
            plt.text(
                x,
                y,
                round(tex, 2),
                horizontalalignment="left",
                verticalalignment="center",
                fontdict={"color": "red" if x < 0.5 else "green", "size": 14},
            )
# Decorations
plt.yticks(df.index, df.feature_name, fontsize=20)
plt.title("Dropout Ratio", fontdict={"size": 30})
plt.grid(linestyle="--", alpha=0.5)
plt.xlim(0, 1)
png = os.path.join(self._output_dir, "feature_dropout_pic_%s.png" % group_name)
plt.savefig(png, format="png")
def _process_config(
self, feature_importance_map: Dict[str, Dict[str, float]]
) -> None:
"""Process model config with feature selection."""
config = config_util.load_pipeline_config(self._config_path)
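        # Keep the union of the top-k features over all feature groups.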
useful_features = set()
for feature_importance in feature_importance_map.values():
for i, (feature_name, _) in enumerate(feature_importance.items()):
if i < self._topk:
useful_features.add(feature_name)
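        # Sequence features are not ranked by variational dropout here, so
        # they are always retained.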
for feature_group in config.model_config.feature_groups:
group_type = feature_group.group_type
if group_type == FeatureGroupType.SEQUENCE:
for feature_name in feature_group.feature_names:
useful_features.add(feature_name)
else:
            for sequence_group in feature_group.sequence_groups:
                for feature_name in sequence_group.feature_names:
useful_features.add(feature_name)
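        # Rewrite feature_configs, dropping non-sequence features that were
        # not selected in any group.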
feature_configs = []
for feature_config in config.feature_configs:
feat_type = feature_config.WhichOneof("feature")
if feat_type == "sequence_feature":
feature_configs.append(feature_config)
else:
feature = getattr(feature_config, feat_type)
feature_name = feature.feature_name
if feature_name in useful_features:
feature_configs.append(feature_config)
config.ClearField("feature_configs")
config.feature_configs.extend(feature_configs)
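        # For each group with importance scores, keep only its top-k
        # feature names.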
for feature_group in config.model_config.feature_groups:
group_name = feature_group.group_name
if group_name in feature_importance_map:
feature_importance = feature_importance_map[group_name]
group_useful_features = set()
for i, (feature_name, _) in enumerate(feature_importance.items()):
if i < self._topk:
group_useful_features.add(feature_name)
feature_names = []
for feature_name in feature_group.feature_names:
if feature_name in group_useful_features:
feature_names.append(feature_name)
feature_group.ClearField("feature_names")
feature_group.feature_names.extend(feature_names)
if self._clear_variational_dropout:
config.model_config.ClearField("variational_dropout")
config_util.save_message(
config, os.path.join(self._output_dir, os.path.basename(self._config_path))
)
def process(self) -> None:
"""Feature selection process method."""
logger.info("Loading logit_p of VariationalDropout layer ...")
is_rank_zero = int(os.environ.get("RANK", 0)) == 0
device, _ = init_process_group()
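        # Only rank 0 performs selection and writes results; other ranks just
        # join process-group initialization.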
if is_rank_zero:
if not os.path.exists(self._output_dir):
os.makedirs(self._output_dir)
group_feature_importance = self._feature_dim_dropout_ratio()
feature_importance_map = {}
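            # Sort features by drop probability in ascending order: a lower
            # drop probability means a more important feature.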
for group_name, feature_dim_dropout_p in group_feature_importance.items():
feature_importance = OrderedDict(
sorted(feature_dim_dropout_p.items(), key=lambda e: e[1])
)
feature_importance_map[group_name] = feature_importance
logger.info("Dump %s feature importance to csv ..." % group_name)
self._dump_to_csv(feature_importance, group_name)
if self._visualize:
logger.info("Visualizing %s feature importance ..." % group_name)
self._visualize_feature_importance(feature_importance, group_name)
logger.info("Processing model config ...")
self._process_config(feature_importance_map)
logger.info("feature selection complete ...")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--pipeline_config_path",
type=str,
default=None,
help="Path to pipeline config file.",
)
parser.add_argument(
"--model_dir",
type=str,
default=None,
help="checkpoint to be evaled, if not specified, use the latest checkpoint in "
"train_config.model_dir.",
)
parser.add_argument(
"--output_dir",
type=str,
default=None,
help="feature selection out put directory",
)
parser.add_argument(
"--topk",
type=int,
default=100,
help="select topk importance features for each feature group",
)
parser.add_argument(
"--visualize",
action="store_true",
default=False,
help="visualization feature selection result or not",
)
parser.add_argument(
"--clear_variational_dropout",
action="store_true",
default=False,
help="clear variational dropout config or not",
)
    args, _ = parser.parse_known_args()
fs = VariationalDropoutFS(
args.pipeline_config_path,
args.model_dir,
args.output_dir,
args.topk,
visualize=args.visualize,
clear_variational_dropout=args.clear_variational_dropout,
)
fs.process()