Sfoglia il codice sorgente

Experimented with autoencoder.

Kristian Schultz 4 anni fa
parent
commit
aeddbcce05
1 ha cambiato i file con 241 aggiunte e 0 eliminazioni
  1. 241 0
      library/generators/autoencoder.py

+ 241 - 0
library/generators/autoencoder.py

@@ -0,0 +1,241 @@
+import numpy as np
+
+from library.interfaces import GanBaseClass
+from library.dataset import DataSet
+
+from sklearn.decomposition import PCA
+from sklearn.metrics import confusion_matrix
+from sklearn.metrics import f1_score
+from sklearn.metrics import cohen_kappa_score
+from sklearn.metrics import precision_score
+from sklearn.metrics import recall_score
+from sklearn.neighbors import NearestNeighbors
+from sklearn.utils import shuffle
+from imblearn.datasets import fetch_datasets
+
+from keras.layers import Dense, Input, Multiply, Flatten, Conv1D, Reshape
+from keras.models import Model
+from keras import backend as K
+from tqdm import tqdm
+
+import tensorflow as tf
+from tensorflow.keras.optimizers import Adam
+from tensorflow.keras.layers import Lambda
+
+from library.NNSearch import NNSearch
+
+import warnings
+warnings.filterwarnings("ignore")
+
+
def newDense(size, activation="softsign"):
    """Build a Dense layer of width *size*.

    The bias is initialised uniformly in (1e-5, size); the kernel keeps
    Keras' default (glorot_uniform) initialisation.
    """
    bias_init = tf.keras.initializers.RandomUniform(
        minval=0.00001, maxval=float(size)
    )
    layer = Dense(
        int(size),
        activation=activation,
        bias_initializer=bias_init,
    )
    return layer
