"""GAN-based minority-class oversampling driven by t-LoRAS noise.

A generator MLP is trained against two discriminators: one separating real
minority samples from generated ones, and one separating majority samples
from generated ones.  Generator inputs are not plain Gaussian noise but
t-LoRAS samples: random affine combinations of Gaussian "shadow" copies of
real minority points.
"""
import math

import numpy as np
import pandas as pd
from tqdm import tqdm
from keras.layers import Dense, Dropout, Input
from keras.models import Model, Sequential
from keras.layers.advanced_activations import LeakyReLU
from keras.optimizers import Adam
from sklearn.neighbors import NearestNeighbors
from sklearn.manifold import TSNE
from imblearn.datasets import fetch_datasets


def adam_optimizer():
    """Adam optimizer with DCGAN-style hyperparameters (lr=2e-4, beta_1=0.5)."""
    return Adam(lr=0.0002, beta_1=0.5)


def Neb_grps(data, near_neb):
    """For each row of ``data`` return the indices of its ``near_neb``
    nearest neighbours (the query point itself is included) as an
    ``(n_rows, near_neb)`` integer array."""
    nbrs = NearestNeighbors(n_neighbors=near_neb, algorithm='ball_tree').fit(data)
    _distances, indices = nbrs.kneighbors(data)
    neb_class = list(indices)
    return np.asarray(neb_class)


class GanTrainParameters:
    """Parameters for training the GAN network.

    Attributes:
        batch_size: number of samples per mini-batch.
        n_feat: number of features per sample.
        min_t: low-dimensional (t-SNE) embedding of the minority training
            set, used only for nearest-neighbour search.
        features_0_trn: majority-class training features.
        features_1_trn: minority-class training features.
    """

    def __init__(self, n_feat, batch_size, min_t, features_0_trn, features_1_trn):
        self.batch_size = batch_size
        self.n_feat = n_feat
        self.min_t = min_t
        self.features_0_trn = features_0_trn
        self.features_1_trn = features_1_trn

    def im_batch_creator_min(self):
        """Sample a minority batch: one random anchor row plus its
        ``batch_size`` nearest neighbours in the embedded space ``min_t``.

        Returns a ``(batch_size, n_feat)`` array of minority rows.
        """
        nbd = Neb_grps(self.min_t, self.batch_size)
        rand = np.random.randint(low=0, high=self.features_1_trn.shape[0], size=1)
        # nbd[rand] has shape (1, batch_size); wrapping it in a 1-tuple makes
        # it an advanced index selecting those neighbour rows.
        idx = tuple(list(nbd[rand]))
        image_batch = self.features_1_trn[idx]
        return image_batch

    def im_batch_creator_maj(self):
        """Sample ``batch_size`` majority-class rows uniformly at random."""
        rand = np.random.randint(low=0, high=self.features_0_trn.shape[0],
                                 size=self.batch_size)
        image_batch = np.reshape(self.features_0_trn[rand[:, None]],
                                 (self.batch_size, self.n_feat))
        return image_batch


class TLoRasNoise:
    """t-LoRAS noise source.

    Builds ``shadow`` Gaussian-perturbed copies of each real data point,
    then emits random affine combinations of ``num_afcomb`` shadow points.

    Attributes:
        shadow: shadow copies generated per data point.
        sigma: standard deviation of the Gaussian perturbation.
        num_afcomb: number of shadow points mixed per output sample.
    """

    def __init__(self, shadow=50, sigma=.005, num_afcomb=7):
        self.shadow = shadow
        self.sigma = sigma
        self.num_afcomb = num_afcomb

    def tLoRAS(self, data, num_samples, num_RACOS):
        """Return ``num_RACOS`` affine combinations of shadow points built
        from the first ``num_samples`` rows of ``data``."""
        # NOTE(review): fixed seed makes every call produce the same noise
        # stream — confirm this determinism is intentional.
        np.random.seed(42)
        # NOTE(review): a single scalar draw perturbs the whole row at once;
        # per-feature noise may have been intended — confirm.
        data_shadow = np.asarray([
            d + np.random.normal(0, self.sigma)
            for d in data[:num_samples]
            for _c in range(self.shadow)
        ])
        return np.asarray([
            self.shadowLcDataPoint(num_samples, data_shadow)
            for _i in range(num_RACOS)
        ])

    def shadowLcDataPoint(self, num_samples, data_shadow):
        """One random affine combination of ``num_afcomb`` shadow points."""
        idx = np.random.randint(self.shadow * num_samples, size=self.num_afcomb)
        w = np.random.randint(100, size=len(idx))
        # Normalised weights sum to 1 (all-zero draw is vanishingly unlikely).
        aff_w = np.asarray(w / sum(w))
        data_tsl = np.array(data_shadow)[idx, :]
        return np.dot(aff_w, data_tsl)

    def noise(self, data, batch_size):
        """Generator-input noise batch: ``batch_size`` t-LoRAS samples drawn
        from the first ``batch_size`` rows of ``data``."""
        return self.tLoRAS(data=data, num_samples=batch_size, num_RACOS=batch_size)


