# ML_AI_training/uq_ml_models_FS/scriptfsycm.py
#
# 210 lines
# 9.6 KiB
# Python

import argparse
import os
import re
import sys
import time
from math import sqrt

import joblib
import numpy as np
import pandas as pd
import scipy as sp
from scipy import stats
# Algorithm
from xgboost.sklearn import XGBClassifier
from sklearn import svm
from sklearn.ensemble import RandomForestClassifier, ExtraTreesClassifier, GradientBoostingClassifier
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPRegressor
from sklearn.utils import all_estimators
# Pre-processing
from sklearn import preprocessing
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler, OneHotEncoder
from sklearn.datasets import make_classification
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.model_selection import train_test_split, cross_validate, cross_val_score, LeaveOneOut, KFold, RepeatedKFold, cross_val_predict
from sklearn.model_selection import StratifiedKFold, RepeatedStratifiedKFold
# Metric
from sklearn.metrics import mean_squared_error, make_scorer, roc_auc_score, f1_score, matthews_corrcoef, accuracy_score, balanced_accuracy_score, confusion_matrix, classification_report
from sklearn.metrics import precision_score, recall_score, jaccard_score
# other vars
# Keyword-argument bundles that are splatted into sklearn calls (**rs, **njobs).
rs = dict(random_state=42)
njobs = dict(n_jobs=10)

# Named scorers, each wrapping the corresponding sklearn metric function.
scoring_fn = {
    'accuracy': make_scorer(accuracy_score),
    'fscore': make_scorer(f1_score),
    'mcc': make_scorer(matthews_corrcoef),
    'precision': make_scorer(precision_score),
    'recall': make_scorer(recall_score),
    'roc_auc': make_scorer(roc_auc_score),
    'jcc': make_scorer(jaccard_score),
}

# Cross-validation splitters: 10 shuffled stratified folds, and the same
# 10-fold scheme repeated 3 times; both are seeded through `rs`.
skf_cv = StratifiedKFold(n_splits=10, shuffle=True, **rs)
rskf_cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, **rs)

# Single-metric scorer dictionaries.
mcc_score_fn = {'mcc': make_scorer(matthews_corrcoef)}
jacc_score_fn = {'jcc': make_scorer(jaccard_score)}
#%% YC
# Column order shared by the cross-validation and blind-test result tables.
_RESULT_COLUMNS = ['estimator', 'TP', 'TN', 'FP', 'FN',
                   'roc_auc', 'matthew', 'bacc', 'f1']


def _build_transformers(input_pd, var_type):
    """Return the ColumnTransformer step list matching *var_type*.

    Numerical columns are those with dtype int64/float64 and are min-max
    scaled; categorical columns are those with dtype object/bool and are
    one-hot encoded.

    Raises:
        ValueError: if *var_type* is not 'numerical', 'categorical' or 'mixed'.
    """
    numerical_ix = input_pd.select_dtypes(include=['int64', 'float64']).columns
    categorical_ix = input_pd.select_dtypes(include=['object', 'bool']).columns

    num_step = ('num', MinMaxScaler(), numerical_ix)
    cat_step = ('cat', OneHotEncoder(), categorical_ix)
    steps_by_type = {
        'numerical': [num_step],
        'categorical': [cat_step],
        'mixed': [num_step, cat_step],
    }
    if var_type not in steps_by_type:
        raise ValueError(
            "var_type must be one of {}, got {!r}".format(
                sorted(steps_by_type), var_type))
    return steps_by_type[var_type]


def _score_row(name, y_true, y_pred):
    """Return one result-table row (a dict) of metrics for an estimator.

    All metrics are called as ``metric(y_true, y_pred)``, the argument order
    scikit-learn expects; swapping them transposes the confusion matrix
    (exchanging FP and FN) and changes balanced accuracy and ROC AUC.
    """
    # Unpacking four values assumes a binary problem (2x2 confusion matrix);
    # anything else raises here and is handled by the caller.
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    return {
        'estimator': name,
        'TP': tp,
        'TN': tn,
        'FP': fp,
        'FN': fn,
        # NOTE: computed from hard class predictions, not probabilities.
        'roc_auc': round(roc_auc_score(y_true, y_pred), 3),
        'matthew': round(matthews_corrcoef(y_true, y_pred), 3),
        'bacc': round(balanced_accuracy_score(y_true, y_pred), 3),
        'f1': round(f1_score(y_true, y_pred), 3),
    }


