Browse source

first version of LoGAN

Kristian Schultz 4 years ago
commit
fd2df9d33d
1 file changed with 293 additions and 0 deletions

+ 293 - 0
LoGAN.py

@@ -0,0 +1,293 @@
+import math
+import numpy as np
+from tqdm import tqdm
+from keras.layers import Dense, Dropout, Input
+from keras.models import Model, Sequential
+from keras.layers.advanced_activations import LeakyReLU
+from keras.optimizers import Adam
+from sklearn.neighbors import NearestNeighbors
+from sklearn.manifold import TSNE
+from imblearn.datasets import fetch_datasets
+
+def adam_optimizer():
+    # Adam with the DCGAN settings (learning rate 0.0002, beta_1 0.5).
+    return Adam(lr=0.0002, beta_1=0.5)
+
+def Neb_grps(data, near_neb):
+    # For each point in `data`, return the indices of its near_neb nearest
+    # neighbours (the point itself included) found with a ball tree.
+    nbrs = NearestNeighbors(n_neighbors=near_neb, algorithm='ball_tree').fit(data)
+    _distances, indices = nbrs.kneighbors(data)
+    return np.asarray(indices)
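+# Example (hypothetical data): with X = np.array([[0.], [1.], [10.]]),
+# Neb_grps(X, 2) returns array([[0, 1], [1, 0], [2, 1]]) - each row lists a
+# point's own index first, then its nearest neighbour's.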
+
+
+class GanTrainParameters:
+    """
+    Parameters for Training the GAN Network.
+    """
+
+    def __init__(self, n_feat, batch_size, min_t, features_0_trn, features_1_trn):
+        self.batch_size = batch_size
+        self.n_feat = n_feat
+        self.min_t = min_t
+        self.features_0_trn = features_0_trn
+        self.features_1_trn = features_1_trn
+
+    def im_batch_creator_min(self):
+        # Pick one random minority point and return its batch_size nearest
+        # neighbours (in the t-SNE embedding) as the minority batch.
+        nbd = Neb_grps(self.min_t, self.batch_size)
+        rand = np.random.randint(low=0, high=self.features_1_trn.shape[0])
+        return self.features_1_trn[nbd[rand]]
+
+    def im_batch_creator_maj(self):
+        # Draw batch_size random rows from the majority training set.
+        rand = np.random.randint(low=0, high=self.features_0_trn.shape[0], size=self.batch_size)
+        return self.features_0_trn[rand]
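+    # Sketch (hypothetical shapes): both creators return (batch_size, n_feat)
+    # arrays - im_batch_creator_min the t-SNE neighbourhood of one random
+    # minority point, im_batch_creator_maj a uniform random majority sample.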
+
+
+class TLoRasNoise:
+    """
+    LoRAS-style noise generator: draws Gaussian "shadow" copies of real
+    minority points and returns random convex combinations of them.
+    """
+
+    def __init__(self, shadow=50, sigma=.005, num_afcomb=7):
+        self.shadow = shadow          # shadow copies per data point
+        self.sigma = sigma            # std. dev. of the Gaussian perturbation
+        self.num_afcomb = num_afcomb  # shadow points per affine combination
+
+    def tLoRAS(self, data, num_samples, num_RACOS):
+        np.random.seed(42)
+        # Create `shadow` noisy copies of each of the first num_samples
+        # points, with noise drawn independently per feature.
+        data_shadow = np.asarray([
+            d + np.random.normal(0, self.sigma, size=np.shape(d))
+            for d in data[:num_samples]
+            for _c in range(self.shadow)
+            ])
+
+        return np.asarray([
+            self.shadowLcDataPoint(num_samples, data_shadow)
+            for _i in range(num_RACOS)
+            ])
+
+    def shadowLcDataPoint(self, num_samples, data_shadow):
+        # Combine num_afcomb randomly chosen shadow points with random
+        # affine weights that sum to 1.
+        idx = np.random.randint(self.shadow * num_samples, size=self.num_afcomb)
+        w = np.random.randint(100, size=len(idx))
+        aff_w = np.asarray(w / sum(w))
+        data_tsl = data_shadow[idx, :]
+        return np.dot(aff_w, data_tsl)
+
+    def noise(self, data, batch_size):
+        return self.tLoRAS(data=data, num_samples=batch_size, num_RACOS=batch_size)
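+    # Sampling sketch (hypothetical shapes): for a minority batch b with
+    # b.shape == (30, n_feat), noise(b, 30) first builds 50 * 30 shadow
+    # points, then returns 30 rows, each a convex combination of 7 randomly
+    # chosen shadow points.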
+
+
+
+class GAN:
+    """
+    Class for GAN.
+    """
+
+    def __init__(self, n_feat=1, noise=None):
+        self.n_feat = n_feat
+
+        if noise is None:
+            self.noise = TLoRasNoise()
+        else:
+            self.noise = noise
+
+        self.gan = self.create_gan(
+            self.create_discriminator_min(),
+            self.create_discriminator_maj(),
+            self.create_generator())
+
+
+    def create_generator(self):
+        # Fully connected generator: n_feat -> 25 -> 256 -> 512 -> 256 -> 25 -> n_feat.
+        generator = Sequential()
+        generator.add(Dense(units=25, input_dim=self.n_feat))
+        generator.add(LeakyReLU(0.2))
+
+        generator.add(Dense(units=256))
+        generator.add(LeakyReLU(0.2))
+
+        generator.add(Dense(units=512))
+        generator.add(LeakyReLU(0.2))
+
+        generator.add(Dense(units=256))
+        generator.add(LeakyReLU(0.2))
+
+        generator.add(Dense(units=25))
+        generator.add(LeakyReLU(0.2))
+
+        generator.add(Dense(units=self.n_feat))
+
+        generator.compile(loss='binary_crossentropy', optimizer=adam_optimizer())
+        return generator
+
+    def create_discriminator_min(self):
+        # Discriminator for the minority class: real minority vs. generated samples.
+        discriminator = Sequential()
+        discriminator.add(Dense(units=1024, input_dim=self.n_feat))
+        discriminator.add(LeakyReLU(0.2))
+        discriminator.add(Dropout(0.3))
+
+        discriminator.add(Dense(units=512))
+        discriminator.add(LeakyReLU(0.2))
+        discriminator.add(Dropout(0.3))
+
+        discriminator.add(Dense(units=256))
+        discriminator.add(LeakyReLU(0.2))
+
+        discriminator.add(Dense(units=1, activation='sigmoid'))
+
+        discriminator.compile(loss='binary_crossentropy', optimizer=adam_optimizer())
+        return discriminator
+
+    def create_discriminator_maj(self):
+        # Discriminator for the majority class: real majority vs. generated samples.
+        discriminator = Sequential()
+        discriminator.add(Dense(units=1024, input_dim=self.n_feat))
+        discriminator.add(LeakyReLU(0.2))
+        discriminator.add(Dropout(0.3))
+
+        discriminator.add(Dense(units=512))
+        discriminator.add(LeakyReLU(0.2))
+        discriminator.add(Dropout(0.3))
+
+        discriminator.add(Dense(units=256))
+        discriminator.add(LeakyReLU(0.2))
+
+        discriminator.add(Dense(units=1, activation='sigmoid'))
+
+        discriminator.compile(loss='binary_crossentropy', optimizer=adam_optimizer())
+        return discriminator
+
+    def create_gan(self, discriminator_min, discriminator_maj, generator):
+        # Chain the generator into both discriminators. Inside the combined
+        # model the discriminators are frozen, so gan.train_on_batch only
+        # updates the generator.
+        discriminator_min.trainable = False
+        discriminator_maj.trainable = False
+        gan_input = Input(shape=(self.n_feat,))
+        x = generator(gan_input)
+        gan_output_min = discriminator_min(x)
+        gan_output_maj = discriminator_maj(x)
+        gan = Model(inputs=gan_input, outputs=[gan_output_min, gan_output_maj])
+        gan.compile(loss=['binary_crossentropy', 'binary_crossentropy'], optimizer='adam')
+        self.generator = generator
+        self.discriminator_min = discriminator_min
+        self.discriminator_maj = discriminator_maj
+        return gan
+
+    def train(self, parameters):
+        for e in range(1, 30 + 1):
+            print(e)
+            for _i in tqdm(range(parameters.batch_size)):
+                # Get a random batch of real minority samples.
+                image_batch = parameters.im_batch_creator_min()
+
+                # Generate LoRAS noise from the batch as input for the generator.
+                noise_min = self.noise.noise(image_batch, parameters.batch_size)
+
+                # Generate fake samples from the noised input.
+                generated_images = self.generator.predict(noise_min)
+
+                # Build a combined batch of real and fake data.
+                X = np.concatenate((image_batch, generated_images))
+
+                # Labels: 0.9 for real (one-sided label smoothing), 0 for fake.
+                y_dis = np.zeros(2 * parameters.batch_size)
+                y_dis[:parameters.batch_size] = 0.9
+
+                # Train discriminator_min on real and fake data.
+                self.discriminator_min.trainable = True
+                _d_loss_min = self.discriminator_min.train_on_batch(X, y_dis)
+
+                # Train discriminator_maj in the first epoch and after epoch
+                # 15: majority samples get label 0, generated samples label 1.
+                if e == 1 or e > 15:
+                    image_batch_maj = parameters.im_batch_creator_maj()
+                    X_maj = np.concatenate((image_batch_maj, generated_images))
+                    y_dis_maj = np.ones(2 * parameters.batch_size)
+                    y_dis_maj[:parameters.batch_size] = 0
+                    self.discriminator_maj.trainable = True
+                    _d_loss_maj = self.discriminator_maj.train_on_batch(X_maj, y_dis_maj)
+
+                # Draw fresh noise and present it to the GAN as "real" input
+                # so the generator learns to fool both discriminators.
+                noise = self.noise.noise(image_batch, parameters.batch_size)
+
+                y_gen_min = np.ones(parameters.batch_size)
+
+                # During GAN training the discriminator weights must stay
+                # fixed; this is enforced via the trainable flag.
+                self.discriminator_min.trainable = False
+                self.discriminator_maj.trainable = False
+
+                # Train the chained GAN model with the discriminators' weights
+                # frozen, so only the generator is updated.
+                _g_loss_min = self.gan.train_on_batch(noise, [y_gen_min, y_gen_min])
+
+
+    def genFeat(self, parameters):
+        # Generate one batch of synthetic minority-class features.
+        im_batch = parameters.im_batch_creator_min()
+        noise = self.noise.noise(im_batch, parameters.batch_size)
+        return self.generator.predict(noise)
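+    # Usage sketch (hypothetical): genFeat(parameters) yields a
+    # (batch_size, n_feat) array that could be appended to the minority
+    # training set for oversampling.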
+
+
+    def predict(self, data):
+        # Score points with the majority discriminator; its output is high
+        # for minority-like points and low for majority-like ones.
+        y_pred = self.discriminator_maj.predict(data)
+        return np.reshape(y_pred, len(data))
+
+
+
+class TrainTestData:
+    """
+    Splits the class-0 and class-1 features into train and test parts and
+    stores the joined data and labels for each split.
+    """
+
+    def __init__(self, features0, features1, trainFactor=0.9):
+        self.nFeatures0 = len(features0)
+        self.nFeatures1 = len(features1)
+
+        self.features_0_trn, self.features_0_tst = self.splitUpData(features0, trainFactor)
+        self.features_1_trn, self.features_1_tst = self.splitUpData(features1, trainFactor)
+
+        self.testData, self.testLabels = self.joinData(self.features_1_tst, self.features_0_tst)
+        self.trainData, self.trainLabels = self.joinData(self.features_1_trn, self.features_0_trn)
+
+    def splitUpData(self, data, trainFactor=0.9):
+        # The first trainFactor of the rows form the training split.
+        trainSize = math.ceil(len(data) * trainFactor)
+        return data[:trainSize], data[trainSize:]
+
+    def joinData(self, data1, data0):
+        # Class-1 rows come first and are labelled 1; class-0 rows get 0.
+        data = np.concatenate((data1, data0))
+        labels = np.concatenate((np.ones(len(data1)), np.zeros(len(data0))))
+        return data, labels
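+# Usage sketch (hypothetical arrays f0, f1): TrainTestData(f0, f1) keeps the
+# first 90% of each class for training; testData and testLabels then hold
+# the remaining rows, labelled 1 for class 1 and 0 for class 0.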
+
+
+
+if __name__ == "__main__":
+    def createTrainParameters():
+        data = fetch_datasets()['yeast_me2']
+        labels = data.target
+        features = data.data
+        # imblearn marks the minority class with 1 and the majority with -1.
+        label_1 = np.where(labels == 1)[0]
+        label_0 = np.where(labels == -1)[0]
+        features_1 = features[label_1]
+        features_0 = features[label_0]
+        features_1_trn = features_1[:math.ceil(len(features_1) * 2 / 3)]
+        # Embed the minority training set in two dimensions with t-SNE; the
+        # embedding defines the nearest-neighbour batches during training.
+        min_t = TSNE(perplexity=.1).fit_transform(features_1_trn)
+
+        return GanTrainParameters(
+            n_feat=features.shape[1],
+            batch_size=30,
+            min_t=min_t,
+            features_0_trn=features_0[:math.ceil(len(features_0) * 2 / 3)],
+            features_1_trn=features_1_trn
+            )
+
+
+    gtp = createTrainParameters()
+
+    cGan = GAN(n_feat=gtp.n_feat)
+    cGan.train(parameters=gtp)
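+
+    # Hypothetical follow-up (not part of this commit's flow): generate
+    # synthetic minority features and score points with the majority
+    # discriminator, e.g.
+    #   synthetic = cGan.genFeat(gtp)
+    #   scores = cGan.predict(gtp.features_0_trn)  # high score -> minority-like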