"""GAN-based sample generator with two discriminators and tLoRAS-style noise."""

import math

import numpy as np
from tqdm import tqdm
from keras.layers import Dense, Dropout, Input
from keras.models import Model, Sequential
from keras.layers.advanced_activations import LeakyReLU
from keras.optimizers import Adam
from sklearn.neighbors import NearestNeighbors


def adam_optimizer():
    """Return the Adam optimizer used by the generator and discriminators."""
    return Adam(lr=0.0002, beta_1=0.5)


def Neb_grps(data, near_neb):
    """Return, for every row of *data*, the indices of its nearest rows.

    Args:
        data: Sample matrix handed to ``NearestNeighbors.fit``.
        near_neb: Number of neighbours to look up per row.

    Returns:
        A numpy array with one row of neighbour indices per row of *data*.
    """
    nbrs = NearestNeighbors(n_neighbors=near_neb, algorithm='ball_tree').fit(data)
    _distances, indices = nbrs.kneighbors(data)
    return np.asarray(indices)


class GanTrainParameters:
    """
    Parameters for Training the GAN Network.

    Holds the batch size, the feature count and the training matrices, and
    creates the batches consumed by ``GAN.train``.
    """

    def __init__(self, n_feat, batch_size, min_t, features_0_trn, features_1_trn):
        """Store the training configuration.

        Args:
            n_feat: Number of features per sample.
            batch_size: Number of samples per batch; also used as the
                neighbourhood size in ``im_batch_creator_min``.
            min_t: Data on which the neighbourhoods are computed
                (presumably the minority training samples -- TODO confirm).
            features_0_trn: Training samples of class 0.
            features_1_trn: Training samples of class 1.
        """
        self.batch_size = batch_size
        self.n_feat = n_feat
        self.min_t = min_t
        self.features_0_trn = features_0_trn
        self.features_1_trn = features_1_trn

    def im_batch_creator_min(self):
        """Return the rows of ``features_1_trn`` forming one random neighbourhood.

        A random row is picked and the rows listed as its ``batch_size``
        nearest neighbours (computed on ``min_t``) are returned.
        """
        # NOTE(review): the neighbour index is rebuilt on every call; if min_t
        # never changes it could be computed once and reused.
        nbd = Neb_grps(self.min_t, self.batch_size)
        # NOTE(review): the row is drawn from the length of features_1_trn but
        # used to index neighbourhoods of min_t -- assumes both have the same
        # number of rows; confirm against the caller.
        rand = np.random.randint(low=0, high=self.features_1_trn.shape[0], size=1)
        neighbour_rows = nbd[int(rand[0])]
        return self.features_1_trn[neighbour_rows]

    def im_batch_creator_maj(self):
        """Return ``batch_size`` rows of ``features_0_trn`` drawn with replacement."""
        rand = np.random.randint(
            low=0,
            high=self.features_0_trn.shape[0],
            size=self.batch_size,
        )
        return np.reshape(self.features_0_trn[rand], (self.batch_size, self.n_feat))


class TLoRasNoise:
    """
    Noise function

    Builds noisy "shadow" copies of the input samples and returns random
    convex combinations of them.
    """

    def __init__(self, shadow=50, sigma=.005, num_afcomb=7):
        """Store the noise configuration.

        Args:
            shadow: Number of shadow copies created per input sample.
            sigma: Standard deviation of the normal offset added to a copy.
            num_afcomb: Number of shadow points mixed into one output point.
        """
        self.shadow = shadow
        self.sigma = sigma
        self.num_afcomb = num_afcomb

    def tLoRAS(self, data, num_samples, num_RACOS):
        """Create *num_RACOS* noise points from the first *num_samples* rows.

        The result is deterministic for a given input: a private generator
        seeded with 42 is used. The original implementation re-seeded the
        global numpy generator on every call, which produced the same noise
        but also reset every other consumer of ``np.random`` (for example the
        random batch selection) to the same state after each call.

        Args:
            data: Input samples; only ``data[:num_samples]`` is used.
            num_samples: Number of leading rows of *data* to build shadows from.
            num_RACOS: Number of points to generate.

        Returns:
            A numpy array with *num_RACOS* generated points.
        """
        rng = np.random.RandomState(42)
        # One scalar offset is drawn per shadow copy and added to every
        # feature of that copy.
        data_shadow = np.asarray([
            d + rng.normal(0, self.sigma)
            for d in data[:num_samples]
            for _c in range(self.shadow)
        ])
        return np.asarray([
            self.shadowLcDataPoint(num_samples, data_shadow, rng)
            for _i in range(num_RACOS)
        ])

    def shadowLcDataPoint(self, num_samples, data_shadow, rng=None):
        """Return one random convex combination of shadow points.

        Args:
            num_samples: Number of samples the shadow points were built from.
            data_shadow: Matrix of shadow points, one per row.
            rng: Optional ``numpy.random.RandomState`` to draw from; the
                global ``np.random`` generator is used when omitted.

        Returns:
            The weighted sum of ``num_afcomb`` randomly chosen shadow rows,
            with non-negative weights that sum to one.
        """
        if rng is None:
            rng = np.random
        data_shadow = np.asarray(data_shadow)
        # Never index past the shadow points that actually exist: when fewer
        # than num_samples rows were available, shadow * num_samples exceeds
        # the number of rows in data_shadow.
        upper = min(self.shadow * num_samples, len(data_shadow))
        idx = rng.randint(upper, size=self.num_afcomb)
        w = rng.randint(100, size=len(idx))
        aff_w = w / w.sum()
        return np.dot(aff_w, data_shadow[idx, :])

    def noise(self, data, batch_size):
        """Return *batch_size* noise points built from *data*."""
        return self.tLoRAS(data=data, num_samples=batch_size, num_RACOS=batch_size)