class GAN:
    """Two-discriminator GAN for minority oversampling.

    The generator feeds both a minority discriminator (real minority vs
    generated) and a majority discriminator (majority vs generated); the
    combined model trains the generator against both at once.
    """

    def __init__(self, n_feat=1, noise=None):
        self.n_feat = n_feat
        # Default to t-LoRAS noise unless the caller supplies a source.
        self.noise = TLoRasNoise() if noise is None else noise
        # create_gan also stores generator/discriminators on self.
        self.gan = self.create_gan(
            self.create_discriminator_min(),
            self.create_discriminator_maj(),
            self.create_generator())

    def create_generator(self):
        """MLP generator: n_feat -> 25 -> 256 -> 512 -> 256 -> 25 -> n_feat,
        LeakyReLU(0.2) between layers, compiled with binary crossentropy."""
        generator = Sequential()
        generator.add(Dense(units=25, input_dim=self.n_feat))
        generator.add(LeakyReLU(0.2))
        generator.add(Dense(units=256))
        generator.add(LeakyReLU(0.2))
        generator.add(Dense(units=512))
        generator.add(LeakyReLU(0.2))
        generator.add(Dense(units=256))
        generator.add(LeakyReLU(0.2))
        generator.add(Dense(units=25))
        generator.add(LeakyReLU(0.2))
        generator.add(Dense(units=self.n_feat))
        generator.compile(loss='binary_crossentropy', optimizer=adam_optimizer())
        return generator

    def _build_discriminator(self):
        """Shared discriminator topology: 1024 -> 512 -> 256 -> 1 (sigmoid),
        LeakyReLU(0.2) activations and 30% dropout on the first two blocks.

        Extracted because the original minority/majority builders were
        byte-identical copies of each other.
        """
        discriminator = Sequential()
        discriminator.add(Dense(units=1024, input_dim=self.n_feat))
        discriminator.add(LeakyReLU(0.2))
        discriminator.add(Dropout(0.3))
        discriminator.add(Dense(units=512))
        discriminator.add(LeakyReLU(0.2))
        discriminator.add(Dropout(0.3))
        discriminator.add(Dense(units=256))
        discriminator.add(LeakyReLU(0.2))
        discriminator.add(Dense(units=1, activation='sigmoid'))
        discriminator.compile(loss='binary_crossentropy', optimizer=adam_optimizer())
        return discriminator

    def create_discriminator_min(self):
        """Discriminator separating real minority samples from generated ones."""
        return self._build_discriminator()

    def create_discriminator_maj(self):
        """Discriminator separating majority samples from generated ones."""
        return self._build_discriminator()

    def create_gan(self, discriminator_min, discriminator_maj, generator):
        """Chain generator -> both discriminators into one trainable model.

        Discriminators are frozen inside the combined model so that
        ``gan.train_on_batch`` updates only the generator.  Also stores the
        three sub-models on ``self`` as a side effect.
        """
        discriminator_min.trainable = False
        discriminator_maj.trainable = False
        gan_input = Input(shape=(self.n_feat,))
        x = generator(gan_input)
        gan_output_min = discriminator_min(x)
        gan_output_maj = discriminator_maj(x)
        gan = Model(inputs=gan_input, outputs=[gan_output_min, gan_output_maj])
        gan.compile(loss=['binary_crossentropy', 'binary_crossentropy'],
                    optimizer='adam')
        self.generator = generator
        self.discriminator_min = discriminator_min
        self.discriminator_maj = discriminator_maj
        return gan

    def train(self, parameters):
        """Adversarial training loop: 30 epochs of ``batch_size`` steps each.

        Each step trains discriminator_min on real-vs-fake, (on epoch 1 and
        after epoch 15) trains discriminator_maj, then trains the generator
        through the frozen combined model.
        """
        for e in range(1, 30 + 1):
            print(e)
            for _i in tqdm(range(parameters.batch_size)):
                # Real minority batch and t-LoRAS noise derived from it.
                image_batch = parameters.im_batch_creator_min()
                noise_min = self.noise.noise(image_batch, parameters.batch_size)
                # Fake samples from the noised input.
                generated_images = self.generator.predict(noise_min)
                # Minority discriminator: real (one-sided smoothed 0.9) vs fake (0.0).
                X = np.concatenate((image_batch, generated_images))
                y_dis = np.zeros(2 * parameters.batch_size)
                y_dis[:parameters.batch_size] = 0.9
                self.discriminator_min.trainable = True
                _d_loss_min = self.discriminator_min.train_on_batch(X, y_dis)
                # BUG FIX: original condition was `e==0 or e>15`, but the epoch
                # loop starts at 1 so `e==0` was unreachable dead code; the
                # first epoch was evidently intended.
                if e == 1 or e > 15:
                    image_batch_maj = parameters.im_batch_creator_maj()
                    X_maj = np.concatenate((image_batch_maj, generated_images))
                    # NOTE(review): fake targets are 2.0 (np.ones(...)+1), which
                    # is unusual for a sigmoid/BCE head — confirm intent.
                    y_dis_maj = np.ones(2 * parameters.batch_size) + 1
                    y_dis_maj[:parameters.batch_size] = 0
                    self.discriminator_maj.trainable = True
                    _d_loss_maj = self.discriminator_maj.train_on_batch(X_maj, y_dis_maj)
                # Generator step: freeze both discriminators and label the
                # noised input as real so gradients push the generator to fool them.
                noise = self.noise.noise(image_batch, parameters.batch_size)
                y_gen_min = np.ones(parameters.batch_size)
                self.discriminator_min.trainable = False
                self.discriminator_maj.trainable = False
                _g_loss_min = self.gan.train_on_batch(noise, [y_gen_min, y_gen_min])

    def genFeat(self, parameters):
        """Generate one batch of synthetic minority features."""
        im_batch = parameters.im_batch_creator_min()
        noise = self.noise.noise(im_batch, parameters.batch_size)
        return self.generator.predict(noise)

    def predict(self, data):
        """Score ``data`` with the majority discriminator; returns a flat
        array of sigmoid outputs, one per row."""
        y_pred = self.discriminator_maj.predict(data)
        return np.reshape(y_pred, len(data))


