aiops/Pathformer_ICLR2024/utils/tools.py
import os

import numpy as np
import torch
import matplotlib.pyplot as plt

plt.switch_backend('agg')  # non-interactive backend so figures can be saved headlessly

def adjust_learning_rate(optimizer, scheduler, epoch, args, printout=True):
    # lr = args.learning_rate * (0.2 ** (epoch // 2))
    if args.lradj == 'type1':
        lr_adjust = {epoch: args.learning_rate * (0.5 ** ((epoch - 1) // 1))}
    elif args.lradj == 'type2':
        lr_adjust = {epoch: args.learning_rate * (0.5 ** ((epoch - 1) // 2))}
    elif args.lradj == 'type3':
        lr_adjust = {epoch: args.learning_rate * (0.5 ** ((epoch - 1) // 3))}
    elif args.lradj == 'type4':
        lr_adjust = {epoch: args.learning_rate * (0.5 ** ((epoch - 1) // 4))}
    elif args.lradj == 'type5':
        # fixed schedule: the rate changes only at the listed epochs
        lr_adjust = {
            2: 5e-5, 4: 1e-5, 6: 5e-6, 8: 1e-6,
            10: 5e-7, 15: 1e-7, 20: 5e-8
        }
    elif args.lradj == 'constant':
        lr_adjust = {epoch: args.learning_rate}
    elif args.lradj == '3':
        lr_adjust = {epoch: args.learning_rate if epoch < 10 else args.learning_rate * 0.1}
    elif args.lradj == '4':
        lr_adjust = {epoch: args.learning_rate if epoch < 15 else args.learning_rate * 0.1}
    elif args.lradj == '5':
        lr_adjust = {epoch: args.learning_rate if epoch < 25 else args.learning_rate * 0.1}
    elif args.lradj == '6':
        lr_adjust = {epoch: args.learning_rate if epoch < 5 else args.learning_rate * 0.1}
    elif args.lradj == 'TST':
        # mirror the current rate of an external scheduler (e.g. OneCycleLR)
        lr_adjust = {epoch: scheduler.get_last_lr()[0]}
    else:
        # unknown strategy: leave the optimizer's learning rate unchanged
        # (previously `lr_adjust` was left unbound here, raising a NameError)
        lr_adjust = {}
    if epoch in lr_adjust.keys():
        lr = lr_adjust[epoch]
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr
        if printout:
            print('Updating learning rate to {}'.format(lr))
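
# Usage sketch (illustrative; assumes an argparse-style `args` carrying `lradj`
# and `learning_rate`, and a `train_one_epoch` helper that is not part of this file):
#
#     for epoch in range(1, args.train_epochs + 1):
#         train_one_epoch(model, optimizer)
#         adjust_learning_rate(optimizer, scheduler, epoch, args)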

class EarlyStopping:
    """Stop training once validation loss has not improved for `patience` epochs."""

    def __init__(self, patience=7, verbose=False, delta=0):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.inf  # np.Inf was removed in NumPy 2.0
        self.delta = delta

    def __call__(self, val_loss, model, path):
        score = -val_loss
        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model, path)
        elif score < self.best_score + self.delta:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model, path)
            self.counter = 0

    def save_checkpoint(self, val_loss, model, path):
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model ...')
        torch.save(model.state_dict(), os.path.join(path, 'checkpoint.pth'))
        self.val_loss_min = val_loss
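
# Usage sketch (the `validate` helper and `checkpoint_dir` are illustrative,
# not part of this file):
#
#     early_stopping = EarlyStopping(patience=3, verbose=True)
#     for epoch in range(args.train_epochs):
#         val_loss = validate(model, val_loader)
#         early_stopping(val_loss, model, checkpoint_dir)
#         if early_stopping.early_stop:
#             break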

class dotdict(dict):
    """dot.notation access to dictionary attributes"""
    __getattr__ = dict.get
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__
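
# Note: because `__getattr__` maps to `dict.get`, a missing key yields None
# instead of raising AttributeError. Illustrative usage:
#
#     cfg = dotdict({'learning_rate': 1e-4})
#     cfg.learning_rate   # 1e-4, same as cfg['learning_rate']
#     cfg.missing_key     # None, not an AttributeError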

class StandardScaler:
    """Normalize data with fixed statistics and map predictions back to the original scale."""

    def __init__(self, mean, std):
        self.mean = mean
        self.std = std

    def transform(self, data):
        return (data - self.mean) / self.std

    def inverse_transform(self, data):
        return (data * self.std) + self.mean
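
# Usage sketch (illustrative): fit the statistics on training data only, then
# reuse the same scaler for validation/test and for un-scaling predictions.
#
#     scaler = StandardScaler(mean=train_data.mean(0), std=train_data.std(0))
#     normed = scaler.transform(train_data)
#     preds = scaler.inverse_transform(model_output)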

def visual(true, preds=None, name='./pic/test.pdf'):
    """
    Results visualization
    """
    plt.style.use('ggplot')
    plt.figure()
    plt.plot(true, label='GroundTruth', linewidth=2)
    if preds is not None:
        plt.plot(preds, label='Prediction', linewidth=2)
    plt.legend(loc="upper right")
    plt.savefig(name, bbox_inches='tight')
    plt.close()  # free the figure so repeated calls do not accumulate memory
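
# Usage sketch (the slicing is illustrative; the target directory must already exist):
#
#     visual(y_true[0, :, -1], y_pred[0, :, -1], name='./pic/sample0.pdf')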

def test_params_flop(model, x_shape):
    """
    To measure a Transformer's FLOPs, the inputs of model.forward() need
    default values: the code below can only pass one positional input of
    shape `x_shape` (batch dimension excluded) to forward().
    """
    # count only trainable parameters, to match the printed label
    model_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print('INFO: Trainable parameter count: {:.2f}M'.format(model_params / 1000000.0))
    # ptflops is an optional dependency, so import it lazily
    from ptflops import get_model_complexity_info
    with torch.cuda.device(0):
        macs, params = get_model_complexity_info(model.cuda(), x_shape, as_strings=True,
                                                 print_per_layer_stat=True)
        print('{:<30} {:<8}'.format('Computational complexity: ', macs))
        print('{:<30} {:<8}'.format('Number of parameters: ', params))
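
# Usage sketch (the shape is illustrative; requires ptflops and a CUDA device):
#
#     # x_shape excludes the batch dimension, e.g. (seq_len, enc_in)
#     test_params_flop(model, (96, 7))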