- import numpy as np
- from library.interfaces import GanBaseClass
- from library.dataset import DataSet
- from sklearn.decomposition import PCA
- from sklearn.metrics import confusion_matrix
- from sklearn.metrics import f1_score
- from sklearn.metrics import cohen_kappa_score
- from sklearn.metrics import precision_score
- from sklearn.metrics import recall_score
- from sklearn.neighbors import NearestNeighbors
- from sklearn.utils import shuffle
- from imblearn.datasets import fetch_datasets
- from keras.layers import Dense, Input, Multiply, Flatten, Conv1D, Reshape
- from keras.models import Model
- from keras import backend as K
- from tqdm import tqdm
- import tensorflow as tf
- from tensorflow.keras.optimizers import Adam
- from tensorflow.keras.layers import Lambda
- from library.NNSearch import NNSearch
import warnings

# Silence sklearn/keras deprecation chatter during training runs.
warnings.filterwarnings("ignore")

# Loss used when compiling every model in this module.  MSLE penalises
# relative reconstruction error, which suits the scaled-down targets
# built in Autoencoder.train(); plain "mse" was the earlier alternative.
lossFunction = "mean_squared_logarithmic_error"
#lossFunction = "mse"
def newDense(size, activation="relu"):  # "softsign" was also tried here
    """Return a fully-connected layer of int(size) units.

    Both kernel and bias use the Glorot-uniform initializer.

    NOTE(review): the original computed a RandomUniform(0.00001, size)
    initializer and then immediately overwrote it with "glorot_uniform",
    so the RandomUniform line was dead code and has been removed; the
    constructed layer is identical.

    *size* is the unit count (converted with int()); *activation* is any
    Keras activation name.
    """
    return Dense(
        int(size),
        activation=activation,
        kernel_initializer="glorot_uniform",
        bias_initializer="glorot_uniform",
    )
