in text/src/autogluon/text/text_prediction/mx/models.py [0:0]
def train(self, train_data, tuning_data,
num_cpus=None,
num_gpus=None,
time_limit=None,
tune_kwargs=None,
search_space=None,
continue_training=False,
plot_results=False,
console_log=True,
seed=None,
verbosity=2):
"""The train function.
Parameters
----------
train_data
The training data
tuning_data
The tuning data, used for validation during training
num_cpus
Number of CPUs for each trial
num_gpus
Number of GPUs for each trial
time_limit
The time limit for the training job
tune_kwargs
Parameters of the HPO algorithms. For example, the scheduling
algorithm, scheduling backend, HPO algorithm.
search_space
The search space options
continue_training
Whether to continue training the existing model or to train a new model from scratch
plot_results
Whether to plot results or not
console_log
Whether to log to the console
seed
The random seed
verbosity
The verbosity level
"""
set_seed(seed)
set_logger_verbosity(verbosity)
start_tick = time.time()
assert len(self._label_columns) == 1, 'Currently, we only support a single label column.'
# TODO(sxjscience) Try to support S3
os.makedirs(self._output_directory, exist_ok=True)
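# Fall back to the default preset search space when none is provided.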
if search_space is None:
search_space = \
ag_text_presets.create('default')['models']['MultimodalTextModel']['search_space']
# Scheduler and searcher for HPO
if tune_kwargs is None:
tune_kwargs = ag_text_presets.create('default')['tune_kwargs']
scheduler_options = tune_kwargs['scheduler_options']
num_cpus, num_gpus = get_recommended_resource(num_cpus, num_gpus)
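# Training without a GPU is only allowed when the environment variable
# AUTOGLUON_TEXT_TRAIN_WITHOUT_GPU is set to a non-zero value; otherwise we fail fast below.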
if num_gpus == 0:
if 'AUTOGLUON_TEXT_TRAIN_WITHOUT_GPU' in os.environ:
use_warning = int(os.environ['AUTOGLUON_TEXT_TRAIN_WITHOUT_GPU'])
else:
use_warning = False
if use_warning:
warnings.warn('No GPU is detected in the machine. We recommend running '
'TextPredictor on a GPU-enabled instance because training '
'on CPU is slow.')
else:
raise RuntimeError('No GPU is detected in the machine, so TextPredictor will not '
'proceed because training with only CPU is too slow. '
'If a GPU is available, you may set `ngpus_per_trial` '
'to a number larger than 0 when calling `.fit()`. '
'Alternatively, set the environment variable '
'"AUTOGLUON_TEXT_TRAIN_WITHOUT_GPU=1" to force the model to '
'use CPU for training.')
logger.info(f"The GluonNLP V0 backend is used. "
f"We will use {num_cpus} cpus and "
f"{num_gpus} gpus to train each trial.")
if scheduler_options is None:
scheduler_options = dict()
if plot_results is None:
if in_ipynb():
plot_results = True
else:
plot_results = False
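# Assemble the final scheduler options: searcher, per-trial resources, checkpoint path,
# number of trials, and the overall time budget.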
scheduler_options = compile_scheduler_options_v2(
scheduler_options=scheduler_options,
scheduler=tune_kwargs['search_strategy'],
search_strategy=tune_kwargs['searcher'],
search_options=tune_kwargs['search_options'],
nthreads_per_trial=num_cpus,
ngpus_per_trial=num_gpus,
checkpoint=os.path.join(self._output_directory, 'checkpoint.ag'),
num_trials=tune_kwargs['num_trials'],
time_out=time_limit,
resume=False,
visualizer=scheduler_options.get('visualizer'),
time_attr='report_idx',
reward_attr='reward_attr',
dist_ip_addrs=scheduler_options.get('dist_ip_addrs'))
# Create a temporary cache directory. The internal train function will load the
# cached dataframes from it.
# In the future, this functionality may be generalized to create the cache in S3/FSx.
cache_path = os.path.join(self._output_directory, f'cache_{uuid.uuid4()}')
os.makedirs(cache_path)
train_df_path = os.path.join(cache_path, 'cache_train_dataframe.pd.pkl')
tuning_df_path = os.path.join(cache_path, 'cache_tuning_dataframe.pd.pkl')
train_data.to_pickle(train_df_path)
tuning_data.to_pickle(tuning_df_path)
if continue_training:
# Store the current network weights and preprocessor to local disk as a temporary cache.
params_path = os.path.join(cache_path, 'old_net.params')
preprocessor_path = os.path.join(cache_path, 'preprocessor.pkl')
with open(preprocessor_path, 'wb') as of:
pickle.dump(self.preprocessor, of)
self.net.save_parameters(params_path)
else:
params_path = None
preprocessor_path = None
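# Keyword arguments that are forwarded to train_function for every trial.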
train_fn_kwargs = dict(
train_df_path=train_df_path,
time_limit=time_limit,
time_start=start_tick,
tuning_df_path=tuning_df_path,
base_config=self.base_config,
problem_type=self.problem_type,
column_types=self._column_types,
feature_columns=self._feature_columns,
label_column=self._label_columns[0],
log_metrics=self._log_metrics,
eval_metric=self._eval_metric,
output_directory=self._output_directory,
ngpus_per_trial=scheduler_options['resource']['num_gpus'],
params_path=params_path,
preprocessor_path=preprocessor_path,
continue_training=continue_training,
console_log=console_log,
verbosity=verbosity,
)
no_job_finished_err_msg =\
'No training job has been completed! '\
'There are two possibilities: '\
'1) The time_limit is too small, '\
'or 2) There are some internal errors in AutoGluon. '\
'For the first case, you can increase the time_limit or set it to '\
'None, e.g., by calling "predictor.fit(..., time_limit=None)". To '\
'further investigate the root cause, you can also set '\
'"verbosity=3" and try again, i.e., predictor.set_verbosity(3).'
if scheduler_options['num_trials'] == 1:
reporter = FakeReporter()
rand_config = LocalRandomSearcher(search_space=search_space).get_config()
cur_config = {**search_space}
cur_config.update(rand_config)
results = train_function({**cur_config}, reporter=reporter, **train_fn_kwargs)
best_model_saved_dir_path = os.path.join(self._output_directory, 'task0')
cfg_path = os.path.join(best_model_saved_dir_path, 'cfg.yml')
# Check whether the job has finished
if not os.path.exists(cfg_path)\
or not os.path.exists(os.path.join(best_model_saved_dir_path, 'best_model.params')):
raise RuntimeError(no_job_finished_err_msg)
cfg = self.base_config.clone_merge(cfg_path)
local_results = pd.read_json(os.path.join(self._output_directory, 'task0',
'results_local.jsonl'), lines=True)
if plot_results:
plot_training_curves = os.path.join(self._output_directory,
'plot_training_curves.png')
import matplotlib.pyplot as plt
plt.ylabel(self._eval_metric)
plt.xlabel('report_idx')
plt.title("Performance vs Training-Time")
plt.plot(local_results['report_idx'].iloc[:-1],
local_results[local_results['eval_metric'][0]].iloc[:-1], label='task0')
plt.legend(loc='best')
plt.savefig(plot_training_curves)
plt.show()
self._results = local_results
else:
scheduler_cls, scheduler_params = scheduler_factory(scheduler_options)
# Create scheduler, run HPO experiment
scheduler = scheduler_cls(train_function, search_space=search_space, train_fn_kwargs=train_fn_kwargs, **scheduler_params)
scheduler.run()
scheduler.join_jobs()
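# If no trial completed (e.g., because the time_limit was too small), raise with the
# message constructed above.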
if len(scheduler.config_history) == 0:
raise RuntimeError(no_job_finished_err_msg)
best_config = scheduler.get_best_config()
logger.info('Results={}'.format(scheduler.searcher._results))
logger.info('Best_config={}'.format(best_config))
best_task_id = scheduler.get_best_task_id()
best_model_saved_dir_path = os.path.join(self._output_directory,
'task{}'.format(best_task_id))
best_cfg_path = os.path.join(best_model_saved_dir_path, 'cfg.yml')
cfg = self.base_config.clone_merge(best_cfg_path)
if plot_results:
plot_training_curves = os.path.join(self._output_directory,
'plot_training_curves.png')
scheduler.get_training_curves(filename=plot_training_curves,
plot=plot_results,
use_legend=True)
self._results = dict()
self._results.update(best_reward=scheduler.get_best_reward(),
best_config=scheduler.get_best_config(),
total_time=time.time() - start_tick,
metadata=scheduler.metadata,
training_history=scheduler.training_history,
config_history=scheduler.config_history,
reward_attr=scheduler._reward_attr,
config=cfg)
# Consider moving this to a separate predictor
self._config = cfg
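# Rebuild the best model from its saved configuration and weights so the predictor
# holds the trained network in memory.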
# Average parameters
if cfg.model.use_avg_nbest:
nbest_path_l = []
for best_id in range(cfg.optimization.nbest):
nbest_path = os.path.join(best_model_saved_dir_path, f'nbest_model{best_id}.params')
if os.path.exists(nbest_path):
nbest_path_l.append(nbest_path)
avg_nbest_path = os.path.join(best_model_saved_dir_path, 'nbest_model_avg.params')
average_checkpoints(nbest_path_l, avg_nbest_path)
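# Restore the fitted preprocessor of the best trial.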
with open(os.path.join(best_model_saved_dir_path, 'preprocessor.pkl'), 'rb') as in_f:
self._preprocessor = pickle.load(in_f)
backbone_model_cls, backbone_cfg, tokenizer, backbone_params_path, _ \
= get_backbone(cfg.model.backbone.name)
if 'roberta' in cfg.model.backbone.name:
text_backbone = backbone_model_cls.from_cfg(backbone_cfg, return_all_hiddens=True)
else:
text_backbone = backbone_model_cls.from_cfg(backbone_cfg)
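# Determine the output dimension of the network from the problem type.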
if self._problem_type == REGRESSION:
out_shape = 1
elif self._problem_type == MULTICLASS:
out_shape = len(self._preprocessor.label_generator.classes_)
elif self._problem_type == BINARY:
assert len(self._preprocessor.label_generator.classes_) == 2
out_shape = 2
else:
raise NotImplementedError
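# Reconstruct the multimodal network from the text backbone and the tabular feature
# configuration, then load the best (or averaged n-best) weights on CPU.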
net = MultiModalWithPretrainedTextNN(
text_backbone=text_backbone,
num_text_features=1,
num_categorical_features=len(self._preprocessor.categorical_feature_names),
num_numerical_features=len(self._preprocessor.numerical_feature_names) > 0,
numerical_input_units=None if len(self._preprocessor.numerical_feature_names) == 0 else len(
self._preprocessor.numerical_feature_names),
num_categories=self._preprocessor.categorical_num_categories,
get_embedding=False,
cfg=cfg.model.network,
out_shape=out_shape)
net.hybridize()
if cfg.model.use_avg_nbest:
net.load_parameters(avg_nbest_path, ctx=mx.cpu())
else:
net.load_parameters(os.path.join(best_model_saved_dir_path, 'best_model.params'),
ctx=mx.cpu())
self._net = net
mx.npx.waitall()
# Clean up the temporary cache directory; rmtree will raise directly on failure.
shutil.rmtree(cache_path)
# Clean up the temporary workspace that stores the configuration/weights of the best model
try:
shutil.rmtree(best_model_saved_dir_path)
except OSError as e:
logger.info(f'Failed to remove the temporary best model directory '
f'"{best_model_saved_dir_path}": {e}')