import numpy as np
from numpy.random import seed
import pandas as pd
import matplotlib.pyplot as plt

from library.interfaces import GanBaseClass
from library.dataset import DataSet

from sklearn.decomposition import PCA
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score
from sklearn.metrics import cohen_kappa_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.neighbors import NearestNeighbors
from sklearn.utils import shuffle
from imblearn.datasets import fetch_datasets

from keras.layers import Dense, Input, Multiply, Flatten, Conv1D, Reshape
from keras.models import Model
from keras import backend as K
from tqdm import tqdm

import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Lambda

import time

from library.NNSearch_experimental import NNSearch
from library.timing import timing

import warnings
warnings.filterwarnings("ignore")


def repeat(x, times):
    """
    Returns a list that contains *x* exactly *times* times.

    The same object is referenced by every entry; it is not copied.
    """
    return [x for _ in range(times)]


def create01Labels(totalSize, sizeFirstHalf):
    """
    Creates one-hot coded labels for a stacked batch.

    The first *sizeFirstHalf* rows are [1, 0], the remaining
    *totalSize* - *sizeFirstHalf* rows are [0, 1].

    Returns a numpy array with *totalSize* rows and two columns.
    """
    labels = repeat(np.array([1, 0]), sizeFirstHalf)
    labels.extend(repeat(np.array([0, 1]), totalSize - sizeFirstHalf))
    return np.array(labels)