class Autoencoder(GanBaseClass):
    """
    Autoencoder-based synthetic-sample generator.

    An encoder compresses class-1 points into a *middleSize*-dimensional
    code and a decoder maps codes back to (scaled-down) feature space.
    After training, new samples are produced by perturbing the mean code
    with Gaussian noise and decoding the result (see generateData()).

    NOTE(review): the original docstring called this "a toy example of a
    GAN that repeats the first point" — that described a different class;
    corrected here.
    """

    def __init__(self, n_feat, middleSize=4, eps=0.0001, debug=True):
        self.isTrained = False
        self.n_feat = n_feat          # number of input features
        self.middleSize = middleSize  # width of the bottleneck code
        self.eps = eps                # loss threshold for early stopping
        self.debug = debug
        self.dataSet = None
        self.decoder = None
        self.encoder = None
        self.autoencoder = None
        self.cg = None
        self.scaler = 1.0             # maps network output back to data range
        # NOTE(review): lossFn was assigned twice in the original; the second
        # assignment always won, so only the effective value is kept.
        self.lossFn = "mean_squared_logarithmic_error"

    def reset(self, _dataSet):
        """
        Resets the networks to a random, untrained state.

        *_dataSet* is accepted for interface compatibility and is not used
        by this implementation.
        """
        self.isTrained = False
        self.scaler = 1.0
        ## instantiate the three networks (each prints its architecture)
        self.encoder = self._createEncoder()
        self.decoder = self._createDecoder()
        self.autoencoder = self._createAutoencoder(self.encoder, self.decoder)

    def train(self, dataSet):
        """
        Trains the autoencoder on the class-1 points of *dataSet*.

        *dataSet* is an instance of /library.dataset.DataSet/; only its
        *data1* member is used.  Each point is mapped to a scaled-down copy
        of itself (targets divided by 1.5 * max|x| so they fit the output
        activations).  Training stops early when the loss drops below
        self.eps or changes by less than 0.1 * self.eps after 10 epochs.
        Finally the mean code and the largest code deviation are stored in
        self.noise as the sampling model for generateData().

        Raises AttributeError when data1 is empty.
        """
        if dataSet.data1.shape[0] <= 0:
            raise AttributeError("Train: Expected data class 1 to contain at least one point.")
        d = dataSet.data1
        self.data1 = d
        # Scale reconstruction targets into a comfortable activation range.
        self.scaler = 1.5 * tf.reduce_max(tf.abs(d)).numpy()
        scaleDown = 1.0 / self.scaler
        lastLoss = 0.0
        print(f"scaler: {self.scaler}")
        dScaled = scaleDown * d

        for epoch in range(1000):
            h = self.autoencoder.fit(d, dScaled, epochs=1, shuffle=True)
            loss = h.history["loss"][-1]
            if loss < self.eps:
                print(f"done in {epoch} rounds")
                break
            if epoch == 0:
                lastLoss = loss
            else:
                print(f"Loss: {lastLoss} → {loss}")
                if abs(lastLoss - loss) < (0.1 * self.eps) and epoch > 10:
                    print(f"converged in {epoch} rounds")
                    break
                else:
                    lastLoss = loss

        # Noise model: mean code vector plus the maximum per-component
        # deviation of any training code from that mean.
        code = self.encoder.predict(d)
        center = np.zeros(self.middleSize)
        for c in code:
            center = center + c
        center = (1.0 / float(d.shape[0])) * center
        # NOTE(review): the original reused the name "d" for this radius,
        # shadowing the data matrix above; renamed (same values computed).
        radius = 0.0
        for c in code:
            radius = max(radius, tf.reduce_max(tf.abs(c - center)).numpy())
        self.noise = (center, radius)
        self.isTrained = True

    def generateDataPoint(self):
        """
        Returns one synthetic data point (first of a batch of one).
        """
        return (self.generateData(1))[0]

    def generateData(self, numOfSamples=1):
        """
        Generates an array of synthetic data points.

        *numOfSamples* is an integer > 0 giving the number of samples.
        Codes are drawn from N(center, radius) around the stored mean code,
        decoded, and rescaled back into the original data range.

        Raises ValueError when called before train().
        """
        if not self.isTrained:
            raise ValueError("Try to generate data with untrained Re.")
        noise = self.noise[0] + np.random.normal(0.0, self.noise[1], [numOfSamples, self.middleSize])
        syntheticPoints = self.decoder.predict(noise)
        # Decoder outputs live in the scaled-down space; undo the scaling.
        return self.scaler * np.array(syntheticPoints)

    # ###############################################################
    # Hidden internal functions
    # ###############################################################

    def _createEncoder(self):
        """
        Builds the encoder:
        n_feat → n_feat → max(n_feat // 2, middleSize) → middleSize.

        Compiled only so the model is complete; all training happens
        through the joint autoencoder.
        """
        dataIn = Input(shape=(self.n_feat,))
        x = newDense(self.n_feat)(dataIn)
        n = self.n_feat // 2
        x = newDense(max(n, self.middleSize))(x)
        x = newDense(self.middleSize)(x)
        model = Model(inputs=dataIn, outputs=x)
        opt = Adam(learning_rate=0.01)
        model.compile(loss=lossFunction, optimizer=opt)
        print("encoder")
        model.summary()
        return model

    def _createDecoder(self):
        """
        Builds the decoder: a single dense layer middleSize → n_feat.

        Compiled only so the model is complete; all training happens
        through the joint autoencoder.
        """
        dataIn = Input(shape=(self.middleSize,))
        x = newDense(self.n_feat)(dataIn)
        model = Model(inputs=dataIn, outputs=x)
        opt = Adam(learning_rate=0.01)
        model.compile(loss=lossFunction, optimizer=opt)
        print("decoder")
        model.summary()
        return model

    def _createAutoencoder(self, encoder, decoder):
        """
        Joins *encoder* and *decoder* into the end-to-end model that
        train() actually fits, compiled with self.lossFn.
        """
        dataIn = Input(shape=(self.n_feat,))
        x = encoder(dataIn)
        x = decoder(x)
        model = Model(inputs=dataIn, outputs=x)
        opt = Adam(learning_rate=0.01)
        model.compile(loss=self.lossFn, optimizer=opt)
        print("autoencoder")
        model.summary()
        return model
|