#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Sat May 21 02:52:36 2022 @author: tanu """ # https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html import pandas as pd from sklearn.pipeline import Pipeline from sklearn.datasets import make_classification from sklearn.preprocessing import StandardScaler from sklearn.model_selection import GridSearchCV from sklearn.neighbors import KNeighborsClassifier from sklearn.linear_model import LogisticRegression from sklearn.ensemble import RandomForestClassifier from sklearn.feature_selection import SelectKBest, mutual_info_classif #pd.options.plotting.backend = "plotly" X_eg, y_eg = make_classification(n_samples=1000, n_features=30, n_informative=5, n_redundant=5, n_classes=2, random_state=123) pipe = Pipeline([('scaler', StandardScaler()), ('selector', SelectKBest(mutual_info_classif, k=9)), ('classifier', LogisticRegression())]) search_space = [{'selector__k': [5, 6, 7, 10]}, {'classifier': [LogisticRegression()], 'classifier__C': [0.01,1.0], 'classifier__solver': ['saga', 'lbfgs']}, {'classifier': [RandomForestClassifier(n_estimators=100)], 'classifier__max_depth': [5, 10, None]}, {'classifier': [KNeighborsClassifier()], 'classifier__n_neighbors': [3, 7, 11], 'classifier__weights': ['uniform', 'distance']}] clf = GridSearchCV(pipe, search_space, cv=10, verbose=0) clf2 = clf.fit(X_eg, y_eg) clf2._check_feature_names clf2.best_estimator_.named_steps['selector'].n_features_in_ clf2.best_estimator_ #n of best features clf2.best_params_ clf2.best_estimator_.get_params clf2.get_feature_names( clf3 = clf2.best_estimator_ # clf3._final_estimator_ clf3._final_estimator.C clf3._final_estimator.solver fs_bmod = clf2.best_estimator_ print('\nbest model with feature selection:', fs_bmod) ######################################################### #cv = rskf_cv cv = skf_cv # my data: Feature Selelction + GridSearch CV + Pipeline pipe = Pipeline([ ('pre', MinMaxScaler()) # , ('fs', RFECV(LogisticRegression(**rs), cv = cv, scoring = 'matthews_corrcoef')) , ('fs', RFECV(DecisionTreeClassifier(**rs), cv = cv, scoring = 'matthews_corrcoef')) , ('clf', LogisticRegression(**rs))]) search_space = [ { 'fs__estimator': [LogisticRegression(**rs)] , 'fs__min_features_to_select': [0,1] ,'fs__cv': [rskf_cv] }, { #'clf': [LogisticRegression()], #'clf__C': np.logspace(0, 4, 10), 'clf__C': [1], 'clf__max_iter': [100], 'clf__penalty': ['l1', 'l2'], 'clf__solver': ['saga'] }, { #'clf': [LogisticRegression()], #'clf__C': np.logspace(0, 4, 10), 'clf__C': [2, 2.5], 'clf__max_iter': [100], 'clf__penalty': ['l1', 'l2'], 'clf__solver': ['saga'] }, #{'clf': [RandomForestclf(n_estimators=100)], # 'clf__max_depth': [5, 10, None]}, #{'clf': [KNeighborsclf()], # 'clf__n_neighbors': [3, 7, 11], # 'clf__weights': ['uniform', 'distance'] #} ] gscv_fs = GridSearchCV(pipe , search_space , cv = cv , scoring = mcc_score_fn , refit = 'mcc' , verbose = 1 , return_train_score = True , **njobs) gscv_fs.fit(X, y) #Fitting 10 folds for each of 8 candidates, totalling 80 fits # QUESTION: HOW?? gscv_fs.best_params_ gscv_fs.best_score_ # Training best score corresponds to the max of the mean_test train_bscore = round(gscv_fs.best_score_, 2); train_bscore print('\nTraining best score (MCC):', train_bscore) gscv_fs.cv_results_['mean_test_mcc'] round(gscv_fs.cv_results_['mean_test_mcc'].max(),2) round(np.nanmax(gscv_fs.cv_results_['mean_test_mcc']),2) check_train_score = [round(gscv_fs.cv_results_['mean_test_mcc'].max(),2) , round(np.nanmax(gscv_fs.cv_results_['mean_test_mcc']),2)] check_train_score = np.nanmax(check_train_score) # Training results gscv_tr_resD = gscv_fs.cv_results_ mod_refit_param = gscv_fs.refit # sanity check if train_bscore == check_train_score: print('\nVerified training score (MCC):', train_bscore ) else: print('\nTraining score could not be internatlly verified. Please check training results dict') # Blind test: REAL check! tp = gscv_fs.predict(X_bts) print('\nMCC on Blind test:' , round(matthews_corrcoef(y_bts, tp),2)) print('\nAccuracy on Blind test:', round(accuracy_score(y_bts, tp),2)) ############ # info extraction ############ # gives input vals?? gscv_fs._check_n_features # gives gscv params used gscv_fs._get_param_names() # gives ?? gscv_fs.best_estimator_ gscv_fs.best_params_ # gives best estimator params as a dict gscv_fs.best_estimator_._final_estimator # similar to above, doesn't contain max_iter gscv_fs.best_estimator_.named_steps['fs'].get_support() gscv_fs.best_estimator_.named_steps['fs'].ranking_ # array of ranks for the features gscv_fs.best_estimator_.named_steps['fs'].grid_scores_.mean() gscv_fs.best_estimator_.named_steps['fs'].grid_scores_.max() #gscv_fs.best_estimator_.named_steps['fs'].grid_scores_ ############################################################################### #============ # FS results #============ # Now get the features out all_features = gscv_fs.feature_names_in_ n_all_features = gscv_fs.n_features_in_ #all_features = gsfit.feature_names_in_ sel_features = X.columns[gscv_fs.best_estimator_.named_steps['fs'].get_support()] n_sf = gscv_fs.best_estimator_.named_steps['fs'].n_features_ # get model name model_name = gscv_fs.best_estimator_.named_steps['clf'] b_model_params = gscv_fs.best_params_ print('\n========================================' , '\nRunning model:' , '\nModel name:', model_name , '\n===============================================' , '\nRunning feature selection with RFECV for model' , '\nTotal no. of features in model:', len(all_features) , '\nThese are:\n', all_features, '\n\n' , '\nNo of features for best model: ', n_sf , '\nThese are:', sel_features, '\n\n' , '\nBest Model hyperparams:', b_model_params ) ############################################################################### ############################## OUTPUT ######################################### ############################################################################### #========================= # Blind test: BTS results #========================= # Build the final results with all scores for a feature selected model bts_predict = gscv_fs.predict(X_bts) print('\nMCC on Blind test:' , round(matthews_corrcoef(y_bts, bts_predict),2)) print('\nAccuracy on Blind test:', round(accuracy_score(y_bts, bts_predict),2)) bts_mcc_score = round(matthews_corrcoef(y_bts, bts_predict),2) # Diff b/w train and bts test scores train_test_diff = train_bscore - bts_mcc_score print('\nDiff b/w train and blind test score (MCC):', train_test_diff) # create a dict with all scores lr_btsD = {#'best_model': list(gscv_lr_fit_be_mod.items()) #'bts_mcc':None 'bts_fscore':None , 'bts_precision':None , 'bts_recall':None , 'bts_accuracy':None , 'bts_roc_auc':None , 'bts_jaccard':None} lr_btsD #lr_btsD['bts_mcc'] = bts_mcc_score lr_btsD['bts_fscore'] = round(f1_score(y_bts, bts_predict),2) lr_btsD['bts_precision'] = round(precision_score(y_bts, bts_predict),2) lr_btsD['bts_recall'] = round(recall_score(y_bts, bts_predict),2) lr_btsD['bts_accuracy'] = round(accuracy_score(y_bts, bts_predict),2) lr_btsD['bts_roc_auc'] = round(roc_auc_score(y_bts, bts_predict),2) lr_btsD['bts_jaccard'] = round(jaccard_score(y_bts, bts_predict),2) lr_btsD #=========================== # Add FS related model info #=========================== model_namef = str(model_name) # FIXME: doesn't tell you which it has chosen fs_methodf = str(gscv_fs.best_estimator_.named_steps['fs']) all_featuresL = list(all_features) fs_res_arrayf = str(list( gscv_fs.best_estimator_.named_steps['fs'].get_support())) fs_res_array_rankf = list( gscv_fs.best_estimator_.named_steps['fs'].ranking_) sel_featuresf = list(sel_features) n_sf = int(n_sf) output_modelD = {'model_name': model_namef , 'model_refit_param': mod_refit_param , 'Best_model_params': b_model_params , 'n_all_features': n_all_features , 'fs_method': fs_methodf , 'fs_res_array': fs_res_arrayf , 'fs_res_array_rank': fs_res_array_rankf , 'all_feature_names': all_featuresL , 'n_sel_features': n_sf , 'sel_features_names': sel_featuresf} output_modelD #======================================== # Update output_modelD with bts_results #======================================== output_modelD.update(lr_btsD) output_modelD output_modelD['train_score (MCC)'] = train_bscore output_modelD['bts_mcc'] = bts_mcc_score output_modelD['train_bts_diff'] = round(train_test_diff,2) output_modelD class NpEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, np.integer): return int(obj) if isinstance(obj, np.floating): return float(obj) if isinstance(obj, np.ndarray): return obj.tolist() return super(NpEncoder, self).default(obj) json.dumps(output_modelD, cls=NpEncoder) #======================================== # Write final output file # https://stackoverflow.com/questions/19201290/how-to-save-a-dictionary-to-a-file #======================================== #output final dict as a json outFile = 'LR_FS.json' with open(outFile, 'w') as f: f.write(json.dumps(output_modelD,cls=NpEncoder)) # read json file = 'LR_FS.json' with open(file, 'r') as f: data = json.load(f) ##############################################################################