class ConvGAN_experimental(GanBaseClass):
    """
    GAN that creates synthetic minority points as convex combinations
    of the points in a minority neighbourhood.

    A generator network maps a neighbourhood batch of *neb* minority points
    to synthetic points, a discriminator network is trained to tell these
    synthetic points from randomly drawn majority points.

    Usage order: reset(...) builds the networks, train(...) fits them,
    generateData(...) / generateDataPoint() create synthetic points.
    """

    def __init__(self, n_feat, neb=5, gen=5, neb_epochs=10, debug=True):
        """
        *n_feat* is the number of features of a data point.

        *neb* is the size of a minority neighbourhood batch.

        *gen* is the number of majority points drawn per training step and
        the size of the first label group in training. It has to be >= *neb*.

        *neb_epochs* is the number of passes over all minority points
        during training.

        *debug* switches on the summaries, progress messages and the loss plot.

        Raises a ValueError if *neb* > *gen*.
        """
        ## validate first, so that no half initialized object is left behind
        if neb > gen:
            raise ValueError(f"Expected neb <= gen but got neb={neb} and gen={gen}.")

        self.isTrained = False
        self.n_feat = n_feat
        self.neb = neb
        self.gen = gen
        ## honour the constructor argument (it was hard coded to 10 before)
        self.neb_epochs = neb_epochs
        self.loss_history = None
        self.debug = debug
        self.dataSet = None
        ## (minority data, neighbourhood search) pair; set by train
        self.nmb = None
        ## the three networks are created by reset
        self.conv_sample_generator = None
        self.maj_min_discriminator = None
        self.cg = None
        ## accumulated seconds / call counters for the neighbourhood search
        self.tNbhFit = 0.0
        self.tNbhSearch = 0.0
        self.nNbhFit = 0
        self.nNbhSearch = 0
        self.timing = {
            name: timing(name)
            for name in [
                "reset",
                "train",
                "create points",
                "NMB",
                "BMB",
                "_generate_data_for_min_point",
                "predict",
            ]
        }

    def reset(self, _dataSet):
        """
        Resets the trained GAN to a random state.

        Creates a fresh generator, discriminator and combined network.
        *_dataSet* is not used.
        """
        self.timing["reset"].start()
        self.isTrained = False

        ## instantiate generator network
        self.conv_sample_generator = self._conv_sample_gen()

        ## instantiate discriminator network
        self.maj_min_discriminator = self._maj_min_disc()

        ## instantiate the combined network
        self.cg = self._convGAN(self.conv_sample_generator, self.maj_min_discriminator)
        self.timing["reset"].stop()

        ## visualize the architectures
        if self.debug:
            print(self.conv_sample_generator.summary())
            print('\n')
            print(self.maj_min_discriminator.summary())
            print('\n')
            print(self.cg.summary())
            print('\n')

    def train(self, dataSet):
        """
        Trains the GAN and marks it as trained.

        *dataSet* is a instance of /library.dataset.DataSet/. It contains the
        training dataset. *dataSet.data1* is used as minority class and
        *dataSet.data0* as majority class.

        The networks have to exist already, so reset has to be called before.

        Raises an AttributeError if class 1 contains no point.
        """
        ## check before the timer is started, so a failing call
        ## does not leave a running timer behind
        if dataSet.data1.shape[0] <= 0:
            raise AttributeError("Train: Expected data class 1 to contain at least one point.")

        self.timing["train"].start()
        self.dataSet = dataSet
        self.nmb = self._NMB_prepare(dataSet.data1)
        self._rough_learning(dataSet.data1, dataSet.data0)
        self.isTrained = True
        self.timing["train"].stop()

    def generateDataPoint(self):
        """
        Returns one synthetic data point created by the trained generator.
        """
        return (self.generateData(1))[0]

    def generateData(self, numOfSamples=1):
        """
        Generates a list of synthetic data-points.

        *numOfSamples* is a integer > 0. It gives the number of new generated samples.

        Returns a numpy array with the first *numOfSamples* generated points.

        Raises a ValueError if the GAN is not trained.
        """
        ## check before the timer is started, so a failing call
        ## does not leave a running timer behind
        if not self.isTrained:
            raise ValueError("Try to generate data with untrained Re.")

        self.timing["create points"].start()
        data_min = self.dataSet.data1

        ## roughly calculate the upper bound of the synthetic samples
        ## to be generated from each neighbourhood
        synth_num = (numOfSamples // len(data_min)) + 1

        ## generate synth_num synthetic samples from each minority neighbourhood
        synth_set = []
        for i in range(len(data_min)):
            synth_set.extend(self._generate_data_for_min_point(i, synth_num))

        ## extract the exact number of synthetic samples requested
        synth_set = np.array(synth_set[:numOfSamples])
        self.timing["create points"].stop()
        return synth_set

    # ###############################################################
    # Hidden internal functions
    # ###############################################################

    # Creating the GAN
    def _conv_sample_gen(self):
        """
        Builds the generator network. It generates synthetic samples from the
        convex space of an arbitrary minority neighbourhood.

        Returns the compiled keras model.
        """
        ## takes minority batch as input
        min_neb_batch = Input(shape=(self.n_feat,))

        ## reshaping the 2D tensor to 3D for using 1-D convolution,
        ## otherwise 1-D convolution won't work.
        ## The fixed target shape means the input has to hold exactly neb rows.
        x = tf.reshape(min_neb_batch, (1, self.neb, self.n_feat), name=None)
        ## using 1-D convolution with n_feat filters and kernel size 3
        x = Conv1D(self.n_feat, 3, activation='relu')(x)
        ## flatten after convolution
        x = Flatten()(x)
        ## add dense layer to transform the vector to a convenient dimension
        x = Dense(self.neb * self.gen, activation='relu')(x)

        ## again, switching to a matrix once we have the convenient shape
        x = Reshape((self.neb, self.gen))(x)
        ## sum over the neb axis
        s = K.sum(x, axis=1)
        ## adding a small constant to always ensure the sums are non zero.
        ## if this is not done then during initialization the sum can be zero.
        s_non_zero = Lambda(lambda x: x + .000001)(s)
        ## reciprocals of the approximated sums
        sinv = tf.math.reciprocal(s_non_zero)
        ## At this step we normalize x with the sums, so that the
        ## coefficients belonging to one sum add up to (nearly) 1.
        ## That means, they form a set of convex coefficients.
        x = Multiply()([sinv, x])
        ## Now we transpose the matrix.
        aff = tf.transpose(x[0])
        ## We now do matrix multiplication of the coefficients with the original
        ## minority batch taken as input. This generates a convex transformation
        ## of the input minority batch
        synth = tf.matmul(aff, min_neb_batch)
        ## finally we compile the generator with an arbitrary minority neighbourhood batch
        ## as input and a convex space transformation as output
        model = Model(inputs=min_neb_batch, outputs=synth)
        opt = Adam(learning_rate=0.001)
        model.compile(loss='mean_squared_logarithmic_error', optimizer=opt)
        return model

    def _maj_min_disc(self):
        """
        Builds the discriminator network.

        The discriminator is trained in two phases:

        first phase: while training the GAN the discriminator learns to differentiate
        synthetic minority samples generated from the convex minority data space
        against the majority samples.

        second phase: after the GAN generator learns to create synthetic samples,
        it can be used to generate synthetic samples to balance the dataset
        and then retrain the discriminator with the balanced dataset.

        Returns the compiled keras model.
        """
        ## takes as input synthetic samples stacked upon a batch of majority samples
        samples = Input(shape=(self.n_feat,))

        ## passed through two dense layers
        y = Dense(250, activation='relu')(samples)
        y = Dense(125, activation='relu')(y)

        ## two output nodes. outputs have to be one-hot coded (see create01Labels)
        output = Dense(2, activation='sigmoid')(y)

        ## compile model
        model = Model(inputs=samples, outputs=output)
        opt = Adam(learning_rate=0.0001)
        model.compile(loss='binary_crossentropy', optimizer=opt)
        return model

    def _convGAN(self, generator, discriminator):
        """
        Joins the generator and the discriminator to one network.

        *generator* is the generator network instance.

        *discriminator* is the discriminator network instance.

        Returns the compiled keras model.
        """
        ## by default the discriminator trainability is switched off.
        ## Thus training the GAN means training the generator network as per previously
        ## trained discriminator network.
        discriminator.trainable = False

        ## input receives a stacked batch; the first neb rows are passed to the
        ## generator, the rows from index gen on are used as majority batch
        batch_data = Input(shape=(self.n_feat,))

        ## extract minority batch
        min_batch = Lambda(lambda x: x[:self.neb])(batch_data)

        ## extract majority batch
        maj_batch = Lambda(lambda x: x[self.gen:])(batch_data)

        ## pass minority batch into generator to obtain convex space transformation
        ## (synthetic samples) of the minority neighbourhood input batch
        conv_samples = generator(min_batch)

        ## concatenate the synthetic samples with the majority samples
        new_samples = tf.concat([conv_samples, maj_batch], axis=0)

        ## pass the concatenated samples into the discriminator to know its decisions
        output = discriminator(new_samples)

        ## note that, the discriminator will not be trained but will make decisions based
        ## on its previous training while using this function
        model = Model(inputs=batch_data, outputs=output)
        opt = Adam(learning_rate=0.0001)
        model.compile(loss='mse', optimizer=opt)
        return model

    # Create synthetic points
    def _generate_data_for_min_point(self, index, synth_num):
        """
        Generates *synth_num* synthetic points for a particular minority sample.

        *index* is the index of the minority sample in the training data
        whose neighbourhood we want to use.

        *synth_num* is the number of points to create from this neighbourhood.

        Returns a list with at most *synth_num* points.
        """
        self.timing["_generate_data_for_min_point"].start()

        ## NOTE(review): the number of runs is derived from neb. One run seems to
        ## yield gen (>= neb) points, so this looks like an upper bound -- confirm.
        runs = int(synth_num / self.neb) + 1
        synth_set = []
        for _run in range(runs):
            batch = self._NMB_guided(index)
            self.timing["predict"].start()
            synth_batch = self.conv_sample_generator.predict(batch)
            self.timing["predict"].stop()
            synth_set.extend(synth_batch)

        self.timing["_generate_data_for_min_point"].stop()

        ## drop the surplus points
        return synth_set[:synth_num]

    # Training
    def _rough_learning(self, data_min, data_maj):
        """
        Trains discriminator and generator alternately.

        *data_min* is the minority class data.

        *data_maj* is the majority class data.

        Runs *neb_epochs* passes over the minority points. Every step uses the
        neighbourhood batch of one minority point and one random majority batch.
        The GAN loss of every step is stored in *self.loss_history*.
        """
        generator = self.conv_sample_generator
        discriminator = self.maj_min_discriminator
        GAN = self.cg
        loss_history = []  ## this is for storing the loss for every run
        min_idx = 0
        neb_epoch_count = 1
        num_min = len(data_min)

        ## first gen rows are labeled [1, 0], the following gen rows [0, 1]
        labels = tf.convert_to_tensor(create01Labels(2 * self.gen, self.gen))

        for step in range(self.neb_epochs * num_min):
            ## generate minority neighbourhood batch for every minority class sample by index
            min_batch = self._NMB_guided(min_idx)
            min_idx = min_idx + 1
            ## generate random majority batch
            maj_batch = self._BMB(data_min, data_maj)

            ## generate synthetic samples from convex space
            ## of minority neighbourhood batch using generator
            conv_samples = generator.predict(min_batch)
            ## concatenate them with the majority batch
            concat_sample = tf.concat([conv_samples, maj_batch], axis=0)

            ## switch on discriminator training
            discriminator.trainable = True
            ## train the discriminator with the concatenated samples and the one-hot encoded labels
            discriminator.fit(x=concat_sample, y=labels, verbose=0)
            ## switch off the discriminator training again
            discriminator.trainable = False

            ## use the GAN to make the generator learn on the decisions
            ## made by the previous discriminator training
            ## NOTE(review): the GAN receives the already generated samples as
            ## input, not the minority batch itself -- confirm that this is intended.
            gan_loss_history = GAN.fit(concat_sample, y=labels, verbose=0)

            ## store the loss for the step
            loss_history.append(gan_loss_history.history['loss'])

            if self.debug and ((step + 1) % 10 == 0):
                print(f"{step + 1} neighbourhood batches trained; running neighbourhood epoch {neb_epoch_count}")

            ## min_idx was already incremented, so it equals num_min exactly when
            ## the last minority point has been used. (Comparing with num_min - 1
            ## skipped the last point and never wrapped for a single point.)
            if min_idx == num_min:
                if self.debug:
                    print(f"Neighbourhood epoch {neb_epoch_count} complete")
                neb_epoch_count = neb_epoch_count + 1
                min_idx = 0

        if self.debug:
            run_range = range(1, len(loss_history) + 1)
            plt.rcParams["figure.figsize"] = (16, 10)
            plt.xticks(fontsize=20)
            plt.yticks(fontsize=20)
            plt.xlabel('runs', fontsize=25)
            plt.ylabel('loss', fontsize=25)
            plt.title('Rough learning loss for discriminator', fontsize=25)
            plt.plot(run_range, loss_history)
            plt.show()

        self.conv_sample_generator = generator
        self.maj_min_discriminator = discriminator
        self.cg = GAN
        self.loss_history = loss_history

    ## convGAN
    def _BMB(self, data_min, data_maj):
        """
        Generates a majority batch of *gen* points drawn at random
        (with replacement) from *data_maj*.

        *data_min* is the minority class data. It is not used here.

        *data_maj* is the majority class data.

        Returns the batch as tensor.
        """
        self.timing["BMB"].start()
        result = tf.convert_to_tensor(
            data_maj[np.random.randint(len(data_maj), size=self.gen)]
        )
        self.timing["BMB"].stop()
        return result

    def _NMB_prepare(self, data_min):
        """
        Prepares the neighbourhood search on the minority class data.

        *data_min* is the minority class data.

        Returns the pair (data_min, fitted NNSearch instance).
        """
        self.timing["NMB"].start()
        t = time.time()
        neigh = NNSearch(self.neb, timingDict=self.timing)
        neigh.fit_cLib(data_min)
        self.tNbhFit += (time.time() - t)
        self.nNbhFit += 1
        self.timing["NMB"].stop()
        return (data_min, neigh)

    def _NMB_guided(self, index):
        """
        Generates a minority neighbourhood batch for a particular minority sample.
        We need this for the training and for the minority data generation.

        *index* is the index of the minority sample in the training data
        whose neighbourhood we want to obtain.

        Needs *self.nmb*, so train (or _NMB_prepare) has to be called before.

        Returns the points of the neighbourhood as tensor.
        """
        self.timing["NMB"].start()
        (data_min, neigh) = self.nmb

        t = time.time()
        nmbi = np.array([neigh.neighbourhoodOfItem(index)])
        self.tNbhSearch += (time.time() - t)
        self.nNbhSearch += 1

        ## NOTE(review): nmbi is built from a one-element list, so its first axis
        ## has length 1. shuffle permutes along the first axis only, hence the
        ## order of the neighbours is not changed here -- confirm the intention.
        nmbi = shuffle(nmbi)
        nmb = data_min[nmbi]
        nmb = tf.convert_to_tensor(nmb[0])
        self.timing["NMB"].stop()
        return nmb