import numpy as np
from library.interfaces import GanBaseClass
from library.dataset import DataSet
from sklearn.decomposition import PCA
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score
from sklearn.metrics import cohen_kappa_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.neighbors import NearestNeighbors
from sklearn.utils import shuffle
from imblearn.datasets import fetch_datasets
from keras.layers import Dense, Input, Multiply, Flatten, Conv1D, Reshape
from keras.models import Model
from keras import backend as K
from tqdm import tqdm
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Lambda
from library.NNSearch import NNSearch
import warnings

warnings.filterwarnings("ignore")


def newDense(size, activation="softsign"):
    """Build a Dense layer with *size* units.

    The bias is initialized uniformly in (1e-5, size); the kernel keeps the
    Keras default (glorot_uniform) — the kernel_initializer line was
    deliberately disabled by the original author.
    """
    initializer = tf.keras.initializers.RandomUniform(
        minval=0.00001, maxval=float(size)
    )
    return Dense(
        int(size),
        activation=activation,
        # NOTE: kernel_initializer=initializer was tried and disabled.
        bias_initializer=initializer,
    )


class Autoencoder(GanBaseClass):
    """Autoencoder-based generator of synthetic minority-class samples.

    The encoder compresses points of class 1 into a latent space of
    *middleSize* dimensions; new samples are produced by perturbing the
    latent center with uniform-radius Gaussian noise and decoding.
    """

    def __init__(self, n_feat, middleSize=4, eps=0.0001, debug=True):
        """Configure the (untrained) model.

        n_feat     -- number of input features per data point
        middleSize -- latent (bottleneck) dimension
        eps        -- loss threshold used as the training stop criterion
        debug      -- verbosity flag (stored; not read in this file)
        """
        self.isTrained = False
        self.n_feat = n_feat
        self.middleSize = middleSize
        self.eps = eps
        self.debug = debug
        self.dataSet = None
        self.decoder = None
        self.encoder = None
        self.autoencoder = None
        self.cg = None
        # Data are divided by this factor before training and multiplied
        # back when generating; set properly in train().
        self.scaler = 1.0
        self.lossFn = "mean_squared_logarithmic_error"

    def reset(self):
        """Reset the model to a fresh random state (discards all training)."""
        self.isTrained = False
        self.scaler = 1.0
        # Re-instantiate encoder, decoder, and the joined autoencoder.
        self.encoder = self._createEncoder()
        self.decoder = self._createDecoder()
        self.autoencoder = self._createAutoencoder(self.encoder, self.decoder)

    def train(self, dataSet):
        """Train the autoencoder on class-1 data and fit a latent noise model.

        *dataSet* is a library.dataset.DataSet instance; only its ``data1``
        member (assumed to be a 2-D array of minority-class points — confirm
        against DataSet) is used.

        Raises AttributeError when class 1 contains no points.
        """
        if dataSet.data1.shape[0] <= 0:
            raise AttributeError(
                "Train: Expected data class 1 to contain at least one point."
            )
        d = dataSet.data1
        self.data1 = d
        # Scale inputs so the largest magnitude maps to 1/1.1, keeping
        # targets inside the softsign output range (-1, 1).
        self.scaler = 1.1 * tf.reduce_max(tf.abs(d)).numpy()
        scaleDown = 1.0 / self.scaler
        lastLoss = 0.0
        print(f"scaler: {self.scaler}")
        # Up to 100 rounds of 10 epochs each, with two stop criteria:
        # loss below eps, or loss change below 0.1*eps after round 10.
        for epoch in range(100):
            h = self.autoencoder.fit(d, scaleDown * d, epochs=10, shuffle=True)
            # Debug output: show reconstruction of the first training point.
            print(str(d[0]) + " →")
            print(self.scaler * self.autoencoder.predict(np.array([d[0]])))
            loss = h.history["loss"][-1]
            if loss < self.eps:
                print(f"done in {epoch} rounds")
                break
            if epoch == 0:
                lastLoss = loss
            else:
                print(f"Loss: {lastLoss} → {loss}")
                if abs(lastLoss - loss) < (0.1 * self.eps) and epoch > 10:
                    print(f"converged in {epoch} rounds")
                    break
                else:
                    lastLoss = loss
        # Noise model in latent space: center of all codes plus the maximum
        # per-component distance of any code from that center.
        code = self.encoder.predict(d)
        center = np.zeros(self.middleSize)
        for c in code:
            center = center + c
        center = (1.0 / float(d.shape[0])) * center
        radius = 0.0  # renamed from 'd' to avoid clobbering the data array
        for c in code:
            radius = max(radius, tf.reduce_max(tf.abs(c - center)).numpy())
        self.noise = (center, radius)
        self.isTrained = True

    def generateDataPoint(self):
        """Return a single synthetic data point."""
        return (self.generateData(1))[0]

    def generateData(self, numOfSamples=1):
        """Generate *numOfSamples* synthetic data points.

        Samples Gaussian noise around the latent center (std = fitted
        radius), decodes it, and rescales back to the original data range.

        Raises ValueError when called before train().
        """
        if not self.isTrained:
            raise ValueError("Try to generate data with untrained Autoencoder.")
        noise = self.noise[0] + np.random.normal(
            0.0, self.noise[1], [numOfSamples, self.middleSize]
        )
        syntheticPoints = self.decoder.predict(noise)
        return self.scaler * np.array(syntheticPoints)

    # ###############################################################
    # Hidden internal functions
    # ###############################################################

    def _createEncoder(self):
        """Build and compile the encoder: n_feat → n_feat → middleSize."""
        dataIn = Input(shape=(self.n_feat,))
        x = dataIn
        # NOTE: a max(n_feat//2, middleSize) hidden layer was tried and disabled.
        x = newDense(self.n_feat)(x)
        x = newDense(self.middleSize)(x)
        model = Model(inputs=dataIn, outputs=x)
        opt = Adam(learning_rate=0.01)
        model.compile(loss='mean_squared_logarithmic_error', optimizer=opt)
        print("encoder")
        model.summary()
        return model

    def _createDecoder(self):
        """Build and compile the decoder: middleSize → max(n_feat//2, middleSize) → n_feat."""
        dataIn = Input(shape=(self.middleSize,))
        x = dataIn
        n = self.n_feat // 2
        x = newDense(max(n, self.middleSize))(x)
        x = newDense(self.n_feat)(x)
        model = Model(inputs=dataIn, outputs=x)
        opt = Adam(learning_rate=0.01)
        model.compile(loss='mean_squared_logarithmic_error', optimizer=opt)
        print("decoder")
        model.summary()
        return model

    def _createAutoencoder(self, encoder, decoder):
        """Chain *encoder* and *decoder* into one trainable model.

        Both sub-models remain trainable; fitting the joined model in
        train() updates encoder and decoder weights together.
        """
        dataIn = Input(shape=(self.n_feat,))
        x = encoder(dataIn)
        x = decoder(x)
        model = Model(inputs=dataIn, outputs=x)
        opt = Adam(learning_rate=0.01)
        model.compile(loss=self.lossFn, optimizer=opt)
        print("autoencoder")
        model.summary()
        return model