# Standard library
import warnings

# Third-party
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from imblearn.datasets import fetch_datasets
from keras import backend as K
from keras.layers import Conv1D, Dense, Flatten, Input, Multiply, Reshape
from keras.models import Model
from numpy.random import seed
from sklearn.decomposition import PCA
from sklearn.metrics import (
    cohen_kappa_score,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
)
from sklearn.neighbors import NearestNeighbors
from sklearn.utils import shuffle
from tensorflow.keras.layers import Lambda
from tensorflow.keras.optimizers import Adam
from tqdm import tqdm

# Project-local
from library.convGAN import ConvGAN, create01Labels
from library.dataset import DataSet
from library.interfaces import GanBaseClass

# NOTE(review): blanket-suppressing every warning also hides deprecation
# notices; kept to preserve the original behaviour.
warnings.filterwarnings("ignore")

## This is the main training process where the GAN learns to generate
## appropriate samples from the convex space.
## This is the first training phase for the discriminator and the only
## training phase for the generator.
def rough_learning_predictions(discriminator, test_data_numpy, test_labels_numpy):
    """Evaluate the discriminator right after the first (GAN) training phase.

    During phase one the discriminator already learns to differentiate the
    convex minority points from majority points, so it can serve as a rough
    classifier even before the second, balanced training round.

    Parameters
    ----------
    discriminator : keras model with a ``predict`` method
    test_data_numpy : array of test features
    test_labels_numpy : binary labels (1 = minority class)

    Returns
    -------
    tuple
        (confusion_matrix, f1, precision, recall, kappa)
    """
    y_pred_2d = discriminator.predict(tf.convert_to_tensor(test_data_numpy))
    # Discretise the soft scores at 0.5; column 0 is assumed to carry the
    # minority ("positive") probability — TODO(review): confirm against the
    # label layout produced by create01Labels.
    y_pred = np.digitize(y_pred_2d[:, 0], [.5])
    # Phase-one predictions typically show good recall but weaker precision.
    c = confusion_matrix(test_labels_numpy, y_pred)
    f = f1_score(test_labels_numpy, y_pred)
    pr = precision_score(test_labels_numpy, y_pred)
    rc = recall_score(test_labels_numpy, y_pred)
    k = cohen_kappa_score(test_labels_numpy, y_pred)
    print('Rough learning confusion matrix:', c)
    print('Rough learning f1 score', f)
    print('Rough learning precision score', pr)
    print('Rough learning recall score', rc)
    print('Rough learning kappa score', k)
    return c, f, pr, rc, k