class GAN:
    """
    Class for GAN.

    Combines one generator with two discriminators ("min" and "maj").
    """

    def __init__(self, n_feat=1, noise=None, discriminatorMin=None,
                 discriminatorMax=None, generator=None):
        """Build the networks, creating every part that was not supplied.

        Args:
            n_feat: Number of features per sample.
            noise: Object with a ``noise(data, batch_size)`` method; a default
                ``TLoRasNoise`` is used when omitted.
            discriminatorMin: Optional pre-built "min" discriminator.
            discriminatorMax: Optional pre-built "maj" discriminator.
            generator: Optional pre-built generator.
        """
        self.n_feat = n_feat
        self.noise = TLoRasNoise() if noise is None else noise

        # Missing parts are created in the order min, maj, generator.
        if discriminatorMin is None:
            discriminatorMin = self.create_discriminator_min()
        if discriminatorMax is None:
            discriminatorMax = self.create_discriminator_maj()
        if generator is None:
            generator = self.create_generator()
        self.create_gan(discriminatorMin, discriminatorMax, generator)

    def create_generator(self):
        """Create and compile the generator network (n_feat in, n_feat out)."""
        generator = Sequential()
        generator.add(Dense(units=25, input_dim=self.n_feat))
        generator.add(LeakyReLU(0.2))
        for units in (256, 512, 256, 25):
            generator.add(Dense(units=units))
            generator.add(LeakyReLU(0.2))
        generator.add(Dense(units=self.n_feat))
        generator.compile(loss='binary_crossentropy', optimizer=adam_optimizer())
        return generator

    def _build_discriminator(self):
        """Create and compile one discriminator (n_feat in, one sigmoid out)."""
        discriminator = Sequential()
        discriminator.add(Dense(units=1024, input_dim=self.n_feat))
        discriminator.add(LeakyReLU(0.2))
        discriminator.add(Dropout(0.3))
        discriminator.add(Dense(units=512))
        discriminator.add(LeakyReLU(0.2))
        discriminator.add(Dropout(0.3))
        discriminator.add(Dense(units=256))
        discriminator.add(LeakyReLU(0.2))
        discriminator.add(Dense(units=1, activation='sigmoid'))
        discriminator.compile(loss='binary_crossentropy', optimizer=adam_optimizer())
        return discriminator

    def create_discriminator_min(self):
        """Create and compile a new "min" discriminator."""
        return self._build_discriminator()

    def create_discriminator_maj(self):
        """Create and compile a new "maj" discriminator."""
        return self._build_discriminator()

    def create_gan(self, discriminator_min, discriminator_maj, generator):
        """Chain the generator into both discriminators and compile the model.

        Both discriminators are marked non-trainable before the combined
        model is compiled.

        Returns:
            The compiled combined model, which is also stored in ``self.gan``.
        """
        discriminator_min.trainable = False
        discriminator_maj.trainable = False

        gan_input = Input(shape=(self.n_feat,))
        x = generator(gan_input)
        gan_output_min = discriminator_min(x)
        gan_output_maj = discriminator_maj(x)
        gan = Model(inputs=gan_input, outputs=[gan_output_min, gan_output_maj])
        gan.compile(loss=['binary_crossentropy', 'binary_crossentropy'], optimizer='adam')

        # store the parts for later usage.
        self.generator = generator
        self.discriminator_min = discriminator_min
        self.discriminator_maj = discriminator_maj
        self.gan = gan
        return gan

    def train(self, parameters, epochs=30, maj_after_epoch=15):
        """Train both discriminators and the generator.

        Every epoch runs ``parameters.batch_size`` steps. Each step trains the
        "min" discriminator, then (only in epochs after *maj_after_epoch*) the
        "maj" discriminator, and finally the combined model.

        Args:
            parameters: A ``GanTrainParameters`` instance supplying batches.
            epochs: Number of epochs to run (epochs are numbered from 1).
            maj_after_epoch: The "maj" discriminator is trained only in
                epochs with a number greater than this value.
        """
        batch_size = parameters.batch_size
        for epoch in range(1, epochs + 1):
            print(epoch)
            for _step in tqdm(range(batch_size)):
                # Real samples and the noise derived from them.
                image_batch = parameters.im_batch_creator_min()
                noise_min = self.noise.noise(image_batch, batch_size)

                # Generate fake samples from the noised input.
                generated_images = self.generator.predict(noise_min)

                # Real samples first (target 0.9), generated samples second
                # (target 0).
                X = np.concatenate((image_batch, generated_images))
                y_dis = np.zeros(2 * batch_size)
                y_dis[:batch_size] = 0.9

                self.discriminator_min.trainable = True
                _d_loss_min = self.discriminator_min.train_on_batch(X, y_dis)

                # The original condition also tested for epoch 0, which can
                # never occur because epochs are numbered from 1.
                # NOTE(review): if a first-epoch warm-up was intended, that
                # would be `epoch == 1` -- confirm with the author.
                if epoch > maj_after_epoch:
                    image_batch_maj = parameters.im_batch_creator_maj()
                    X_maj = np.concatenate((image_batch_maj, generated_images))
                    # NOTE(review): generated samples get the target 2.0 here,
                    # which is outside the [0, 1] range of a sigmoid output
                    # trained with binary cross-entropy -- confirm intent.
                    y_dis_maj = np.ones(2 * batch_size) + 1
                    y_dis_maj[:batch_size] = 0

                    self.discriminator_maj.trainable = True
                    _d_loss_maj = self.discriminator_maj.train_on_batch(X_maj, y_dis_maj)

                # Present the noised input to the generator as real data.
                noise = self.noise.noise(image_batch, batch_size)
                y_gen_min = np.ones(batch_size)

                # While the combined model is trained the discriminators'
                # trainable flags are switched off again.
                self.discriminator_min.trainable = False
                self.discriminator_maj.trainable = False

                _g_loss_min = self.gan.train_on_batch(noise, [y_gen_min, y_gen_min])

    def genFeat(self, parameters):
        """Generate one batch of synthetic samples with the generator."""
        im_batch = parameters.im_batch_creator_min()
        noise = self.noise.noise(im_batch, parameters.batch_size)
        return self.generator.predict(noise)

    def predict(self, data):
        """Return the "maj" discriminator output for *data* as a flat array."""
        y_pred = self.discriminator_maj.predict(data)
        return np.reshape(y_pred, len(data))


