in src/mlmax/monitoring.py
import os
from pathlib import Path

import pandas as pd

# read_data, split_data, write_dataframe, generate_statistic, generate_psi,
# and write_json are helpers assumed to be defined elsewhere in this module.


def main(args):
    """
    Write all profiling output artifacts under <data_dir>/profiling
    (/opt/ml/processing/profiling when run as a SageMaker Processing job).
    Tree:
        <data_dir>/profiling/baseline/
        <data_dir>/profiling/inference/
    """
    # Create the output directories up front
    data_dir = Path(args.data_dir)
    (data_dir / "profiling/baseline").mkdir(exist_ok=True, parents=True)
    (data_dir / "profiling/inference").mkdir(exist_ok=True, parents=True)
    if args.mode == "train":
        # TODO: if the data is already in the proper format, no processing
        # is required here.
        train_data_path = os.path.join(args.data_dir, args.train_input)
        df = read_data(train_data_path)
        X_train, X_test, y_train, y_test = split_data(df, args)

        # Save the baseline data for future reference
        write_dataframe(
            X_train, args, "profiling/baseline/train_features_baseline.csv", header=True
        )
        write_dataframe(
            y_train, args, "profiling/baseline/train_labels_baseline.csv", header=True
        )
        write_dataframe(
            X_test, args, "profiling/baseline/test_features.csv", header=True
        )
        write_dataframe(y_test, args, "profiling/baseline/test_labels.csv", header=True)

        # Calculate baseline statistics from the training data
        result = generate_statistic(X_train, y_train, args)
        write_json(result, args, "profiling/baseline/baseline_statistic.json")

        # Calculate PSI between the train and test splits. Both come from the
        # same dataset, so a good (low) score is expected.
        test_result = generate_psi(X_train, X_test, args)
        write_json(test_result, args, "profiling/baseline/baseline_psi.json")
    elif args.mode == "infer":
        # Read the baseline training features saved during the "train" run
        baseline_data_path = os.path.join(args.data_dir, args.infer_baseline)
        X_train = pd.read_csv(baseline_data_path)

        # Read the new inference features, dropping the label column since
        # only features are profiled
        infer_data_path = os.path.join(args.data_dir, args.infer_input)
        X_infer = read_data(infer_data_path)
        X_infer = X_infer.drop(["income"], axis=1)

        # Calculate PSI for the inference data against the baseline
        test_result = generate_psi(X_train, X_infer, args)
        write_json(test_result, args, "profiling/inference/infer_psi.json")
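
The generate_psi helper is not shown in this excerpt. For readers unfamiliar with the metric, below is a minimal sketch of a standard per-column Population Stability Index computation; the function name psi_for_column, the decile binning, and the epsilon floor are illustrative assumptions, not the actual mlmax implementation.

import numpy as np
import pandas as pd


def psi_for_column(expected: pd.Series, actual: pd.Series, bins: int = 10) -> float:
    """Population Stability Index between a baseline and a new sample.

    PSI = sum over bins of (actual% - expected%) * ln(actual% / expected%).
    Assumes both samples are non-empty after dropping NaNs.
    """
    expected = expected.dropna().to_numpy()
    actual = actual.dropna().to_numpy()

    # Derive bin edges from the baseline, then clip the new sample into the
    # baseline's range so outliers land in the end bins.
    edges = np.histogram_bin_edges(expected, bins=bins)
    expected_pct = np.histogram(expected, bins=edges)[0] / len(expected)
    actual_pct = (
        np.histogram(np.clip(actual, edges[0], edges[-1]), bins=edges)[0] / len(actual)
    )

    # Floor the proportions to avoid log(0) and division by zero in empty bins.
    eps = 1e-6
    expected_pct = np.clip(expected_pct, eps, None)
    actual_pct = np.clip(actual_pct, eps, None)

    return float(np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct)))

Applied per column, e.g. {col: psi_for_column(X_train[col], X_infer[col]) for col in X_train.columns}, this yields one drift score per feature. A common rule of thumb reads PSI below 0.1 as stable, 0.1 to 0.25 as moderate shift, and above 0.25 as significant shift.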
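main() expects args to carry at least mode, data_dir, train_input, infer_baseline, and infer_input (split_data may consume further attributes not visible here). The module's actual CLI wiring is not shown; a minimal argparse sketch compatible with this function, with assumed flag names, would be:

import argparse


def parse_args():
    """CLI wiring for main(); flag names here are illustrative assumptions."""
    parser = argparse.ArgumentParser(description="Data profiling and drift monitoring")
    parser.add_argument("--mode", choices=["train", "infer"], required=True)
    parser.add_argument("--data-dir", default="/opt/ml/processing",
                        help="root directory for inputs and profiling outputs")
    parser.add_argument("--train-input",
                        help="training data CSV, relative to --data-dir")
    parser.add_argument("--infer-baseline",
                        help="baseline features CSV written by a previous train run")
    parser.add_argument("--infer-input",
                        help="new inference data CSV, relative to --data-dir")
    return parser.parse_args()


if __name__ == "__main__":
    main(parse_args())

argparse maps --data-dir to args.data_dir automatically, so the attribute names used in main() line up with the hyphenated flags.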