+
+
class Autoencoder(GanBaseClass):
    """
    Oversampler built around a plain dense autoencoder.

    The autoencoder is trained to reconstruct the minority class
    (class 1) of a DataSet.  New synthetic points are produced by
    drawing random codes around the centre of the encoded training
    data and decoding them back into data space.

    NOTE(review): despite inheriting from GanBaseClass this is not an
    adversarial model; it only implements the same generator interface.
    """
    def __init__(self, n_feat, middleSize=4, eps=0.0001, debug=True):
        """
        *n_feat*     -- number of input features per data point.
        *middleSize* -- width of the code (bottleneck) layer.
        *eps*        -- loss threshold used as the stopping criterion.
        *debug*      -- verbosity flag.
        """
        self.isTrained = False
        self.n_feat = n_feat
        self.middleSize = middleSize
        self.eps = eps
        self.debug = debug
        self.dataSet = None
        self.decoder = None
        self.encoder = None
        self.autoencoder = None
        self.cg = None
        self.scaler = 1.0
        # Loss used when compiling the joint autoencoder.
        # (A previous "mse" assignment was dead code and has been removed.)
        self.lossFn = "mean_squared_logarithmic_error"
        # Set by train(): raw class-1 data and (center, radius) of the codes.
        self.data1 = None
        self.noise = None

    def reset(self):
        """
        Re-creates the three networks, discarding any previous training.
        """
        self.isTrained = False
        self.scaler = 1.0
        self.encoder = self._createEncoder()
        self.decoder = self._createDecoder()
        # Joint model; training this trains encoder and decoder together.
        self.autoencoder = self._createAutoencoder(self.encoder, self.decoder)

    def train(self, dataSet):
        """
        Trains the autoencoder on class 1 of *dataSet*.

        *dataSet* is an instance of library.dataset.DataSet; only
        dataSet.data1 (the minority class) is used.  After training,
        the centre of the encoded training points and their largest
        per-coordinate deviation are stored in self.noise for sampling.

        Raises AttributeError when class 1 contains no points.
        """
        if dataSet.data1.shape[0] <= 0:
            raise AttributeError("Train: Expected data class 1 to contain at least one point.")

        data = dataSet.data1
        self.data1 = data
        # The output activation (softsign) lives in (-1, 1), so the
        # reconstruction targets are scaled into that range; the inverse
        # scaling is applied in generateData().
        self.scaler = 1.1 * tf.reduce_max(tf.abs(data)).numpy()
        scaleDown = 1.0 / self.scaler

        lastLoss = 0.0
        print(f"scaler: {self.scaler}")

        for epoch in range(100):
            h = self.autoencoder.fit(data, scaleDown * data, epochs=10, shuffle=True)
            # Show how the first training point is currently reconstructed.
            print(str(data[0]) + " →")
            print(self.scaler * self.autoencoder.predict(np.array([data[0]])))
            loss = h.history["loss"][-1]
            if loss < self.eps:
                print(f"done in {epoch} rounds")
                break

            if epoch == 0:
                lastLoss = loss
            else:
                print(f"Loss: {lastLoss} → {loss}")
                # Stop once the loss change stagnates (after a warm-up).
                if abs(lastLoss - loss) < (0.1 * self.eps) and epoch > 10:
                    print(f"converged in {epoch} rounds")
                    break
                lastLoss = loss

        # Centre of the encoded training points ...
        code = self.encoder.predict(data)
        center = np.mean(code, axis=0)

        # ... and the largest per-coordinate deviation from that centre.
        # (The original reused the name `d` for both the data matrix and
        # this scalar; renamed to avoid the shadowing.)
        radius = 0.0
        for c in code:
            radius = max(radius, tf.reduce_max(tf.abs(c - center)).numpy())

        self.noise = (center, radius)
        self.isTrained = True

    def generateDataPoint(self):
        """
        Returns one synthetic data point.
        """
        return (self.generateData(1))[0]

    def generateData(self, numOfSamples=1):
        """
        Generates a list of synthetic data points.

        Codes are drawn from a normal distribution centred on the mean
        encoded training point (std = observed radius) and decoded back
        into data space.

        *numOfSamples* is an integer > 0: number of samples to generate.

        Raises ValueError when called before train().
        """
        if not self.isTrained:
            raise ValueError("Try to generate data with untrained autoencoder.")

        center, radius = self.noise
        codes = center + np.random.normal(0.0, radius, [numOfSamples, self.middleSize])
        syntheticPoints = self.decoder.predict(codes)

        # The decoder outputs live in the scaled-down space; undo the scaling.
        return self.scaler * np.array(syntheticPoints)

    # ###############################################################
    # Hidden internal functions
    # ###############################################################

    def _createEncoder(self):
        """
        Builds the dense network mapping a data point (n_feat values)
        down to its code (middleSize values).
        """
        dataIn = Input(shape=(self.n_feat,))
        x = newDense(self.n_feat)(dataIn)
        x = newDense(self.middleSize)(x)

        model = Model(inputs=dataIn, outputs=x)
        opt = Adam(learning_rate=0.01)
        # Compiled only so the sub-model can be evaluated on its own;
        # actual training happens through the joint autoencoder.
        model.compile(loss='mean_squared_logarithmic_error', optimizer=opt)

        print("encoder")
        model.summary()
        return model

    def _createDecoder(self):
        """
        Builds the dense network mapping a code (middleSize values)
        back up to a data point (n_feat values).
        """
        dataIn = Input(shape=(self.middleSize,))

        # Widen gradually: code -> max(n_feat // 2, middleSize) -> n_feat.
        n = self.n_feat // 2
        x = newDense(max(n, self.middleSize))(dataIn)
        x = newDense(self.n_feat)(x)

        model = Model(inputs=dataIn, outputs=x)
        opt = Adam(learning_rate=0.01)
        # Compiled only so the sub-model can be evaluated on its own;
        # actual training happens through the joint autoencoder.
        model.compile(loss='mean_squared_logarithmic_error', optimizer=opt)

        print("decoder")
        model.summary()
        return model

    def _createAutoencoder(self, encoder, decoder):
        """
        Chains *encoder* and *decoder* into the full autoencoder that is
        actually trained.  Both sub-models remain trainable.
        """
        dataIn = Input(shape=(self.n_feat,))
        x = encoder(dataIn)
        x = decoder(x)

        model = Model(inputs=dataIn, outputs=x)
        opt = Adam(learning_rate=0.01)
        model.compile(loss=self.lossFn, optimizer=opt)

        print("autoencoder")
        model.summary()
        return model
+