class DataSet:
    """
    Stores data and Labels.
    """

    def __init__(self, data=None, labels=None, data0=None, data1=None):
        """Create the data set from data/labels or from per-class data.

        Args:
            data: Samples; when given, *labels* is required.
            labels: Labels belonging to *data*.
            data0: Samples of class 0 (used only when *data* is None).
            data1: Samples of class 1 (used only when *data* is None).

        Raises:
            TypeError: If *data* is given without *labels*, or if none of
                *data*, *data0* and *data1* is given.
        """
        if data is None:
            self.fromData01(data0, data1)
            return
        if labels is None:
            # Raising a plain string is itself a TypeError in Python 3; raise
            # the same exception class, but with the intended message.
            raise TypeError("expected labels to be a numpy.array")
        self.data = data
        self.labels = labels

    def fromData01(self, data0=None, data1=None):
        """Fill data and labels from the class-0 and/or class-1 samples.

        Class-1 samples are labelled 1 and come first; class-0 samples are
        labelled 0.

        Raises:
            TypeError: If both *data0* and *data1* are None.
        """
        if data0 is None and data1 is None:
            raise TypeError("Expected data, data0 or data1 to be a numpy.array")

        if data0 is None:
            self.data = data1
            self.labels = np.ones(len(data1))
        elif data1 is None:
            self.data = data0
            self.labels = np.zeros(len(data0))
        else:
            self.data = np.concatenate((data1, data0))
            self.labels = np.concatenate((
                np.ones(len(data1)),
                np.zeros(len(data0)),
            ))


class TrainTestData:
    """
    Stores features, data and labels for class 0 and class 1.
    """

    def __init__(self, features0, features1, trainFactor=0.9):
        """Split both classes into train and test parts.

        Args:
            features0: Samples of class 0.
            features1: Samples of class 1.
            trainFactor: Fraction of each class that goes into the train part.
        """
        self.nFeatures0 = len(features0)
        self.nFeatures1 = len(features1)

        self.features_0_trn, self.features_0_tst = self.splitUpData(features0, trainFactor)
        self.features_1_trn, self.features_1_tst = self.splitUpData(features1, trainFactor)

        self.test = DataSet(data1=self.features_1_tst, data0=self.features_0_tst)
        self.train = DataSet(data1=self.features_1_trn, data0=self.features_0_trn)

    def splitUpData(self, data, trainFactor=0.9):
        """Split *data* into a leading train part and a trailing test part.

        The train part holds the first ``ceil(len(data) * trainFactor)`` rows;
        the test part holds the rest. Both parts are copies.

        Returns:
            A ``(train, test)`` tuple.
        """
        size = len(data)
        train_size = math.ceil(size * trainFactor)
        trn = data[np.arange(0, train_size)]
        tst = data[np.arange(train_size, size)]
        return trn, tst