| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639 |
- import numpy as np
- import matplotlib.pyplot as plt
- from library.interfaces import GanBaseClass
- from library.dataset import DataSet
- from library.timing import timing
- from keras.layers import Dense, Input, Multiply, Flatten, Conv1D, Reshape, InputLayer, Add
- from keras.models import Model, Sequential
- from keras import backend as K
- #from tqdm import tqdm
- import tensorflow as tf
- from tensorflow.keras.optimizers import Adam
- from tensorflow.keras.layers import Lambda
- import tensorflow_probability as tfp
- from sklearn.utils import shuffle
- from library.NNSearch import NNSearch, randomIndices
- import warnings
- warnings.filterwarnings("ignore")
def repeat(x, times):
    """Return a list holding *x* repeated *times* times (same object, not copies)."""
    return [x] * times
def create01Labels(totalSize, sizeFirstHalf):
    """Build one-hot labels of shape (totalSize, 2).

    The first *sizeFirstHalf* rows are [1, 0]; the remaining rows are [0, 1].
    """
    first_half = [np.array([1, 0])] * sizeFirstHalf
    second_half = [np.array([0, 1])] * (totalSize - sizeFirstHalf)
    return np.array(first_half + second_half)
class GeneratorConfig:
    """Configuration container for the ConvGeN generator.

    Values left as None are filled in later from the training data via
    `fixMissingValuesByInputData`.
    """

    def __init__(self, n_feat=None, neb=5, gen=None, neb_epochs=10, genLayerSizes=None, genAddNoise=True):
        # n_feat: number of features per data point (None = infer from data).
        self.n_feat = n_feat
        # neb: minority neighbourhood size fed into the generator.
        self.neb = neb
        # gen: number of synthetic samples produced per neighbourhood (None = use neb).
        self.gen = gen
        # neb_epochs: number of training epochs over the neighbourhoods.
        self.neb_epochs = neb_epochs
        # genAddNoise: if True, the generator appends a learned noise layer.
        self.genAddNoise = genAddNoise
        # genLayerSizes: widths of the generator's output planes; must sum to gen.
        self.genLayerSizes = genLayerSizes

    def isConfigMissing(self):
        """Return True if any configuration value is still None."""
        return any(x is None for x in (
            self.n_feat,
            self.neb,
            self.gen,
            self.genAddNoise,
            self.genLayerSizes,
            self.neb_epochs,
        ))

    def checkForValidConfig(self):
        """Validate the configuration.

        Raises ValueError if anything is missing or inconsistent; returns True
        otherwise.
        """
        if self.isConfigMissing():
            raise ValueError("Some configuration is missing.")
        if self.neb > self.gen:
            raise ValueError(f"Expected neb <= gen but got neb={self.neb} and gen={self.gen}.")
        if sum(self.genLayerSizes) != self.gen:
            raise ValueError(f"Expected the layer sizes to sum up to gen={self.gen}.")
        return True

    def fixMissingValuesByInputData(self, data):
        """Return a new config with missing values inferred from *data*.

        *data* is a numpy array of shape (n_points, n_features) or None.
        The receiver is left unchanged.
        """
        config = GeneratorConfig()
        # BUG FIX: n_feat and neb_epochs were previously not copied, so a
        # user-supplied n_feat was discarded and neb_epochs always fell back
        # to the default value.
        config.n_feat = self.n_feat
        config.neb = self.neb
        config.gen = self.gen
        config.neb_epochs = self.neb_epochs
        config.genAddNoise = self.genAddNoise
        config.genLayerSizes = self.genLayerSizes

        if data is not None:
            if config.n_feat is None:
                config.n_feat = data.shape[1]
            if config.neb is None:
                config.neb = data.shape[0]
            else:
                # Neighbourhood cannot be larger than the dataset itself.
                config.neb = min(config.neb, data.shape[0])
        if config.gen is None:
            config.gen = config.neb
        if config.genLayerSizes is None:
            config.genLayerSizes = [config.gen]
        return config

    def nebShape(self, aboveSize=None):
        """Shape of one neighbourhood batch; prepend *aboveSize* if given."""
        if aboveSize is None:
            return (self.neb, self.n_feat)
        else:
            return (aboveSize, self.neb, self.n_feat)

    def genShape(self, aboveSize=None):
        """Shape of one generated batch; prepend *aboveSize* if given."""
        if aboveSize is None:
            return (self.gen, self.n_feat)
        else:
            return (aboveSize, self.gen, self.n_feat)
