#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon May 23 23:25:26 2022

@author: tanu
"""
# NOTE(review): this file has no import statements of its own. The names used
# below (pd, np, the scikit-learn classes and metric functions, and the
# project-level `rs`, `njobs` and `mcc_score_fn`) must already be in scope
# when this definition is executed -- confirm how the file is loaded.
#####################################
def fsgs(input_df,
         target,
         blind_test_df=pd.DataFrame(),
         blind_test_target=pd.Series(dtype='int64'),
         estimator=LogisticRegression(**rs),
         param_gridLd={},
         cv_method=StratifiedKFold(n_splits=10, shuffle=True, **rs),
         var_type=['numerical', 'categorical', 'mixed'],
         fs=RFECV(DecisionTreeClassifier(**rs),
                  cv=StratifiedKFold(n_splits=10, shuffle=True, **rs),
                  scoring='matthews_corrcoef'),
         ):
    '''
    Run feature selection and hyperparameter tuning for one estimator.

    A pipeline of (column preprocessing -> feature selector -> estimator) is
    tuned with GridSearchCV, refit on the 'mcc' scorer, and the refitted
    model is then scored on the blind test set.

    Parameters
    ----------
    input_df : DataFrame of training features. Columns with dtype
        int64/float64 are treated as numerical, object/bool as categorical.
    target : training labels, passed to GridSearchCV.fit.
    blind_test_df, blind_test_target : held-out features and labels. The
        blind-test step always runs and the empty defaults are not
        special-cased, so real data is expected here.
    estimator : final 'clf' step of the pipeline.
    param_gridLd : parameter grid handed to GridSearchCV for the pipeline
        (steps are named 'pre', 'fs' and 'clf').
    cv_method : cross-validation splitter used by GridSearchCV.
    var_type : must be exactly one of the strings 'numerical',
        'categorical' or 'mixed'. It selects MinMaxScaler on numerical
        columns, OneHotEncoder on categorical columns, or both. The list
        default only enumerates the choices and is rejected.
    fs : feature selection step; get_support(), ranking_ and n_features_
        are read from the fitted step.

    Returns
    -------
    dict with keys: 'model_name', 'model_refit_param', 'Best_model_params',
    'n_all_features', 'fs_method', 'fs_res_array', 'fs_res_array_rank',
    'all_feature_names', 'n_sel_features', 'sel_features_names',
    'bts_fscore', 'bts_precision', 'bts_recall', 'bts_accuracy',
    'bts_roc_auc', 'bts_jaccard', 'train_score (MCC)', 'bts_mcc',
    'train_bts_diff'.

    Raises
    ------
    ValueError
        If var_type is not one of the three accepted strings.
    '''
    # Determine categorical and numerical features
    numerical_ix = input_df.select_dtypes(include=['int64', 'float64']).columns
    categorical_ix = input_df.select_dtypes(include=['object', 'bool']).columns

    # Determine preprocessing steps ~ var_type.
    # An explicit else-branch is required: without it an unrecognised value
    # (including the list default) left `t` unbound and the function failed
    # later with an unhelpful UnboundLocalError.
    if var_type == 'numerical':
        t = [('num', MinMaxScaler(), numerical_ix)]
    elif var_type == 'categorical':
        t = [('cat', OneHotEncoder(), categorical_ix)]
    elif var_type == 'mixed':
        t = [('cat', OneHotEncoder(), categorical_ix),
             ('num', MinMaxScaler(), numerical_ix)]
    else:
        raise ValueError(
            "var_type must be one of 'numerical', 'categorical' or 'mixed'; "
            "got %r" % (var_type,))

    col_transform = ColumnTransformer(transformers=t,
                                      remainder='passthrough')

    ###########################################################################
    #=================
    # Create var_type ~ column names
    # Using one hot encoder with RFECV means the names internally are lost.
    # Hence fit col_transform to input_df and store all the output column
    # names, so the 'selected features' can be subsetted with the selector's
    # boolean support mask.
    #=================
    col_transform.fit(input_df)
    var_type_colnames = pd.Index(col_transform.get_feature_names_out())

    if var_type == 'mixed':
        print('\nVariable type is:', var_type
              , '\nNo. of columns in input_df:', len(input_df.columns)
              , '\nNo. of columns post one hot encoder:', len(var_type_colnames))
    else:
        print('\nNo. of columns in input_df:', len(input_df.columns))

    ############################################################################
    # Create Pipeline object
    pipe = Pipeline([
        ('pre', col_transform),
        ('fs', fs),
        ('clf', estimator)])

    ############################################################################
    # Define GridSearchCV
    # refit='mcc' and the 'mean_test_mcc' lookup below require that
    # mcc_score_fn defines a scorer named 'mcc'.
    gscv_fs = GridSearchCV(pipe
                           , param_gridLd
                           , cv=cv_method
                           , scoring=mcc_score_fn
                           , refit='mcc'
                           , verbose=1
                           , return_train_score=True
                           , **njobs)

    gscv_fs.fit(input_df, target)

    ###########################################################################
    # Training best score corresponds to the max of the mean_test scores
    train_bscore = round(gscv_fs.best_score_, 2)
    print('\nTraining best score (MCC):', train_bscore)

    # nanmax ignores parameter combinations whose CV score is NaN
    check_train_score = round(np.nanmax(gscv_fs.cv_results_['mean_test_mcc']), 2)

    mod_refit_param = gscv_fs.refit

    # sanity check
    if train_bscore == check_train_score:
        print('\nVerified training score (MCC):', train_bscore)
    else:
        print('\nTraining score could not be internatlly verified. Please check training results dict')

    #-------------------------
    # Blind test: REAL check!
    #-------------------------
    # Predict once and reuse the predictions for every blind-test metric.
    bts_predict = gscv_fs.predict(blind_test_df)
    bts_mcc_score = round(matthews_corrcoef(blind_test_target, bts_predict), 2)
    bts_accuracy = round(accuracy_score(blind_test_target, bts_predict), 2)
    print('\nMCC on Blind test:', bts_mcc_score)
    print('\nAccuracy on Blind test:', bts_accuracy)

    ############################################################################
    #============
    # FS results
    #============
    best_pipe = gscv_fs.best_estimator_
    fs_step = best_pipe.named_steps['fs']
    support_mask = fs_step.get_support()

    #--------------
    # All features (as seen by the grid search at fit time)
    #--------------
    all_features = gscv_fs.feature_names_in_
    n_all_features = gscv_fs.n_features_in_

    #--------------
    # Selected features by the classifier
    # Important to have var_type_colnames here: the selector works on the
    # transformed columns, not on the columns of input_df.
    #--------------
    sel_features = var_type_colnames[support_mask]
    n_sf = fs_step.n_features_

    #--------------
    # Get model name
    #--------------
    model_name = best_pipe.named_steps['clf']
    b_model_params = gscv_fs.best_params_

    print('\n========================================'
          , '\nRunning model:'
          , '\nModel name:', model_name
          , '\n==============================================='
          , '\nRunning feature selection with RFECV for model'
          , '\nTotal no. of features in model:', len(all_features)
          , '\nThese are:\n', all_features, '\n\n'
          , '\nNo of features for best model: ', n_sf
          , '\nThese are:', sel_features, '\n\n'
          , '\nBest Model hyperparams:', b_model_params
          )

    ###############################################################################
    ############################## OUTPUT #########################################
    ###############################################################################
    #=========================
    # Blind test: BTS results
    #=========================
    # Report the blind-test scores again next to the final summary.
    print('\nMCC on Blind test:', bts_mcc_score)
    print('\nAccuracy on Blind test:', bts_accuracy)

    # Diff b/w train and bts test scores
    train_test_diff = train_bscore - bts_mcc_score
    print('\nDiff b/w train and blind test score (MCC):', train_test_diff)

    # Dict with all blind-test scores for the feature-selected model.
    # NOTE(review): ROC AUC is computed from the hard class predictions, not
    # from probabilities/decision scores -- confirm this is intended.
    lr_btsD = {
        'bts_fscore': round(f1_score(blind_test_target, bts_predict), 2),
        'bts_precision': round(precision_score(blind_test_target, bts_predict), 2),
        'bts_recall': round(recall_score(blind_test_target, bts_predict), 2),
        'bts_accuracy': bts_accuracy,
        'bts_roc_auc': round(roc_auc_score(blind_test_target, bts_predict), 2),
        'bts_jaccard': round(jaccard_score(blind_test_target, bts_predict), 2),
    }

    #===========================
    # Add FS related model info
    #===========================
    model_namef = str(model_name)
    # FIXME: doesn't tell you which it has chosen
    fs_methodf = str(fs_step)
    all_featuresL = list(all_features)
    fs_res_arrayf = str(list(support_mask))
    fs_res_array_rankf = str(list(fs_step.ranking_))
    sel_featuresf = list(sel_features)
    n_sf = int(n_sf)

    output_modelD = {'model_name': model_namef
                     , 'model_refit_param': mod_refit_param
                     , 'Best_model_params': b_model_params
                     , 'n_all_features': n_all_features
                     , 'fs_method': fs_methodf
                     , 'fs_res_array': fs_res_arrayf
                     , 'fs_res_array_rank': fs_res_array_rankf
                     , 'all_feature_names': all_featuresL
                     , 'n_sel_features': n_sf
                     , 'sel_features_names': sel_featuresf}

    #========================================
    # Update output_modelD with bts_results
    #========================================
    output_modelD.update(lr_btsD)
    output_modelD['train_score (MCC)'] = train_bscore
    output_modelD['bts_mcc'] = bts_mcc_score
    output_modelD['train_bts_diff'] = round(train_test_diff, 2)

    print(output_modelD)
    return output_modelD