import math

import numpy as np
from tqdm import tqdm
from keras.layers import Dense, Dropout, Input
from keras.models import Model, Sequential
from keras.layers.advanced_activations import LeakyReLU
from keras.optimizers import Adam
from sklearn.neighbors import NearestNeighbors


def adam_optimizer():
    """Adam optimizer used for the generator and both discriminators."""
    return Adam(lr=0.0002, beta_1=0.5)


def Neb_grps(data, near_neb):
    """Return, for every sample in `data`, the indices of its `near_neb` nearest neighbours."""
    nbrs = NearestNeighbors(n_neighbors=near_neb, algorithm='ball_tree').fit(data)
    _distances, indices = nbrs.kneighbors(data)
    neb_class = list(indices)
    return np.asarray(neb_class)


class GanTrainParameters:
    """
    Parameters for training the GAN network.
    """
    def __init__(self, n_feat, batch_size, min_t, features_0_trn, features_1_trn):
        self.batch_size = batch_size
        self.n_feat = n_feat
        self.min_t = min_t
        self.features_0_trn = features_0_trn
        self.features_1_trn = features_1_trn

    def im_batch_creator_min(self):
        # Pick a random minority index and use its nearest-neighbour group
        # (of size batch_size) as the minority-class batch.
        nbd = Neb_grps(self.min_t, self.batch_size)
        rand = np.random.randint(low=0, high=self.features_1_trn.shape[0], size=1)
        idx = tuple(list(nbd[rand]))
        image_batch = self.features_1_trn[idx]
        return image_batch

    def im_batch_creator_maj(self):
        # Draw a uniformly random batch from the majority-class training samples.
        rand = np.random.randint(low=0, high=self.features_0_trn.shape[0], size=self.batch_size)
        image_batch = np.reshape(self.features_0_trn[rand[:, None]], (self.batch_size, self.n_feat))
        return image_batch


class TLoRasNoise:
    """
    Noise function: builds Gaussian "shadow" copies of the input samples and
    draws random affine combinations of them.
    """
    def __init__(self, shadow=50, sigma=.005, num_afcomb=7):
        self.shadow = shadow
        self.sigma = sigma
        self.num_afcomb = num_afcomb

    def tLoRAS(self, data, num_samples, num_RACOS):
        np.random.seed(42)
        # Create `shadow` noisy copies of each of the first `num_samples` data points.
        data_shadow = np.asarray([
            d + np.random.normal(0, self.sigma)
            for d in data[:num_samples]
            for _c in range(self.shadow)
        ])
        # Generate `num_RACOS` points, each a random affine combination of shadow samples.
        return np.asarray([
            self.shadowLcDataPoint(num_samples, data_shadow)
            for _i in range(num_RACOS)
        ])

    def shadowLcDataPoint(self, num_samples, data_shadow):
        # Pick `num_afcomb` random shadow samples and combine them with random
        # affine weights (non-negative, summing to one).
        idx = np.random.randint(self.shadow * num_samples, size=self.num_afcomb)
        w = np.random.randint(100, size=len(idx))
        aff_w = np.asarray(w / sum(w))
        data_tsl = np.array(data_shadow)[idx, :]
        return np.dot(aff_w, data_tsl)

    def noise(self, data, batch_size):
        return self.tLoRAS(data=data, num_samples=batch_size, num_RACOS=batch_size)
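
# A minimal usage sketch of the noise generator (illustrative only; the array
# shapes below are assumptions, not part of the original code):
#
#     noise_fn = TLoRasNoise(shadow=50, sigma=0.005, num_afcomb=7)
#     minority = np.random.rand(32, 5)          # 32 hypothetical minority samples, 5 features
#     synthetic = noise_fn.noise(minority, 8)   # -> array of shape (8, 5):
#                                               #    8 affine combinations of noisy shadow copies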


class GAN:
    """
    Class for the GAN: a generator plus two discriminators, one for the
    minority class and one for the majority class.
    """
    def __init__(self, n_feat=1, noise=None,
                 discriminatorMin=None, discriminatorMax=None, generator=None):
        self.n_feat = n_feat
        if noise is None:
            self.noise = TLoRasNoise()
        else:
            self.noise = noise
        self.create_gan(
            discriminatorMin or self.create_discriminator_min(),
            discriminatorMax or self.create_discriminator_maj(),
            generator or self.create_generator())

    def create_generator(self):
        generator = Sequential()
        generator.add(Dense(units=25, input_dim=self.n_feat))
        generator.add(LeakyReLU(0.2))
        generator.add(Dense(units=256))
        generator.add(LeakyReLU(0.2))
        generator.add(Dense(units=512))
        generator.add(LeakyReLU(0.2))
        generator.add(Dense(units=256))
        generator.add(LeakyReLU(0.2))
        generator.add(Dense(units=25))
        generator.add(LeakyReLU(0.2))
        generator.add(Dense(units=self.n_feat))
        generator.compile(loss='binary_crossentropy', optimizer=adam_optimizer())
        return generator

    def create_discriminator_min(self):
        discriminator = Sequential()
        discriminator.add(Dense(units=1024, input_dim=self.n_feat))
        discriminator.add(LeakyReLU(0.2))
        discriminator.add(Dropout(0.3))
        discriminator.add(Dense(units=512))
        discriminator.add(LeakyReLU(0.2))
        discriminator.add(Dropout(0.3))
        discriminator.add(Dense(units=256))
        discriminator.add(LeakyReLU(0.2))
        discriminator.add(Dense(units=1, activation='sigmoid'))
        discriminator.compile(loss='binary_crossentropy', optimizer=adam_optimizer())
        return discriminator

    def create_discriminator_maj(self):
        discriminator = Sequential()
        discriminator.add(Dense(units=1024, input_dim=self.n_feat))
        discriminator.add(LeakyReLU(0.2))
        discriminator.add(Dropout(0.3))
        discriminator.add(Dense(units=512))
        discriminator.add(LeakyReLU(0.2))
        discriminator.add(Dropout(0.3))
        discriminator.add(Dense(units=256))
        discriminator.add(LeakyReLU(0.2))
        discriminator.add(Dense(units=1, activation='sigmoid'))
        discriminator.compile(loss='binary_crossentropy', optimizer=adam_optimizer())
        return discriminator

    def create_gan(self, discriminator_min, discriminator_maj, generator):
        # Freeze both discriminators so that only the generator is updated
        # through the combined model.
        discriminator_min.trainable = False
        discriminator_maj.trainable = False
        gan_input = Input(shape=(self.n_feat,))
        x = generator(gan_input)
        gan_output_min = discriminator_min(x)
        gan_output_maj = discriminator_maj(x)
        gan = Model(inputs=gan_input, outputs=[gan_output_min, gan_output_maj])
        gan.compile(loss=['binary_crossentropy', 'binary_crossentropy'], optimizer='adam')
        # Store the parts for later use.
        self.generator = generator
        self.discriminator_min = discriminator_min
        self.discriminator_maj = discriminator_maj
        self.gan = gan
        return gan

    def train(self, parameters):
        for e in range(1, 30 + 1):
            print(e)
            for _i in tqdm(range(parameters.batch_size)):
                # Get a random batch of real minority samples
                image_batch = parameters.im_batch_creator_min()
                # Generate random noise as input to the generator
                noise_min = self.noise.noise(image_batch, parameters.batch_size)
                # Generate fake samples from the noised input
                generated_images = self.generator.predict(noise_min)
                # Construct a combined batch of real and fake data
                X = np.concatenate((image_batch, generated_images))
                # Labels for real (0.9) and generated (0) data
                y_dis = np.zeros(2 * parameters.batch_size)
                y_dis[:parameters.batch_size] = 0.9
                # Train discriminator_min on fake and real data before updating the gan.
                self.discriminator_min.trainable = True
                _d_loss_min = self.discriminator_min.train_on_batch(X, y_dis)
                if e == 0 or e > 15:
                    image_batch_maj = parameters.im_batch_creator_maj()
                    X_maj = np.concatenate((image_batch_maj, generated_images))
                    y_dis_maj = np.ones(2 * parameters.batch_size) + 1
                    y_dis_maj[:parameters.batch_size] = 0
                    # Train discriminator_maj on fake and real data before updating the gan.
                    self.discriminator_maj.trainable = True
                    _d_loss_maj = self.discriminator_maj.train_on_batch(X_maj, y_dis_maj)
                # Label a fresh noise batch as real so the generator is pushed
                # towards producing realistic samples.
                noise = self.noise.noise(image_batch, parameters.batch_size)
                y_gen_min = np.ones(parameters.batch_size)
                # During training of the gan the discriminator weights must stay
                # fixed; we enforce that via the trainable flag.
                self.discriminator_min.trainable = False
                self.discriminator_maj.trainable = False
                # Train the GAN by alternating discriminator updates with updates
                # of the chained GAN model while the discriminator weights are frozen.
                _g_loss_min = self.gan.train_on_batch(noise, [y_gen_min, y_gen_min])

    def genFeat(self, parameters):
        # Generate synthetic minority-class features from a fresh noise batch.
        im_batch = parameters.im_batch_creator_min()
        noise = self.noise.noise(im_batch, parameters.batch_size)
        return self.generator.predict(noise)

    def predict(self, data):
        # Score samples with the majority-class discriminator.
        y_pred = self.discriminator_maj.predict(data)
        return np.reshape(y_pred, len(data))


class DataSet:
    """
    Stores data and labels.
    """
    def __init__(self, data=None, labels=None, data0=None, data1=None):
        if data is None:
            self.fromData01(data0, data1)
        else:
            if labels is None:
                raise ValueError("expected labels to be a numpy.array")
            else:
                self.data = data
                self.labels = labels

    def fromData01(self, data0=None, data1=None):
        if data0 is None and data1 is None:
            raise ValueError("expected data, data0 or data1 to be a numpy.array")
        elif data0 is None:
            self.data = data1
            self.labels = np.zeros(len(data1)) + 1
        elif data1 is None:
            self.data = data0
            self.labels = np.zeros(len(data0))
        else:
            self.data = np.concatenate((data1, data0))
            self.labels = np.concatenate((np.zeros(len(data1)) + 1, np.zeros(len(data0))))


class TrainTestData:
    """
    Stores features, data and labels for class 0 and class 1.
    """
    def __init__(self, features0, features1, trainFactor=0.9):
        self.nFeatures0 = len(features0)
        self.nFeatures1 = len(features1)
        self.features_0_trn, self.features_0_tst = self.splitUpData(features0, trainFactor)
        self.features_1_trn, self.features_1_tst = self.splitUpData(features1, trainFactor)
        self.test = DataSet(data1=self.features_1_tst, data0=self.features_0_tst)
        self.train = DataSet(data1=self.features_1_trn, data0=self.features_0_trn)

    def splitUpData(self, data, trainFactor=0.9):
        size = len(data)
        trainSize = math.ceil(size * trainFactor)
        trn = data[list(range(0, trainSize))]
        tst = data[list(range(trainSize, size))]
        return trn, tst
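

# A minimal end-to-end sketch of how the pieces above fit together, assuming
# purely synthetic input data; the shapes, class sizes and hyper-parameters
# below are illustrative choices, not taken from the original code.
if __name__ == "__main__":
    n_feat = 5
    majority = np.random.rand(200, n_feat)   # hypothetical majority-class samples
    minority = np.random.rand(40, n_feat)    # hypothetical minority-class samples

    split = TrainTestData(features0=majority, features1=minority, trainFactor=0.9)
    params = GanTrainParameters(
        n_feat=n_feat,
        batch_size=8,
        min_t=split.features_1_trn,
        features_0_trn=split.features_0_trn,
        features_1_trn=split.features_1_trn,
    )

    gan = GAN(n_feat=n_feat)
    gan.train(params)

    # Synthetic minority-class features produced by the trained generator.
    new_minority = gan.genFeat(params)
    print(new_minority.shape)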