def run_all_ML(input_pd, target_label, blind_test_input_df, blind_test_target, preprocess = True, var_type = 'numerical'):
    """Benchmark every scikit-learn classifier with default settings.

    For each classifier returned by ``all_estimators(type_filter="classifier")``
    the function (1) obtains 10-fold cross-validated predictions on the
    training data and scores them, then (2) refits on the full training data
    and scores predictions on the blind test set. Estimators that raise at any
    point are reported on stdout and skipped.

    Args:
        input_pd: training feature DataFrame.
        target_label: training target values aligned with *input_pd*.
        blind_test_input_df: blind-test feature DataFrame.
        blind_test_target: blind-test target values.
        preprocess: when truthy, prepend the column transformer selected by
            *var_type* to each pipeline.
        var_type: 'numerical', 'categorical' or 'mixed'; selects which column
            groups are scaled / one-hot encoded (other columns pass through).

    Returns:
        dict with keys 'CrossValResultsDF' and 'BlindTestResultsDF', each a
        DataFrame with columns estimator, TP, TN, FP, FN, roc_auc, matthew,
        bacc, f1. Metric columns hold numbers (not strings) so sorting on
        them is numeric. Both keys are always present, even if no estimator
        succeeded.

    Raises:
        ValueError: if *var_type* is not a recognised value.
    """
    col_transform = ColumnTransformer(
        transformers=_build_transformers(input_pd, var_type),
        remainder='passthrough')

    cv_rows = []
    bts_rows = []
    for name, algorithm in all_estimators(type_filter="classifier"):
        # Deliberately broad best-effort: many estimators cannot be built
        # with no arguments or do not support the data; report and move on.
        try:
            steps = [("model", algorithm())]
            if preprocess:
                steps.insert(0, ('prep', col_transform))
            pipe = Pipeline(steps)

            # Cross-validated predictions on the training data
            y_pred = cross_val_predict(pipe, input_pd, target_label,
                                       cv=10, **njobs)
            cv_rows.append(_score_row(name, target_label, y_pred))

            #=========================
            # Blind test: BTS results
            #=========================
            pipe.fit(input_pd, target_label)
            bts_predict = pipe.predict(blind_test_input_df)
            bts_mcc_score = round(
                matthews_corrcoef(blind_test_target, bts_predict), 2)
            print('\nMCC on Blind test:', bts_mcc_score)
            bts_rows.append(_score_row(name, blind_test_target, bts_predict))
        except Exception as e:
            print("XXXGot an error while running {}".format(name))
            print(e)

    # Build the tables once at the end (DataFrame.append no longer exists in
    # pandas >= 2.0, and per-row appends were quadratic anyway).
    return {
        'CrossValResultsDF': pd.DataFrame(cv_rows, columns=_RESULT_COLUMNS),
        'BlindTestResultsDF': pd.DataFrame(bts_rows, columns=_RESULT_COLUMNS),
    }
#%% CALL function
def _sort_results_by_mcc(results):
    """Sort both result tables of a run_all_ML result in place by 'matthew'.

    Returns the (cross-validation, blind-test) DataFrames, best first. The
    sort is in place, so the tables inside *results* are reordered as well.
    """
    cv_df = results['CrossValResultsDF']
    cv_df.sort_values(by=['matthew'], ascending=False, inplace=True)
    bts_df = results['BlindTestResultsDF']
    bts_df.sort_values(by=['matthew'], ascending=False, inplace=True)
    return cv_df, bts_df


# NOTE: X, y, X_bts, y_bts and the resampled X_*/y_* sets are not defined in
# this file; they are expected to already exist in the running session.

# Original (unresampled) training data
YC_resD2 = run_all_ML(input_pd=X, target_label=y, blind_test_input_df=X_bts, blind_test_target=y_bts, preprocess = True, var_type = 'mixed')
CVResultsDF, BTSResultsDF = _sort_results_by_mcc(YC_resD2)

# Debug snippet: list the classifiers that run_all_ML iterates over.
# from sklearn.utils import all_estimators
# for name, algorithm in all_estimators(type_filter="classifier"):
#     clf = algorithm()
#     print('Name:', name, '\nAlgo:', clf)

# Random Oversampling
# (run once; a second identical call only repeated the whole sweep and
# overwrote the first result)
YC_resD_ros = run_all_ML(input_pd=X_ros, target_label=y_ros, blind_test_input_df=X_bts, blind_test_target=y_bts, preprocess = True, var_type = 'mixed')
CVResultsDF_ros, BTSResultsDF_ros = _sort_results_by_mcc(YC_resD_ros)

# Random Undersampling
YC_resD_rus = run_all_ML(input_pd=X_rus, target_label=y_rus, blind_test_input_df=X_bts, blind_test_target=y_bts, preprocess = True, var_type = 'mixed')
CVResultsDF_rus, BTSResultsDF_rus = _sort_results_by_mcc(YC_resD_rus)

# Random Oversampling+Undersampling
YC_resD_rouC = run_all_ML(input_pd=X_rouC, target_label=y_rouC, blind_test_input_df=X_bts, blind_test_target=y_bts, preprocess = True, var_type = 'mixed')
CVResultsDF_rouC, BTSResultsDF_rouC = _sort_results_by_mcc(YC_resD_rouC)

# SMOTE NC
YC_resD_smnc = run_all_ML(input_pd=X_smnc, target_label=y_smnc, blind_test_input_df=X_bts, blind_test_target=y_bts, preprocess = True, var_type = 'mixed')
CVResultsDF_smnc, BTSResultsDF_smnc = _sort_results_by_mcc(YC_resD_smnc)