#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat May 21 02:52:36 2022

@author: tanu
"""
# Purpose of this section: build a preprocessing ColumnTransformer for X,
# wrap it in a Pipeline (preprocess -> RFECV feature selection -> logistic
# regression) and tune that pipeline with GridSearchCV.
#
# NOTE(review): the following names are used below but are not defined in
# this section; they must already be in scope: X, y, input_df, rs, njobs,
# skf_cv, mcc_score_fn, pd, and the scikit-learn classes.

#######################################################
# determine categorical and numerical features
numerical_ix = X.select_dtypes(include=['int64', 'float64']).columns
categorical_ix = X.select_dtypes(include=['object', 'bool']).columns

# Determine preprocessing steps ~ var_type
var_type = 'mixed'

# Exactly one branch defines the transformer list. Previously an
# unconditional assignment after the 'mixed' branch overwrote every branch,
# so the 'numerical' and 'categorical' settings had no effect. The 'mixed'
# order (numerical first, then categorical) is the order that was effectively
# in use.
if var_type == 'numerical':
    t = [('num', MinMaxScaler(), numerical_ix)]
elif var_type == 'categorical':
    t = [('cat', OneHotEncoder(), categorical_ix)]
elif var_type == 'mixed':
    t = [('num', MinMaxScaler(), numerical_ix)
         , ('cat', OneHotEncoder(), categorical_ix)]
else:
    raise ValueError(
        "var_type must be 'numerical', 'categorical' or 'mixed', got: "
        + repr(var_type))

col_transform = ColumnTransformer(transformers = t
                                  , remainder='passthrough')

# A single fit is enough: fit_transform() leaves col_transform fitted, so the
# output column names can be read straight afterwards.
Xm = col_transform.fit_transform(X)
var_type_colnames = pd.Index(col_transform.get_feature_names_out())

if var_type == 'mixed':
    print('\nVariable type is:', var_type
          , '\nNo. of columns in input_df:', len(input_df.columns)
          , '\nNo. of columns post one hot encoder:', len(var_type_colnames))
else:
    print('\nNo. of columns in input_df:', len(input_df.columns))

# NOTE: an earlier scratch cell rebuilt the encoded column names by hand
# (OneHotEncoder names + numerical_ix); var_type_colnames above replaces it.

#%%
pipe = Pipeline([
    #('pre', MinMaxScaler())
    ('pre', col_transform)
    , ('fs', RFECV(DecisionTreeClassifier(**rs), cv = 10, scoring = 'matthews_corrcoef'))
    , ('clf', LogisticRegression(**rs))])

#########################################################
#cv = rskf_cv
cv = skf_cv

# LR: Feature Selection + GridSearch CV + Pipeline
# Each dict in the list is searched as its own grid; parameters that a dict
# does not mention keep the values set on `pipe` above.
search_space = [
    {
        'fs__estimator': [LogisticRegression(**rs)]
        , 'fs__min_features_to_select': [1]
        , 'fs__cv': [skf_cv]
    },
    {
        #'clf': [LogisticRegression()],
        #'clf__C': np.logspace(0, 4, 10),
        'clf__C': [1],
        'clf__max_iter': [100],
        'clf__penalty': ['l1', 'l2'],
        'clf__solver': ['saga']
    },
    {
        #'clf': [LogisticRegression()],
        #'clf__C': np.logspace(0, 4, 10),
        'clf__C': [2, 2.5],
        'clf__max_iter': [100],
        'clf__penalty': ['l1', 'l2'],
        'clf__solver': ['saga']
    },
    #{'clf': [RandomForestclf(n_estimators=100)],
    # 'clf__max_depth': [5, 10, None]},
    #{'clf': [KNeighborsclf()],
    # 'clf__n_neighbors': [3, 7, 11],
    # 'clf__weights': ['uniform', 'distance']
    #}
]

# The cv_results_ key 'mean_test_mcc' read later in this file implies that
# mcc_score_fn is a dict of scorers containing the key 'mcc', which is why
# refit names that key.
# NOTE(review): confirm against the definition of mcc_score_fn.
gscv_fs = GridSearchCV(pipe
                       , search_space
                       , cv = cv
                       , scoring = mcc_score_fn
                       , refit = 'mcc'
                       , verbose = 1
                       , return_train_score = True
                       , **njobs)

gscv_fs.fit(X, y)
# Recorded output: "Fitting 10 folds for each of 8 candidates, totalling 80 fits"
# Answer to "HOW??": the number of candidates is the SUM over the dicts in
# search_space of the PRODUCT of the list lengths inside each dict. For the
# grid as written above that is 1 + (1*1*2*1) + (2*1*2*1) = 7.
# NOTE(review): the recorded 8 presumably came from an earlier version of the
# grid - re-run to confirm.
# Purpose of this section: report the best cross-validated MCC of the fitted
# grid search (gscv_fs), cross-check it against cv_results_, and score the
# refitted best model on the blind test set (X_bts, y_bts).

# Training best score corresponds to the max of the mean_test
train_bscore = round(gscv_fs.best_score_, 2)
print('\nTraining best score (MCC):', train_bscore)

# Independent re-derivation of the best score from the per-candidate means.
# np.nanmax ignores candidates whose mean is NaN, whereas a plain .max()
# would return NaN as soon as one is present.
check_train_score = round(np.nanmax(gscv_fs.cv_results_['mean_test_mcc']), 2)

# Training results
gscv_tr_resD = gscv_fs.cv_results_
mod_refit_param = gscv_fs.refit

# sanity check
if train_bscore == check_train_score:
    print('\nVerified training score (MCC):', train_bscore)
else:
    print('\nTraining score could not be internatlly verified. Please check training results dict')

# Blind test: REAL check!
tp = gscv_fs.predict(X_bts)
print('\nMCC on Blind test:' , round(matthews_corrcoef(y_bts, tp),2))
print('\nAccuracy on Blind test:', round(accuracy_score(y_bts, tp),2))

############
# info extraction
############
# The private members that used to be probed here are intentionally no longer
# touched: they are not part of scikit-learn's public API, so they may not
# exist in every release, and evaluating them as bare expressions does
# nothing in a script.
#   gscv_fs._check_n_features   -> evaluated without a call, so it produced
#                                  no value; the number of input features is
#                                  available as gscv_fs.n_features_in_
#   gscv_fs._get_param_names()  -> names of the GridSearchCV constructor
#                                  parameters; use gscv_fs.get_params()
# Purpose of this section: extract the feature-selection results from the
# fitted grid search (gscv_fs), score the best model on the blind test set,
# collect everything in output_modelD and write it to LR_FS.json.
#
# NOTE(review): relies on names defined outside this section: gscv_fs,
# var_type_colnames, train_bscore, mod_refit_param, X_bts, y_bts, json, np
# and the sklearn.metrics functions.


class NpEncoder(json.JSONEncoder):
    """JSON encoder that converts numpy scalars and arrays to builtin types."""

    def default(self, obj):
        """Return a JSON-serialisable equivalent of *obj*.

        numpy integers, floats, booleans and arrays are converted; any other
        type is delegated to json.JSONEncoder.default, which raises TypeError.
        """
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)


def _json_safe(value):
    """Return *value* if NpEncoder can serialise it, otherwise str(value)."""
    try:
        json.dumps(value, cls=NpEncoder)
    except TypeError:
        return str(value)
    return value


# Handy attributes of the fitted search (inspect interactively if needed):
#   gscv_fs.best_estimator_                   fitted best pipeline
#   gscv_fs.best_params_                      best parameter setting (dict)
#   gscv_fs.best_estimator_.named_steps['fs'].get_support()   boolean mask
#   gscv_fs.best_estimator_.named_steps['fs'].ranking_        feature ranks
# RFECV.grid_scores_ was deprecated in scikit-learn 1.0 and removed in 1.2;
# the per-step CV scores are in
#   gscv_fs.best_estimator_.named_steps['fs'].cv_results_['mean_test_score']

###############################################################################
#============
# FS results
#============
# Now get the features out
# all_features / n_all_features are read from the grid search object, while
# sel_features is taken from var_type_colnames, so the two name lists need
# not share a naming scheme.
all_features = gscv_fs.feature_names_in_
n_all_features = gscv_fs.n_features_in_

# The RFECV support mask selects entries of var_type_colnames.
# NOTE(review): var_type_colnames must be aligned with the output columns of
# the pipeline's 'pre' step - confirm where it is defined.
sel_features = var_type_colnames[gscv_fs.best_estimator_.named_steps['fs'].get_support()]
n_sf = gscv_fs.best_estimator_.named_steps['fs'].n_features_

# get model name
model_name = gscv_fs.best_estimator_.named_steps['clf']
b_model_params = gscv_fs.best_params_

print('\n========================================'
      , '\nRunning model:'
      , '\nModel name:', model_name
      , '\n==============================================='
      , '\nRunning feature selection with RFECV for model'
      , '\nTotal no. of features in model:', len(all_features)
      , '\nThese are:\n', all_features, '\n\n'
      , '\nNo of features for best model: ', n_sf
      , '\nThese are:', sel_features, '\n\n'
      , '\nBest Model hyperparams:', b_model_params
      )

###############################################################################
############################## OUTPUT #########################################
###############################################################################
#=========================
# Blind test: BTS results
#=========================
# Build the final results with all scores for a feature selected model
bts_predict = gscv_fs.predict(X_bts)

bts_mcc_score = round(matthews_corrcoef(y_bts, bts_predict),2)
print('\nMCC on Blind test:' , bts_mcc_score)
print('\nAccuracy on Blind test:', round(accuracy_score(y_bts, bts_predict),2))

# Diff b/w train and bts test scores
train_test_diff = round(train_bscore - bts_mcc_score,2)
print('\nDiff b/w train and blind test score (MCC):', train_test_diff)

# create a dict with all scores
# NOTE(review): roc_auc_score receives hard class predictions here, not
# probabilities or decision scores - confirm this is the intended AUC.
lr_btsD = {'bts_fscore': round(f1_score(y_bts, bts_predict),2)
           , 'bts_precision': round(precision_score(y_bts, bts_predict),2)
           , 'bts_recall': round(recall_score(y_bts, bts_predict),2)
           , 'bts_accuracy': round(accuracy_score(y_bts, bts_predict),2)
           , 'bts_roc_auc': round(roc_auc_score(y_bts, bts_predict),2)
           , 'bts_jaccard': round(jaccard_score(y_bts, bts_predict),2)}

#===========================
# Add FS related model info
#===========================
model_namef = str(model_name)
# FIXME: doesn't tell you which it has chosen
fs_methodf = str(gscv_fs.best_estimator_.named_steps['fs'])
all_featuresL = list(all_features)
fs_res_arrayf = str(list( gscv_fs.best_estimator_.named_steps['fs'].get_support()))
fs_res_array_rankf = list( gscv_fs.best_estimator_.named_steps['fs'].ranking_)
sel_featuresf = list(sel_features)
n_sf = int(n_sf)

# best_params_ can hold estimator / CV-splitter objects (when a grid that
# sets 'fs__estimator' or 'fs__cv' wins). Those cannot be JSON-encoded, so
# the exported copy stores their string form; values that are already
# encodable are kept unchanged.
output_modelD = {'model_name': model_namef
                 , 'model_refit_param': mod_refit_param
                 , 'Best_model_params': {key: _json_safe(val)
                                         for key, val in b_model_params.items()}
                 , 'n_all_features': n_all_features
                 , 'fs_method': fs_methodf
                 , 'fs_res_array': fs_res_arrayf
                 , 'fs_res_array_rank': fs_res_array_rankf
                 , 'all_feature_names': all_featuresL
                 , 'n_sel_features': n_sf
                 , 'sel_features_names': sel_featuresf}

#========================================
# Update output_modelD with bts_results
#========================================
output_modelD.update(lr_btsD)

output_modelD['train_score (MCC)'] = train_bscore
output_modelD['bts_mcc'] = bts_mcc_score
output_modelD['train_bts_diff'] = round(train_test_diff,2)

#========================================
# Write final output file
# https://stackoverflow.com/questions/19201290/how-to-save-a-dictionary-to-a-file
#========================================
#output final dict as a json
outFile = 'LR_FS.json'
# Serialise before opening the file, so that an encoding error cannot leave a
# truncated/empty output file behind.
_output_json = json.dumps(output_modelD, cls=NpEncoder)
with open(outFile, 'w', encoding='utf-8') as f:
    f.write(_output_json)

# read json
file = 'LR_FS.json'
with open(file, 'r', encoding='utf-8') as f:
    data = json.load(f)
##############################################################################