def generate_synthetic_data(gan, data_min, data_maj):
    """Oversample the minority class with GAN-generated convex samples.

    Generates enough synthetic minority samples to roughly balance the two
    classes, builds the oversampled training set plus labels, and shows a
    2-component PCA plot of real minority / synthetic minority / majority.

    Returns
    -------
    tuple
        (ovs_training_dataset, ovs_pca_labels, ovs_training_labels_oh)
    """
    # Roughly calculate the upper bound of synthetic samples to be generated
    # from each neighbourhood so that minority ~ majority afterwards.
    synth_num = ((len(data_maj) - len(data_min)) // len(data_min)) + 1
    # Generate synth_num synthetic samples from each minority neighbourhood.
    synth_set = gan.generateData(synth_num)
    ovs_min_class = np.concatenate((data_min, synth_set), axis=0)
    ovs_training_dataset = np.concatenate((ovs_min_class, data_maj), axis=0)
    # 0 = real minority, 1 = synthetic minority, 2 = majority (for the plot).
    ovs_pca_labels = np.concatenate((
        np.zeros(len(data_min)),
        np.zeros(len(synth_set)) + 1,
        np.zeros(len(data_maj)) + 2
    ))
    ovs_training_labels_oh = create01Labels(len(ovs_training_dataset), len(ovs_min_class))
    ovs_training_labels_oh = tf.convert_to_tensor(ovs_training_labels_oh)
    # PCA visualization of the synthetic data: observe how the minority
    # samples drawn from the convex space keep good variance and avoid
    # overlapping the majority class.
    pca = PCA(n_components=2)
    pca.fit(ovs_training_dataset)
    data_pca = pca.transform(ovs_training_dataset)
    plt.rcParams["figure.figsize"] = (12, 12)
    plt.xticks(fontsize=20)
    plt.yticks(fontsize=20)
    plt.xlabel('PCA1', fontsize=25)
    plt.ylabel('PCA2', fontsize=25)
    plt.title('PCA plot of oversampled data', fontsize=25)
    classes = ['minority', 'synthetic minority', 'majority']
    scatter = plt.scatter(data_pca[:, 0], data_pca[:, 1], c=ovs_pca_labels, cmap='Set1')
    plt.legend(handles=scatter.legend_elements()[0], labels=classes, fontsize=20)
    plt.show()
    return ovs_training_dataset, ovs_pca_labels, ovs_training_labels_oh


def final_learning(discriminator, ovs_training_dataset, ovs_training_labels_oh,
                   test_data_numpy, test_labels_numpy, num_epochs):
    """Second-phase training of the discriminator on the balanced data.

    Fits the discriminator on the oversampled dataset, plots the training
    loss, then reports test metrics. After this phase the classifier trades
    some recall for better precision, giving a more balanced F1 score.

    Returns
    -------
    tuple
        (confusion_matrix, f1, precision, recall, kappa)
    """
    print('\n')
    print('Final round training of the discrminator as a majority-minority classifier')
    print('\n')
    # Second phase: train the discriminator with the balanced (oversampled) data.
    history_second_learning = discriminator.fit(
        x=ovs_training_dataset,
        y=ovs_training_labels_oh,
        batch_size=20,
        epochs=num_epochs
    )
    # The second-phase loss decreases smoothly because the data is now fixed:
    # fresh convex combinations are no longer fed in at every training step.
    run_range = range(1, num_epochs + 1)
    plt.rcParams["figure.figsize"] = (16, 10)
    plt.xticks(fontsize=20)
    plt.yticks(fontsize=20)
    plt.xlabel('runs', fontsize=25)
    plt.ylabel('loss', fontsize=25)
    plt.title('Final learning loss for discriminator', fontsize=25)
    plt.plot(run_range, history_second_learning.history['loss'])
    plt.show()
    print('\n')
    y_pred_2d = discriminator.predict(tf.convert_to_tensor(test_data_numpy))
    y_pred = np.digitize(y_pred_2d[:, 0], [.5])
    c = confusion_matrix(test_labels_numpy, y_pred)
    f = f1_score(test_labels_numpy, y_pred)
    pr = precision_score(test_labels_numpy, y_pred)
    rc = recall_score(test_labels_numpy, y_pred)
    k = cohen_kappa_score(test_labels_numpy, y_pred)
    print('Final learning confusion matrix:', c)
    print('Final learning f1 score', f)
    print('Final learning precision score', pr)
    print('Final learning recall score', rc)
    print('Final learning kappa score', k)
    return c, f, pr, rc, k


def convGAN_train_end_to_end(training_data, training_labels, test_data, test_labels,
                             neb, gen, neb_epochs, epochs_retrain_disc):
    """Run the full ConvGAN pipeline on one train/test split.

    Trains the GAN (phase one), reports rough-learning metrics, generates
    synthetic minority data, retrains the discriminator on the balanced set
    (phase two), and reports final metrics.

    Returns
    -------
    tuple
        ((rough metrics tuple), (final metrics tuple)), each being
        (confusion_matrix, f1, precision, recall, kappa).
    """
    # Minority class (label 1) and majority class (label 0).
    data_min = training_data[np.where(training_labels == 1)[0]]
    data_maj = training_data[np.where(training_labels == 0)[0]]
    dataSet = DataSet(data0=data_maj, data1=data_min)
    gan = ConvGAN(data_min.shape[1], neb, gen)
    gan.reset()
    # Instantiate generator network and visualize architecture.
    conv_sample_generator = gan.conv_sample_generator
    print(conv_sample_generator.summary())
    print('\n')
    # Instantiate discriminator network and visualize architecture.
    maj_min_discriminator = gan.maj_min_discriminator
    print(maj_min_discriminator.summary())
    print('\n')
    # Instantiate combined network and visualize architecture.
    cg = gan.cg
    print(cg.summary())
    print('\n')
    print('Training the GAN, first round training of the discrminator as a majority-minority classifier')
    print('\n')
    # Train GAN generator / rough-train the discriminator.
    gan.train(dataSet, neb_epochs)
    print('\n')
    # Rough learning results.
    # NOTE(review): attribute `maj_min_discriminator_r` differs from
    # `maj_min_discriminator` used elsewhere — confirm ConvGAN exposes both.
    c_r, f_r, pr_r, rc_r, k_r = rough_learning_predictions(
        gan.maj_min_discriminator_r, test_data, test_labels)
    print('\n')
    # Generate synthetic data.
    ovs_training_dataset, ovs_pca_labels, ovs_training_labels_oh = \
        generate_synthetic_data(gan, data_min, data_maj)
    print('\n')
    # Final training results.
    c, f, pr, rc, k = final_learning(
        gan.maj_min_discriminator, ovs_training_dataset, ovs_training_labels_oh,
        test_data, test_labels, epochs_retrain_disc)
    return ((c_r, f_r, pr_r, rc_r, k_r), (c, f, pr, rc, k))


def unison_shuffled_copies(a, b, seed_perm):
    """Shuffle the feature matrix and the labels with the same permutation.

    Reseeds numpy's global RNG with ``seed_perm`` so the permutation is
    reproducible per stratum.
    """
    np.random.seed(seed_perm)  # change seed 1,2,3,4,5
    assert len(a) == len(b)
    p = np.random.permutation(len(a))
    return a[p], b[p]


def runTest():
    """5-stratum x 5-fold cross-validated benchmark of the ConvGAN pipeline.

    Loads the imbalanced ``yeast_me2`` dataset, and for each of 5 shuffle
    seeds builds a stratified 5-fold split, runs the end-to-end ConvGAN
    training on every fold, and prints the mean rough- and final-phase
    metrics (F1, precision, recall, kappa).
    """
    seed_num = 1
    seed(seed_num)
    tf.random.set_seed(seed_num)
    # Import dataset and split into label / feature matrices.
    data = fetch_datasets()['yeast_me2']
    labels_x = data.target   # labels of the data
    features_x = data.data   # features of the data
    # Specify parameters (neb == gen is required).
    neb = gen = 5
    neb_epochs = 10
    epochs_retrain_disc = 50
    # Training.
    np.random.seed(42)
    strata = 5
    results = []
    for seed_perm in range(strata):
        features_x, labels_x = unison_shuffled_copies(features_x, labels_x, seed_perm)
        print('Extracting all features and labels for seed:' + str(seed_perm) + '\n')
        # Original message said "10-fold" but the code builds 5 folds.
        print('Dividing data into training and testing datasets for 5-fold CV for seed:' + str(seed_perm) + '\n')
        features_1 = features_x[np.where(labels_x == 1)[0]]  # minority
        features_0 = features_x[np.where(labels_x != 1)[0]]  # majority
        a = len(features_1) // 5
        b = len(features_0) // 5
        # Folds 0-3 hold exactly a/b rows; the last fold takes the remainder.
        min_folds = [features_1[j * a:(j + 1) * a] for j in range(4)] + [features_1[4 * a:]]
        maj_folds = [features_0[j * b:(j + 1) * b] for j in range(4)] + [features_0[4 * b:]]
        for i in range(5):
            # Test split: fold i minority then fold i majority.
            # (The unrolled original built folds 2-4's test labels from
            # fold 1's lengths — harmless only because folds 0-3 are equal
            # sized; fixed here by using each fold's own lengths.)
            tst = np.concatenate((min_folds[i], maj_folds[i]))
            lab_tst = np.concatenate((np.ones(len(min_folds[i])),
                                      np.zeros(len(maj_folds[i]))))
            # Training split: the other four folds, reproducing the exact
            # concatenation order of the original unrolled code (base order
            # [2,3,4,5] with fold i replaced by fold 1).
            others = [1, 2, 3, 4] if i == 0 else [0 if j == i else j for j in [1, 2, 3, 4]]
            trn = np.concatenate([min_folds[j] for j in others] +
                                 [maj_folds[j] for j in others])
            n_min = sum(len(min_folds[j]) for j in others)
            lab_trn = np.concatenate((np.ones(n_min), np.zeros(len(trn) - n_min)))
            print('\n')
            print('Executing fold: ' + str(i + 1))
            print('\n')
            r1, r2 = convGAN_train_end_to_end(trn, lab_trn, tst, lab_tst,
                                              neb, gen, neb_epochs, epochs_retrain_disc)
            # Keep only the scalar metrics (drop the confusion matrices).
            results.append(np.array([list(r1[1:]), list(r2[1:])]))
    results = np.array(results)
    # Benchmark: mean metrics over all strata x folds.
    mean_rough = np.mean(results[:, 0], axis=0)
    data_r = {'F1-Score_r': [mean_rough[0]],
              'Precision_r': [mean_rough[1]],
              'Recall_r': [mean_rough[2]],
              'Kappa_r': [mean_rough[3]]}
    df_r = pd.DataFrame(data=data_r)
    print('Rough training results:')
    print('\n')
    print(df_r)
    mean_final = np.mean(results[:, 1], axis=0)
    data_f = {'F1-Score_f': [mean_final[0]],
              'Precision_f': [mean_final[1]],
              'Recall_f': [mean_final[2]],
              'Kappa_f': [mean_final[3]]}
    df_f = pd.DataFrame(data=data_f)
    print('Final training results:')
    print('\n')
    print(df_f)


if __name__ == "__main__":
    runTest()