class XConvGeN(GanBaseClass):
    """
    This is the ConvGeN class. ConvGeN is a synthetic point generator for imbalanced datasets.

    It combines three Keras models:
      * a generator mapping a minority neighbourhood batch to synthetic points in
        the convex space of that neighbourhood (`_conv_sample_gen`),
      * a discriminator separating synthetic minority points from proximal
        majority points (`_maj_min_disc`),
      * the joined model used to train the generator against a frozen
        discriminator (`_convGeN`).
    """

    def __init__(self, config=None, fdc=None, debug=False):
        """
        *config* is a GeneratorConfig; values still missing are inferred from
        the training data in `reset`.
        *fdc* is an optional feature-type corrector providing `normalize`,
        `nom_list` and `ord_list` (used to snap nominal/ordinal features of
        synthetic points back to observed values).
        *debug* enables model summaries and a loss plot after training.
        """
        self.isTrained = False
        self.config = config
        self.defaultConfig = config  # kept so reset() can re-derive the config
        self.loss_history = None
        self.debug = debug
        self.minSetSize = 0
        self.conv_sample_generator = None
        self.maj_min_discriminator = None
        self.cg = None
        self.canPredict = True
        self.fdc = fdc
        self.lastProgress = -1

        # Named timers for coarse profiling of the training pipeline.
        self.timing = { n: timing(n) for n in [
            "Train", "BMB", "NbhSearch", "NBH", "GenSamples", "Fit", "FixType"
        ] }
        if not self.config.isConfigMissing():
            self.config.checkForValidConfig()

    def reset(self, data):
        """
        Creates the network.
        *data* is a numpy array (n, n_feat) or None.
        It contains the training dataset.
        It is used to determine the neighbourhood size if /neb/ in /__init__/ was None.
        """
        self.isTrained = False
        self.config = self.defaultConfig.fixMissingValuesByInputData(data)
        self.config.checkForValidConfig()
        ## instantiate generator network
        self.conv_sample_generator = self._conv_sample_gen()
        ## instantiate discriminator network
        self.maj_min_discriminator = self._maj_min_disc()
        ## instantiate the joined network
        self.cg = self._convGeN(self.conv_sample_generator, self.maj_min_discriminator)
        self.lastProgress = (-1, -1, -1)
        if self.debug:
            print(f"neb={self.config.neb}, gen={self.config.gen}")
            print(self.conv_sample_generator.summary())
            print('\n')
            print(self.maj_min_discriminator.summary())
            print('\n')
            print(self.cg.summary())
            print('\n')

    def train(self, data, discTrainCount=5, batchSize=32):
        """
        Trains the Network.
        *data* is a numpy array (n, n_feat) holding the minority class.
        *discTrainCount* gives the number of extra training for the discriminator for each epoch. (>= 0)
        *batchSize* is the batch size used for both sub-networks.

        Raises AttributeError if *data* is empty.
        """
        if data.shape[0] <= 0:
            raise AttributeError("Train: Expected data class 1 to contain at least one point.")
        self.timing["Train"].start()
        # Store size of minority class. This is needed during point generation.
        self.minSetSize = data.shape[0]
        normalizedData = data
        if self.fdc is not None:
            normalizedData = self.fdc.normalize(data)

        self.timing["NbhSearch"].start()
        # Precalculate neighborhoods on the (possibly normalized) data.
        self.nmbMin = NNSearch(self.config.neb).fit(haystack=normalizedData)
        # Keep the original (un-normalized) points for batch generation.
        self.nmbMin.basePoints = np.array([ [x.astype(np.float32) for x in p] for p in data])
        self.timing["NbhSearch"].stop()
        # Do the training.
        self._rough_learning(data, discTrainCount, batchSize=batchSize)

        self.isTrained = True
        self.timing["Train"].stop()

    def generateDataPoint(self):
        """
        Returns one synthetic data point.
        """
        return (self.generateData(1))[0]

    def generateData(self, numOfSamples=1):
        """
        Generates a list of synthetic data-points.
        *numOfSamples* is an integer > 0. It gives the number of new generated samples.

        Raises ValueError if the network has not been trained yet.
        """
        if not self.isTrained:
            raise ValueError("Try to generate data with untrained network.")
        ## roughly calculate the upper bound of the synthetic samples to be
        ## generated from each neighbourhood
        synth_num = (numOfSamples // self.minSetSize) + 1
        runs = (synth_num // self.config.gen) + 1
        ## Get a random list of all indices
        indices = randomIndices(self.minSetSize)

        ## generate all neighborhoods as a lazy stream
        def neighborhoodGenerator():
            for index in indices:
                yield self.nmbMin.getNbhPointsOfItem(index)

        neighborhoods = (tf.data.Dataset
            .from_generator(neighborhoodGenerator, output_types=tf.float32)
            .repeat()
            )
        batch = neighborhoods.take(runs * self.minSetSize)
        synth_batch = self.conv_sample_generator.predict(batch.batch(32), verbose=0)
        # Pair every neighbourhood with its synthetic points so categorical
        # features can be snapped back to observed values.
        pairs = tf.data.Dataset.zip(
            ( batch
            , tf.data.Dataset.from_tensor_slices(synth_batch)
            ))
        corrected = pairs.map(self.correct_feature_types())
        ## extract the exact number of synthetic samples needed
        r = np.concatenate(np.array(list(corrected.take(1 + (numOfSamples // self.config.gen)))), axis=0)[:numOfSamples]
        return r

    def predictReal(self, data):
        """
        Uses the discriminator on data.

        *data* is a numpy array of shape (n, n_feat) where n is the number of
        datapoints and n_feat the number of features.
        Returns the first (i.e. "minority") component of each prediction.
        """
        prediction = self.maj_min_discriminator.predict(data)
        return np.array([x[0] for x in prediction])

    # ###############################################################
    # Hidden internal functions
    # ###############################################################

    # Creating the Network: Generator
    def _conv_sample_gen(self):
        """
        The generator network to generate synthetic samples from the convex space
        of arbitrary minority neighbourhoods.
        """
        n_feat = self.config.n_feat
        neb = self.config.neb
        gen = self.config.gen
        # BUG FIX: copy the list so the config object is not mutated below
        # (the original appended to self.config.genLayerSizes in place).
        genLayerSizes = list(self.config.genLayerSizes)
        ## takes minority batch as input
        min_neb_batch = Input(shape=(neb, n_feat))
        ## using 1-D convolution, feature dimension remains the same
        x = Conv1D(n_feat, 3, activation='relu', name="UnsharpenInput")(min_neb_batch)
        ## flatten after convolution
        x = Flatten(name="InputMatrixToVector")(x)
        synth = []
        n = 0
        # Ensure the plane widths cover all gen output rows.
        if sum(genLayerSizes) < gen:
            genLayerSizes.append(gen)
        for layerSize in genLayerSizes:
            w = min(layerSize, gen - n)
            if w <= 0:
                break
            n += w

            ## add dense layer to transform the vector to a convenient dimension
            y = Dense(neb * w, activation='relu', name=f"P{n}_dense")(x)
            ## again, switching to 2-D tensor once we have the convenient shape
            y = Reshape((neb, w), name=f"P{n}_reshape")(y)
            ## column wise sum
            s = K.sum(y, axis=1)
            ## adding a small constant to always ensure the column sums are non zero.
            ## if this is not done then during initialization the sum can be zero.
            s_non_zero = Lambda(lambda x: x + .000001, name=f"P{n}_make_non_zero")(s)
            ## reciprocals of the approximated column sum
            sinv = tf.math.reciprocal(s_non_zero, name=f"P{n}_invert")
            ## At this step we ensure that column sum is 1 for every row in x.
            ## That means, each column is a set of convex coefficients.
            y = Multiply(name=f"P{n}_normalize")([sinv, y])
            ## Now we transpose the matrix. So each row is now a set of convex coefficients.
            ## NOTE(review): y[0] slices the first batch element only — looks
            ## intentional for batch-size-1 usage, but confirm.
            aff = tf.transpose(y[0], name=f"P{n}_transpose")
            ## We now do matrix multiplication of the affine combinations with the original
            ## minority batch taken as input. This generates a convex transformation
            ## of the input minority batch
            y = tf.matmul(aff, min_neb_batch, name=f"P{n}_project")
            synth.append(y)
        synth = tf.concat(synth, axis=1, name="collect_planes")
        nOut = gen * n_feat
        if self.config.genAddNoise:
            ## learned independent-normal noise added onto the synthetic points
            noiseGenerator = Sequential([
                InputLayer(input_shape=(gen, n_feat)),
                Flatten(),
                Dense(tfp.layers.IndependentNormal.params_size(nOut)),
                tfp.layers.IndependentNormal(nOut)
            ], name="RandomNoise")
            noise = noiseGenerator(synth)
            noise = Reshape((gen, n_feat), name="ReshapeNoise")(noise)
            synth = Add(name="AddNoise")([synth, noise])
        ## finally we compile the generator with an arbitrary minority neighbourhood batch
        ## as input and a convex space transformation of the same number of samples as output
        model = Model(inputs=min_neb_batch, outputs=synth)
        opt = Adam(learning_rate=0.001)
        model.compile(loss='mean_squared_logarithmic_error', optimizer=opt)
        return model

    # Creating the Network: discriminator
    def _maj_min_disc(self):
        """
        the discriminator is trained in two phases:
        first phase: while training ConvGeN the discriminator learns to differentiate synthetic
                     minority samples generated from convex minority data space against
                     the borderline majority samples
        second phase: after the ConvGeN generator learns to create synthetic samples,
                      it can be used to generate synthetic samples to balance the dataset
                      and then retrain the discriminator with the balanced dataset
        """
        ## takes as input synthetic sample generated as input stacked upon a batch of
        ## borderline majority samples
        samples = Input(shape=(self.config.n_feat,))

        ## passed through three dense layers
        y = Dense(250, activation='relu')(samples)
        y = Dense(125, activation='relu')(y)
        y = Dense(75, activation='relu')(y)

        ## two output nodes. outputs have to be one-hot coded (see labels variable before)
        output = Dense(2, activation='sigmoid')(y)

        ## compile model
        model = Model(inputs=samples, outputs=output)
        opt = Adam(learning_rate=0.0001)
        model.compile(loss='binary_crossentropy', optimizer=opt)
        return model

    # Creating the Network: ConvGeN
    def _convGeN(self, generator, discriminator):
        """
        for joining the generator and the discriminator
        *generator* -> generator network instance
        *discriminator* -> discriminator network instance
        """
        n_feat = self.config.n_feat
        gen = self.config.gen
        ## by default the discriminator trainability is switched off.
        ## Thus training ConvGeN means training the generator network as per previously
        ## trained discriminator network.
        discriminator.trainable = False
        # Shape of data: (batchSize, 2, gen, n_feat)
        # Shape of labels: (batchSize, 2 * gen, 2)
        ## input receives a neighbourhood minority batch
        ## and a proximal majority batch concatenated
        batch_data = Input(shape=(2, gen, n_feat))
        # batch_data: (batchSize, 2, gen, n_feat)

        ## extract minority batch
        min_batch = Lambda(lambda x: x[:, 0, : ,:], name="SplitForGen")(batch_data)
        # min_batch: (batchSize, gen, n_feat)

        ## extract majority batch
        maj_batch = Lambda(lambda x: x[:, 1, :, :], name="SplitForDisc")(batch_data)
        # maj_batch: (batchSize, gen, n_feat)
        maj_batch = tf.reshape(maj_batch, (-1, n_feat), name="ReshapeForDisc")
        # maj_batch: (batchSize * gen, n_feat)

        ## pass minority batch into generator to obtain convex space transformation
        ## (synthetic samples) of the minority neighbourhood input batch
        conv_samples = generator(min_batch)
        # conv_samples: (batchSize, gen, n_feat)
        conv_samples = tf.reshape(conv_samples, (-1, n_feat), name="ReshapeGenOutput")
        # conv_samples: (batchSize * gen, n_feat)
        ## pass samples into the discriminator to know its decisions
        conv_samples = discriminator(conv_samples)
        conv_samples = tf.reshape(conv_samples, (-1, gen, 2), name="ReshapeGenDiscOutput")
        # conv_samples: (batchSize, gen, 2)
        maj_batch = discriminator(maj_batch)
        maj_batch = tf.reshape(maj_batch, (-1, gen, 2), name="ReshapeMajDiscOutput")
        # maj_batch: (batchSize, gen, 2)

        ## concatenate the decisions
        output = tf.concat([conv_samples, maj_batch], axis=1)
        # output: (batchSize, 2 * gen, 2)

        ## note that, the discriminator will not be trained but will make decisions based
        ## on its previous training while using this function
        model = Model(inputs=batch_data, outputs=output)
        opt = Adam(learning_rate=0.0001)
        model.compile(loss='mse', optimizer=opt)
        return model

    # Training
    def _rough_learning(self, data, discTrainCount, batchSize=32):
        """Alternating discriminator/generator training over all neighbourhoods."""
        n_feat = self.config.n_feat
        neb = self.config.neb
        gen = self.config.gen
        generator = self.conv_sample_generator
        discriminator = self.maj_min_discriminator
        convGeN = self.cg
        loss_history = []  ## this is for storing the loss for every run
        minSetSize = len(data)
        ## Create labels for one neighborhood training.
        nLabels = 2 * gen
        labels = create01Labels(nLabels, gen)
        labelsGeN = np.array([labels])

        def getNeighborhoods():
            # Stream of (minority neighbourhood, proximal majority batch) pairs.
            for index in range(self.minSetSize):
                yield indexToBatches(index)

        def indexToBatches(min_idx):
            self.timing["NBH"].start()
            ## generate minority neighbourhood batch for every minority class sample by index
            min_batch_indices = self.nmbMin.neighbourhoodOfItem(min_idx)
            min_batch = self.nmbMin.getPointsFromIndices(min_batch_indices)
            ## generate random proximal majority batch
            maj_batch = self._BMB(min_batch_indices)
            self.timing["NBH"].stop()
            return (min_batch, maj_batch)

        def unbatch(parts):
            # Flatten a stream of batch pairs into a stream of single points.
            def fn():
                for part in parts:
                    for neighborhood in part:
                        for x in neighborhood:
                            yield x
            return fn

        # Padding so a neb-sized neighbourhood matches the gen-sized majority batch.
        padd = np.zeros((gen - neb, n_feat))
        discTrainCount = 1 + max(0, discTrainCount)
        for neb_epoch_count in range(self.config.neb_epochs):
            self.progressBar(neb_epoch_count / self.config.neb_epochs)
            ## Training of the discriminator.
            #
            # Get all neighborhoods and synthetic points as data stream.
            nbhPairs = tf.data.Dataset.from_generator(getNeighborhoods, output_types=tf.float32).repeat().take(discTrainCount * self.minSetSize)
            nbhMin = nbhPairs.map(lambda x: x[0])
            batchMaj = nbhPairs.map(lambda x: x[1])
            fnCt = self.correct_feature_types()
            synth_batch = self.conv_sample_generator.predict(nbhMin.batch(32), verbose=0)
            pairMinMaj = tf.data.Dataset.zip(
                ( nbhMin
                , tf.data.Dataset.from_tensor_slices(synth_batch)
                , batchMaj
                )).map(lambda x, y, z: [fnCt(x, y), z])

            a = tf.data.Dataset.from_generator(unbatch(pairMinMaj), output_types=tf.float32)
            # Get all labels as data stream.
            b = tf.data.Dataset.from_tensor_slices(labels).repeat()
            # Zip data and matching labels together for training.
            samples = tf.data.Dataset.zip((a, b)).batch(batchSize * 2 * gen)
            # train the discriminator with the concatenated samples and the one-hot encoded labels
            self.timing["Fit"].start()
            discriminator.trainable = True
            discriminator.fit(x=samples, verbose=0)
            discriminator.trainable = False
            self.timing["Fit"].stop()
            ## use the complete network to make the generator learn on the decisions
            ## made by the previous discriminator training
            #
            # Get all neighborhoods as data stream.
            a = (tf.data.Dataset
                .from_generator(getNeighborhoods, output_types=tf.float32)
                .map(lambda x: [[tf.concat([x[0], padd], axis=0), x[1]]]))
            # Get all labels as data stream.
            b = tf.data.Dataset.from_tensor_slices(labelsGeN).repeat()
            # Zip data and matching labels together for training.
            samples = tf.data.Dataset.zip((a, b)).batch(batchSize)
            # Train with the data stream. Store the loss for later usage.
            gen_loss_history = convGeN.fit(samples, verbose=0, batch_size=batchSize)
            loss_history.append(gen_loss_history.history['loss'])
        self.progressBar(1.0)
        ## When done: print some statistics.
        if self.debug:
            run_range = range(1, len(loss_history) + 1)
            plt.rcParams["figure.figsize"] = (16, 10)
            plt.xticks(fontsize=20)
            plt.yticks(fontsize=20)
            plt.xlabel('runs', fontsize=25)
            plt.ylabel('loss', fontsize=25)
            plt.title('Rough learning loss for discriminator', fontsize=25)
            plt.plot(run_range, loss_history)
            plt.show()
        self.loss_history = loss_history

    def _BMB(self, min_idxs):
        """Generate a borderline majority batch.

        *min_idxs* -> indices of points in the minority neighbourhood; these are
        excluded so the batch contains gen points from outside the neighbourhood.
        """
        self.timing["BMB"].start()
        indices = randomIndices(self.minSetSize, outputSize=self.config.gen, indicesToIgnore=min_idxs)
        r = self.nmbMin.basePoints[indices]
        self.timing["BMB"].stop()
        return r

    def retrainDiscriminitor(self, data, labels):
        """Retrain the discriminator on *data* with scalar *labels* (1 = minority)."""
        self.maj_min_discriminator.trainable = True
        labels = np.array([ [x, 1 - x] for x in labels])
        self.maj_min_discriminator.fit(x=data, y=labels, batch_size=20, epochs=self.config.neb_epochs)
        self.maj_min_discriminator.trainable = False

    def progressBar(self, x):
        """Print a simple in-place progress bar; *x* is the fraction done (0..1)."""
        barWidth = 40
        x = int(x * barWidth)
        if self.lastProgress == x:
            return
        # BUG FIX: remember the last printed position so the early-return above
        # actually suppresses redundant redraws (it was never updated before).
        self.lastProgress = x

        def bar(v):
            v = min(v, barWidth)
            r = ("=" * v) + (" " * (barWidth - v))
            return r

        print(f"[{bar(x)}]", end="\r")

    def correct_feature_types(self):
        """Return a tf function mapping (reference, synth) batches to corrected synth.

        For nominal/ordinal columns (per self.fdc), each synthetic value is
        replaced by the closest value observed in the reference neighbourhood;
        continuous columns pass through unchanged. Without an fdc (or without
        categorical columns) the identity function is returned.
        """
        # batch[0] = original points (neb x n_feat)
        # batch[1] = synthetic points (gen x n_feat)

        @tf.function
        def voidFunction(reference, synth):
            return synth

        if self.fdc is None:
            return voidFunction

        # Union of nominal and ordinal column indices.
        columns = set(self.fdc.nom_list or [])
        for y in (self.fdc.ord_list or []):
            columns.add(y)
        columns = list(columns)

        if len(columns) == 0:
            return voidFunction

        neb = self.config.neb
        n_feat = self.config.n_feat
        # Mask: 1.0 for categorical columns, 0.0 otherwise.
        nn = tf.constant([(1.0 if x in columns else 0.0) for x in range(n_feat)])
        if n_feat is None:
            print("ERRROR n_feat is None")
        if nn is None:
            print("ERRROR nn is None")

        @tf.function
        def bestMatchOf(vi):
            # vi = (value row, mask row, reference column); pick the reference
            # entry closest to the synthetic value for categorical columns.
            value = vi[0]
            c = vi[1][0]
            r = vi[2]
            if c != 0.0:
                d = tf.abs(value - r)
                return r[tf.math.argmin(d)]
            else:
                return value[0]

        @tf.function
        def indexted(v, rt):
            # Stack value/mask/reference per feature for element-wise mapping.
            vv = tf.reshape(tf.repeat([v], neb, axis=1), (n_feat, neb))
            vn = tf.reshape(tf.repeat([nn], neb, axis=1), (n_feat, neb))
            return tf.stack((vv, vn, rt), axis=1)

        @tf.function
        def correctVector(v, rt):
            return tf.map_fn(lambda x: bestMatchOf(x), indexted(v, rt))

        @tf.function
        def fn(reference, synth):
            rt = tf.transpose(reference)
            return tf.map_fn(lambda x: correctVector(x, rt), synth)

        return fn
|