import os
import math
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import ndarray
from sklearn.neighbors import NearestNeighbors
from sklearn.decomposition import PCA
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score
from sklearn.metrics import cohen_kappa_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from collections import Counter
from imblearn.datasets import fetch_datasets
from sklearn.preprocessing import StandardScaler
import keras
from keras.layers import Dense, Dropout, Input
from keras.models import Model, Sequential
from tqdm import tqdm
from keras.layers.advanced_activations import LeakyReLU
from tensorflow.keras.optimizers import Adam
from keras import losses
from keras import backend as K
import tensorflow as tf

import warnings
warnings.filterwarnings("ignore")

from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import GradientBoostingClassifier

from numpy.random import seed
seed_num = 1
seed(seed_num)
tf.random.set_seed(seed_num)

from library.interfaces import GanBaseClass
from library.dataset import DataSet

from sklearn.utils import shuffle


## Import dataset
data = fetch_datasets()['yeast_me2']

## Creating label and feature matrices
labels_x = data.target  ## labels of the data
features_x = data.data  ## features of the data

# At this point the data set has been loaded and split into a label vector
# (labels_x) and a feature matrix (features_x).


class ConvGAN(GanBaseClass):
    """
    Minority-class oversampler built from a generator / discriminator pair.

    The generator (built by conv_sample_gen) turns a batch of neighbouring
    minority points into synthetic points; the discriminator (built by
    maj_min_disc) is trained to separate those synthetic points from majority
    points; convGAN joins both networks so that the generator can be trained
    on the discriminator's decisions.

    *neb* is the size of the minority neighbourhood fed to the generator.
    *gen* is the number of synthetic points / majority points per batch.
    *debug* switches progress printing and the loss plot on or off.
    """

    def __init__(self, neb, gen, debug=True):
        self.isTrained = False
        self.neb = neb
        self.gen = gen
        self.loss_history = None
        self.debug = debug
        self.dataSet = None
        # The networks are created by reset(); None means "not built yet".
        self.conv_sample_generator = None
        self.maj_min_discriminator = None
        self.cg = None

    def reset(self):
        """
        Resets the trained GAN to a random state by rebuilding all networks.
        """
        self.isTrained = False
        ## instantiate generator network
        self.conv_sample_generator = conv_sample_gen()

        ## instantiate discriminator network
        self.maj_min_discriminator = maj_min_disc()

        ## instantiate the combined network
        self.cg = convGAN(self.conv_sample_generator, self.maj_min_discriminator)

    def train(self, dataSet, neb_epochs=5):
        """
        Trains the GAN and marks it as trained.

        *dataSet* is an instance of /library.dataset.DataSet/. Its *data1* is
        used as the minority class and its *data0* as the majority class.

        *neb_epochs* is the number of passes over all minority neighbourhoods.

        Raises AttributeError if class 1 contains no points.
        """
        if dataSet.data1.shape[0] <= 0:
            raise AttributeError("Train: Expected data class 1 to contain at least one point.")

        # Build the networks on demand so that train() also works when the
        # caller did not call reset() first.
        if (
            self.conv_sample_generator is None
            or self.maj_min_discriminator is None
            or self.cg is None
        ):
            self.reset()

        self.dataSet = dataSet
        self._rough_learning(neb_epochs, dataSet.data1, dataSet.data0)
        self.isTrained = True

    def generateDataPoint(self):
        """
        Returns one synthetic data point.
        """
        return (self.generateData(1))[0]

    def generateData(self, numOfSamples=1):
        """
        Generates synthetic data points for the minority class.

        *numOfSamples* is an integer > 0. It gives the number of new generated samples.

        Returns a numpy array with at most *numOfSamples* rows.
        Raises ValueError if the GAN has not been trained.
        """
        if not self.isTrained:
            raise ValueError("Try to generate data with untrained ConvGAN.")

        data_min = self.dataSet.data1

        ## roughly calculate the upper bound of the synthetic samples to be generated from each neighbourhood
        synth_num = (numOfSamples // len(data_min)) + 1

        ## generate synth_num synthetic samples from each minority neighbourhood
        synth_set = []
        for index in range(len(data_min)):
            synth_set.extend(self._generate_data_for_min_point(data_min, index, synth_num))

        ## keep exactly the number of synthetic samples that was asked for
        return np.array(synth_set[:numOfSamples])

    # ###############################################################
    # Hidden internal functions
    # ###############################################################
    def _generate_data_for_min_point(self, data_min, index, synth_num, generator=None):
        """
        Generates synth_num synthetic points for a particular minority sample.

        data_min  -> minority class data
        index     -> index of the minority sample whose neighbourhood is used
        synth_num -> number of synthetic points to produce
        generator -> optional generator network; defaults to self.conv_sample_generator

        Returns a list with at most synth_num synthetic points.
        """
        model = self.conv_sample_generator if generator is None else generator

        synth_set = []
        # Keep drawing neighbourhood batches until enough points were produced.
        # This does not depend on how many rows one generator call returns.
        while len(synth_set) < synth_num:
            batch = self._NMB_guided(data_min, index)
            synth_batch = model.predict(batch)
            if len(synth_batch) == 0:
                break  # guard against an endless loop on an empty prediction
            synth_set.extend(synth_batch)

        return synth_set[:synth_num]

    # Training
    def _rough_learning(self, neb_epochs, data_min, data_maj):
        """
        Trains discriminator and generator alternately on neighbourhood batches.

        neb_epochs -> number of passes over all minority samples
        data_min   -> minority class data
        data_maj   -> majority class data

        Stores the per-step loss of the combined network in self.loss_history.
        """
        generator = self.conv_sample_generator
        discriminator = self.maj_min_discriminator
        GAN = self.cg

        loss_history = []  ## stores the loss for every step
        neb_epoch_count = 1
        n_min = len(data_min)

        ## one-hot labels: the first self.gen rows are synthetic minority
        ## samples ([1, 0]), the remaining self.gen rows are majority samples ([0, 1])
        labels = np.array([[1, 0]] * self.gen + [[0, 1]] * self.gen)
        labels = tf.convert_to_tensor(labels)

        for step in range(neb_epochs * n_min):
            ## walk through every minority sample once per neighbourhood epoch
            min_idx = step % n_min

            ## minority neighbourhood batch for the current minority sample
            min_batch = self._NMB_guided(data_min, min_idx)
            ## random majority batch
            maj_batch = self._BMB(data_min, data_maj)
            ## synthetic samples generated from the minority neighbourhood batch
            conv_samples = generator.predict(min_batch)
            ## concatenate them with the majority batch
            concat_sample = tf.concat([conv_samples, maj_batch], axis=0)

            ## train the discriminator with the concatenated samples and the one-hot encoded labels
            discriminator.trainable = True
            discriminator.fit(x=concat_sample, y=labels, verbose=0)
            discriminator.trainable = False

            ## use the GAN to make the generator learn on the decisions made by the previous discriminator training
            gan_loss_history = GAN.fit(concat_sample, y=labels, verbose=0)
            loss_history.append(gan_loss_history.history['loss'])

            if self.debug and ((step + 1) % 10 == 0):
                print(f"{step + 1} neighbourhood batches trained; running neighbourhood epoch {neb_epoch_count}")

            if min_idx == n_min - 1:
                if self.debug:
                    print(f"Neighbourhood epoch {neb_epoch_count} complete")
                neb_epoch_count = neb_epoch_count + 1

        if self.debug:
            run_range = range(1, len(loss_history) + 1)
            plt.rcParams["figure.figsize"] = (16, 10)
            plt.xticks(fontsize=20)
            plt.yticks(fontsize=20)
            plt.xlabel('runs', fontsize=25)
            plt.ylabel('loss', fontsize=25)
            plt.title('Rough learning loss for discriminator', fontsize=25)
            plt.plot(run_range, loss_history)
            plt.show()

        self.conv_sample_generator = generator
        self.maj_min_discriminator = discriminator
        self.cg = GAN
        self.loss_history = loss_history

    def _BMB(self, data_min, data_maj):
        """
        Returns a batch of self.gen majority samples as a tensor.

        data_min -> minority class data (unused, kept for the call signature)
        data_maj -> majority class data

        The rows are drawn uniformly at random, with replacement, from data_maj.
        """
        # NOTE(review): earlier code also searched the majority neighbours of
        # every minority point here but never used the result; that dead (and
        # costly) search was removed. If sampling only from those "borderline"
        # neighbours was intended, it has to be added deliberately.
        return tf.convert_to_tensor(
            data_maj[np.random.randint(len(data_maj), size=self.gen)]
        )

    def _NMB_guided(self, data_min, index):
        """
        Returns the minority neighbourhood batch of one minority sample as a tensor.

        data_min -> minority class data
        index    -> index of the minority sample whose neighbourhood we want to obtain

        The batch holds the self.neb nearest minority neighbours of
        data_min[index] in random order.
        """
        neigh = NearestNeighbors(n_neighbors=self.neb)
        neigh.fit(data_min)
        nmbi = neigh.kneighbors([data_min[index]], self.neb, return_distance=False)
        # kneighbors returns one row of indices per query point; shuffle the
        # indices inside that row (shuffling the one-row matrix changes nothing).
        nmbi = shuffle(nmbi[0])
        return tf.convert_to_tensor(data_min[nmbi])


def conv_sample_gen():
    """
    Builds and compiles the generator network.

    The network takes a minority neighbourhood batch and returns synthetic
    samples computed as weighted sums of the rows of that batch.

    Reads the module-level names n_feat, neb and gen; they are not defined in
    the part of the file shown here and must exist before this is called.
    """
    ## takes minority batch as input
    min_neb_batch = keras.layers.Input(shape=(n_feat,))

    ## reshaping the 2D tensor to 3D for using 1-D convolution
    x = tf.reshape(min_neb_batch, (1, neb, n_feat), name=None)
    ## 1-D convolution with n_feat filters
    x = keras.layers.Conv1D(n_feat, 3, activation='relu')(x)
    ## flatten after convolution
    x = keras.layers.Flatten()(x)
    ## dense layer to transform the vector to a convenient dimension
    x = keras.layers.Dense(neb * gen, activation='relu')(x)

    ## switch back to a (neb, gen) matrix per batch
    x = keras.layers.Reshape((neb, gen))(x)
    ## sum over the neb axis
    s = K.sum(x, axis=1)
    ## add a small constant so that the sums are never zero (they can be zero right after initialization)
    s_non_zero = tf.keras.layers.Lambda(lambda x: x + .000001)(s)
    ## reciprocals of the approximated sums
    sinv = tf.math.reciprocal(s_non_zero)
    ## scale x so that the coefficients sum to (approximately) 1 along the neb axis
    x = keras.layers.Multiply()([sinv, x])

    ## transpose, so each row is now one set of coefficients
    aff = tf.transpose(x[0])
    ## multiply the coefficients with the original minority batch to obtain the synthetic samples
    synth = tf.matmul(aff, min_neb_batch)

    ## finally compile the generator
    model = Model(inputs=min_neb_batch, outputs=synth)
    opt = Adam(learning_rate=0.001)
    model.compile(loss='mean_squared_logarithmic_error', optimizer=opt)

    return model


def maj_min_disc():
    """
    Builds and compiles the discriminator network.

    The discriminator is trained in two phases:
    first phase: while training the GAN the discriminator learns to differentiate
    synthetic minority samples against majority samples;
    second phase: after the generator has learned to create synthetic samples, they
    can be used to balance the dataset and retrain the discriminator on it.

    Reads the module-level name n_feat; it is not defined in the part of the
    file shown here and must exist before this is called.
    """
    ## takes as input the synthetic samples stacked upon a batch of majority samples
    samples = keras.layers.Input(shape=(n_feat,))

    ## passed through two dense layers
    y = keras.layers.Dense(250, activation='relu')(samples)
    y = keras.layers.Dense(125, activation='relu')(y)

    ## two output nodes; targets have to be one-hot coded (see labels in ConvGAN._rough_learning)
    output = keras.layers.Dense(2, activation='sigmoid')(y)

    ## compile model
    model = Model(inputs=samples, outputs=output)
    opt = Adam(learning_rate=0.0001)
    model.compile(loss='binary_crossentropy', optimizer=opt)
    return model


def convGAN(generator, discriminator):
    """
    Joins generator and discriminator into one compiled network.

    generator     -> generator network instance
    discriminator -> discriminator network instance

    Reads the module-level names n_feat and neb; they are not defined in the
    part of the file shown here and must exist before this is called.
    """
    ## by default the discriminator trainability is switched off.
    ## Thus training the GAN means training the generator network as per
    ## previously trained discriminator network.
    discriminator.trainable = False

    ## input receives a neighbourhood minority batch and a majority batch concatenated
    batch_data = keras.layers.Input(shape=(n_feat,))

    ## extract minority batch
    min_batch = tf.keras.layers.Lambda(lambda x: x[:neb])(batch_data)
    ## extract majority batch
    maj_batch = tf.keras.layers.Lambda(lambda x: x[neb:])(batch_data)

    ## pass minority batch into generator to obtain the synthetic samples
    conv_samples = generator(min_batch)
    ## concatenate the synthetic samples with the majority samples
    new_samples = tf.concat([conv_samples, maj_batch], axis=0)
    ## pass the concatenated samples into the discriminator to know its decisions
    output = discriminator(new_samples)

    ## note that the discriminator will not be trained here but will make
    ## decisions based on its previous training
    model = Model(inputs=batch_data, outputs=output)
    opt = Adam(learning_rate=0.0001)
    model.compile(loss='mse', optimizer=opt)

    return model


## The rough learning in ConvGAN is the main training process where the GAN
## learns to generate appropriate samples from the convex space.
## It is the first training phase for the discriminator and the only training
## phase for the generator.
def rough_learning_predictions(discriminator,test_data_numpy,test_labels_numpy): ## after the first phase of training the discriminator can be used for classification ## it already learns to differentiate the convex minority points with majority points during the first training phase y_pred_2d=discriminator.predict(tf.convert_to_tensor(test_data_numpy)) ## discretisation of the labels y_pred=np.digitize(y_pred_2d[:,0], [.5]) ## prediction shows a model with good recall and less precision c=confusion_matrix(test_labels_numpy, y_pred) f=f1_score(test_labels_numpy, y_pred) pr=precision_score(test_labels_numpy, y_pred) rc=recall_score(test_labels_numpy, y_pred) k=cohen_kappa_score(test_labels_numpy, y_pred) print('Rough learning confusion matrix:', c) print('Rough learning f1 score', f) print('Rough learning precision score', pr) print('Rough learning recall score', rc) print('Rough learning kappa score', k) return c,f,pr,rc,k def generate_synthetic_data(gan, data_min, data_maj): ## roughly claculate the upper bound of the synthetic samples to be generated from each neighbourhood synth_num=((len(data_maj)-len(data_min))//len(data_min))+1 ## generate synth_num synthetic samples from each minority neighbourhood synth_set = gan.generateData(synth_num) ovs_min_class=np.concatenate((data_min,synth_set),axis=0) ovs_training_dataset=np.concatenate((ovs_min_class,data_maj),axis=0) ovs_pca_labels=np.concatenate((np.zeros(len(data_min)),np.zeros(len(synth_set))+1,np.zeros(len(data_maj))+2)) ovs_training_labels=np.concatenate((np.zeros(len(ovs_min_class))+1,np.zeros(len(data_maj))+0)) ovs_training_labels_oh=[] for i in range(len(ovs_training_dataset)): if i