class TrainTestData:
    """Stores features, data and labels for class 0 and class 1.

    Splits each class head/tail by ``trainFactor`` and assembles combined
    train and test arrays with their labels.
    """

    def __init__(self, features0, features1, trainFactor=0.9):
        self.nFeatures0 = len(features0)
        self.nFeatures1 = len(features1)
        self.features_0_trn, self.features_0_tst = self.splitUpData(features0, trainFactor)
        self.features_1_trn, self.features_1_tst = self.splitUpData(features1, trainFactor)
        self.testData, self.testLabels = self.joinData(self.features_1_tst, self.features_0_tst)
        self.trainData, self.trainLabels = self.joinData(self.features_1_trn, self.features_0_trn)

    def splitUpData(self, data, trainFactor=0.9):
        """Deterministic head/tail split: the first ceil(n * trainFactor)
        rows become the training part, the remainder the test part."""
        size = len(data)
        trainSize = math.ceil(size * trainFactor)
        trn = data[:trainSize]
        tst = data[trainSize:size]
        return trn, tst

    def joinData(self, data0, data1):
        """Concatenate ``data1`` before ``data0`` and build labels.

        NOTE(review): rows from ``data1`` (second argument) are labelled 1
        and rows from ``data0`` are labelled 0 — with the call sites passing
        (class-1, class-0) this labels class-0 rows as 1.  Behavior preserved
        verbatim; confirm the intended label convention.
        """
        data = np.concatenate((data1, data0))
        labels = np.concatenate((
            np.zeros(len(data1)) + 1,
            np.zeros(len(data0))
        ))
        return data, labels


if __name__ == "__main__":
    def createTrainParameters():
        """Load the imbalanced ``yeast_me2`` data set and build training
        parameters: first 2/3 of each class as training data, with a 2-D
        t-SNE embedding of the minority training set for neighbour search."""
        data = fetch_datasets()['yeast_me2']
        labels = data.target
        features = data.data
        label_1 = list(np.where(labels == 1)[0])    # minority class
        label_0 = list(np.where(labels == -1)[0])   # majority class
        features_1 = features[label_1]
        features_0 = features[label_0]
        features_1_trn = features_1[:math.ceil(len(features_1) * 2 / 3)]
        data_embedded_min = TSNE(perplexity=.1).fit_transform(features_1_trn)
        result_min = pd.DataFrame(data=data_embedded_min, columns=['t-SNE0', 't-SNE1'])
        min_t = np.asmatrix(result_min)
        min_t = min_t[0:len(features_1_trn)]
        min_t = min_t[:, [0, 1]]
        return GanTrainParameters(
            n_feat=len(features[1]),  # features per row, i.e. features.shape[1]
            batch_size=30,
            min_t=min_t,
            features_0_trn=features_0[:math.ceil(len(features_0) * 2 / 3)],
            features_1_trn=features_1_trn
        )

    gtp = createTrainParameters()
    cGan = GAN(n_feat=gtp.n_feat)
    cGan.train(parameters=gtp)