{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Importing libraries" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import math\n", "import random\n", "import numpy as np\n", "import pandas as pd\n", "import matplotlib.pyplot as plt\n", "import random\n", "from scipy import ndarray\n", "from sklearn.neighbors import NearestNeighbors\n", "from sklearn.decomposition import PCA\n", "from sklearn.metrics import confusion_matrix\n", "from sklearn.metrics import f1_score\n", "from sklearn.metrics import cohen_kappa_score\n", "from sklearn.metrics import precision_score\n", "from sklearn.metrics import recall_score\n", "from collections import Counter\n", "from imblearn.datasets import fetch_datasets\n", "from sklearn.preprocessing import StandardScaler" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import keras\n", "from keras.layers import Dense, Dropout, Input\n", "from keras.models import Model,Sequential\n", "from tqdm import tqdm\n", "from keras.layers.advanced_activations import LeakyReLU\n", "from keras.optimizers import Adam\n", "from keras.optimizers import RMSprop\n", "from keras import losses\n", "from keras import backend as K\n", "import tensorflow as tf" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import warnings\n", "warnings.filterwarnings(\"ignore\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.neighbors import KNeighborsClassifier\n", "from sklearn.ensemble import RandomForestClassifier\n", "from sklearn.ensemble import GradientBoostingClassifier" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from numpy.random import seed\n", "seed_num=1\n", "seed(seed_num)\n", "tf.random.set_seed(seed_num) " ] }, { "cell_type": "markdown", "metadata": {}, "source": 
[ "## Import dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = fetch_datasets()['yeast_me2']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating label and feature matrices" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "labels_x=data.target ## labels of the data\n", "labels_x.shape" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "features_x=data.data ## features of the data\n", "features_x.shape" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Until now we have obtained the data. We divided it into training and test sets. we separated obtained seperate variables for the majority and miority classes and their labels for both sets." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# convGAN" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from IPython.display import Image\n", "Image(filename='CoSPOV.jpg')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def unison_shuffled_copies(a, b,seed_perm):\n", " 'Shuffling the feature matrix along with the labels with same order'\n", " np.random.seed(seed_perm)##change seed 1,2,3,4,5\n", " assert len(a) == len(b)\n", " p = np.random.permutation(len(a))\n", " return a[p], b[p]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def BMB(data_min,data_maj, neb, gen):\n", " \n", " ## Generate a borderline majority batch\n", " ## data_min -> minority class data\n", " ## data_maj -> majority class data\n", " ## neb -> oversampling neighbourhood\n", " ## gen -> convex combinations generated from each neighbourhood\n", " \n", " from sklearn.neighbors import NearestNeighbors\n", " from sklearn.utils import shuffle\n", " neigh = NearestNeighbors(neb)\n", " n_feat=data_min.shape[1]\n", " 
neigh.fit(data_maj)\n", " bmbi=[]\n", " for i in range(len(data_min)):\n", " indices=neigh.kneighbors([data_min[i]],neb,return_distance=False)\n", " bmbi.append(indices)\n", " bmbi=np.unique(np.array(bmbi).flatten())\n", " bmbi=shuffle(bmbi)\n", " bmb=data_maj[np.random.randint(len(data_maj),size=gen)]\n", " bmb=tf.convert_to_tensor(bmb)\n", " return bmb" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def NMB_guided(data_min, neb, index):\n", " \n", " ## generate a minority neighbourhood batch for a particular minority sample\n", " ## we need this for minority data generation\n", " ## we will generate synthetic samples for each training data neighbourhood\n", " ## index -> index of the minority sample in a training data whose neighbourhood we want to obtain\n", " ## data_min -> minority class data\n", " ## neb -> oversampling neighbourhood\n", " \n", " from sklearn.neighbors import NearestNeighbors\n", " from sklearn.utils import shuffle\n", " neigh = NearestNeighbors(neb)\n", " neigh.fit(data_min)\n", " ind=index\n", " nmbi=neigh.kneighbors([data_min[ind]],neb,return_distance=False)\n", " nmbi=shuffle(nmbi)\n", " nmb=data_min[nmbi]\n", " nmb=tf.convert_to_tensor(nmb[0])\n", " return (nmb)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def conv_sample_gen():\n", " \n", " ## the generator network to generate synthetic samples from the convex space of arbitrary minority neighbourhoods\n", " \n", " min_neb_batch = keras.layers.Input(shape=(n_feat,)) ## takes minority batch as input\n", " x=tf.reshape(min_neb_batch, (1,neb,n_feat), name=None) ## reshaping the 2D tensor to 3D for using 1-D convolution, otherwise 1-D convolution won't work.\n", " x= keras.layers.Conv1D(n_feat, 3, activation='relu')(x) ## using 1-D convolution, feature dimension remains the same\n", " x= keras.layers.Flatten()(x) ## flatten after convolution\n", " x= keras.layers.Dense(neb*gen, 
activation='relu')(x) ## add dense layer to transform the vector to a convenient dimension\n", " x= keras.layers.Reshape((neb,gen))(x)## again, switching to 2-D tensor once we have the convenient shape\n", " s=K.sum(x,axis=1) ## row wise sum\n", " s_non_zero=tf.keras.layers.Lambda(lambda x: x+.000001)(s) ## adding a small constant to always ensure the row sums are non zero. if this is not done then during initialization the sum can be zero\n", " sinv=tf.math.reciprocal(s_non_zero) ## reciprocals of the approximated row sum\n", " x=keras.layers.Multiply()([sinv,x]) ## At this step we ensure that row sum is 1 for every row in x. That means, each row is set of convex co-efficient\n", " aff=tf.transpose(x[0]) ## Now we transpose the matrix. So each column is now a set of convex coefficients\n", " synth=tf.matmul(aff,min_neb_batch) ## We now do matrix multiplication of the affine combinations with the original minority batch taken as input. This generates a convex transformation of the input minority batch\n", " model = Model(inputs=min_neb_batch, outputs=synth) ## finally we compile the generator with an arbitrary minority neighbourhood batch as input and a convex space transformation of the same number of samples as output\n", " opt = keras.optimizers.Adam(learning_rate=0.001)\n", " model.compile(loss='mean_squared_logarithmic_error', optimizer=opt)\n", " return model" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def maj_min_disc():\n", " \n", " ## the discriminator is trained in two phases: \n", " ## first phase: while training GAN the discriminator learns to differentiate synthetic minority samples generated from convex minority data space against the borderline majority samples\n", " ## second phase: after the GAN generator learns to create synthetic samples, it can be used to generate synthetic samples to balance the dataset\n", " ## and then retrain the discriminator with the balanced dataset\n", " \n", " 
samples=keras.layers.Input(shape=(n_feat,)) ## takes as input synthetic sample generated as input stacked upon a batch of borderline majority samples \n", " y= keras.layers.Dense(250, activation='relu')(samples) ## passed through two dense layers \n", " y= keras.layers.Dense(125, activation='relu')(y)\n", " output= keras.layers.Dense(2, activation='sigmoid')(y) ## two output nodes. outputs have to be one-hot coded (see labels variable before)\n", " model = Model(inputs=samples, outputs=output) ## compile model\n", " opt = keras.optimizers.Adam(learning_rate=0.0001)\n", " model.compile(loss='binary_crossentropy', optimizer=opt)\n", " return model" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def convGAN(generator,discriminator):\n", " \n", " ## for joining the generator and the discriminator\n", " ## conv_coeff_generator-> generator network instance\n", " ## maj_min_discriminator -> discriminator network instance\n", " \n", " maj_min_disc.trainable=False ## by default the discriminator trainability is switched off. 
\n", " ## Thus training the GAN means training the generator network as per previously trained discriminator network.\n", " batch_data = keras.layers.Input(shape=(n_feat,)) ## input receives a neighbourhood minority batch and a proximal majority batch concatenated\n", " min_batch = tf.keras.layers.Lambda(lambda x: x[:neb])(batch_data) ## extract minority batch\n", " maj_batch = tf.keras.layers.Lambda(lambda x: x[neb:])(batch_data) ## extract majority batch \n", " conv_samples=generator(min_batch) ## pass minority batch into generator to obtain convex space transformation (synthetic samples) of the minority neighbourhood input batch\n", " new_samples=tf.concat([conv_samples,maj_batch],axis=0) ## concatenate the synthetic samples with the majority samples \n", " output=discriminator(new_samples) ## pass the concatenated vector into the discriminator to know its decisions\n", " ## note that, the discriminator will not be traied but will make decisions based on its previous training while using this function\n", " model = Model(inputs=batch_data, outputs=output)\n", " opt = keras.optimizers.Adam(learning_rate=0.0001)\n", " model.compile(loss='mse', optimizer=opt)\n", " return model" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "## this is the main training process where the GAn learns to generate appropriate samples from the convex space\n", "## this is the first training phase for the discriminator and the only training phase for the generator.\n", "\n", "\n", "def rough_learning(neb_epochs,data_min,data_maj,neb,gen,generator, discriminator,GAN):\n", "\n", " \n", " step=1\n", " loss_history=[] ## this is for stroring the loss for every run\n", " min_idx=0\n", " neb_epoch_count=1\n", " \n", " labels=[]\n", " for i in range(2*gen):\n", " if i required number of data points that can be generated from a neighbourhood\n", " ## data_min -> minority class data\n", " ## neb -> oversampling neighbourhood\n", 
" ## index -> index of the minority sample in a training data whose neighbourhood we want to obtain\n", " \n", " runs=int(synth_num/neb)+1\n", " synth_set=[]\n", " for run in range(runs):\n", " batch=NMB_guided(data_min, neb, index)\n", " synth_batch=generator.predict(batch)\n", " for i in range(len(synth_batch)):\n", " synth_set.append(synth_batch[i])\n", " synth_set=synth_set[:synth_num]\n", " synth_set=np.array(synth_set)\n", " return(synth_set)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def generate_synthetic_data(data_min,data_maj,neb,generator):\n", " \n", " ## roughly claculate the upper bound of the synthetic samples to be generated from each neighbourhood\n", " synth_num=((len(data_maj)-len(data_min))//len(data_min))+1\n", "\n", " ## generate synth_num synthetic samples from each minority neighbourhood\n", " synth_set=[]\n", " for i in range(len(data_min)):\n", " synth_i=generate_data_for_min_point(data_min,neb,i,synth_num,generator)\n", " for k in range(len(synth_i)):\n", " synth_set.append(synth_i[k])\n", " synth_set=synth_set[:(len(data_maj)-len(data_min))] ## extract the exact number of synthetic samples needed to exactly balance the two classes\n", " synth_set=np.array(synth_set)\n", " ovs_min_class=np.concatenate((data_min,synth_set),axis=0)\n", " ovs_training_dataset=np.concatenate((ovs_min_class,data_maj),axis=0)\n", " ovs_pca_labels=np.concatenate((np.zeros(len(data_min)),np.zeros(len(synth_set))+1,np.zeros(len(data_maj))+2))\n", " ovs_training_labels=np.concatenate((np.zeros(len(ovs_min_class))+1,np.zeros(len(data_maj))+0))\n", " ovs_training_labels_oh=[]\n", " for i in range(len(ovs_training_dataset)):\n", " if i