|
@@ -1,23 +1,19 @@
|
|
|
-import numpy as np
|
|
|
|
|
-import pandas as pd
|
|
|
|
|
|
|
+"""
|
|
|
|
|
+Class for testing the performance of Generative Adversarial Networks
|
|
|
|
|
+in generating synthetic samples for datasets with a minority class.
|
|
|
|
|
+"""
|
|
|
|
|
|
|
|
-import sklearn
|
|
|
|
|
-# needed in function lr
|
|
|
|
|
-from sklearn import metrics
|
|
|
|
|
-from sklearn.neighbors import KNeighborsClassifier
|
|
|
|
|
-from sklearn.linear_model import LogisticRegression
|
|
|
|
|
-from sklearn.metrics import confusion_matrix
|
|
|
|
|
-from sklearn.metrics import average_precision_score
|
|
|
|
|
-from sklearn.metrics import f1_score
|
|
|
|
|
-from sklearn.metrics import balanced_accuracy_score
|
|
|
|
|
|
|
|
|
|
|
|
+import numpy as np
|
|
|
|
|
+import pandas as pd
|
|
|
|
|
|
|
|
-from sklearn.decomposition import PCA
|
|
|
|
|
import seaborn as sns
|
|
import seaborn as sns
|
|
|
|
|
+from sklearn.decomposition import PCA
|
|
|
from sklearn.preprocessing import StandardScaler
|
|
from sklearn.preprocessing import StandardScaler
|
|
|
import matplotlib.pyplot as plt
|
|
import matplotlib.pyplot as plt
|
|
|
|
|
|
|
|
from library.dataset import DataSet, TrainTestData
|
|
from library.dataset import DataSet, TrainTestData
|
|
|
|
|
+from library.testers import lr, svm, knn
|
|
|
|
|
|
|
|
|
|
|
|
|
class Exercise:
|
|
class Exercise:
|
|
@@ -25,134 +21,167 @@ class Exercise:
|
|
|
Exercising a test for a minority class extension class.
|
|
Exercising a test for a minority class extension class.
|
|
|
"""
|
|
"""
|
|
|
|
|
|
|
|
- def __init__(self, testFunctions, shuffleFunction=None, numOfSlices=5, numOfShuffles=5):
|
|
|
|
|
- self.numOfSlices = numOfSlices
|
|
|
|
|
- self.numOfShuffles = numOfShuffles
|
|
|
|
|
- self.testFunctions = testFunctions
|
|
|
|
|
|
|
+ def __init__(self, testFunctions=None, shuffleFunction=None, numOfSlices=5, numOfShuffles=5):
|
|
|
|
|
+ """
|
|
|
|
|
+        Creates an instance of this class.
|
|
|
|
|
+
|
|
|
|
|
+ *testFunctions* is a dictionary /(String : Function)/ of functions for testing
|
|
|
|
|
+ a generated dataset. The functions have the signature:
|
|
|
|
|
+          /TrainTestData -> TestResult/
|
|
|
|
|
+
|
|
|
|
|
+ *shuffleFunction* is either None or a function /numpy.array -> numpy.array/
|
|
|
|
|
+ that shuffles a given array.
|
|
|
|
|
+
|
|
|
|
|
+ *numOfSlices* is an integer > 0. The dataset given for the run function
|
|
|
|
|
+        will be divided into that many slices.
|
|
|
|
|
+
|
|
|
|
|
+ *numOfShuffles* is an integer > 0. It gives the number of exercised tests.
|
|
|
|
|
+ The GAN will be trained and tested (numOfShuffles * numOfSlices) times.
|
|
|
|
|
+ """
|
|
|
|
|
+ self.numOfSlices = int(numOfSlices)
|
|
|
|
|
+ self.numOfShuffles = int(numOfShuffles)
|
|
|
self.shuffleFunction = shuffleFunction
|
|
self.shuffleFunction = shuffleFunction
|
|
|
self.debug = print
|
|
self.debug = print
|
|
|
|
|
|
|
|
|
|
+ self.testFunctions = testFunctions
|
|
|
|
|
+ if self.testFunctions is None:
|
|
|
|
|
+ self.testFunctions = {
|
|
|
|
|
+ "LR": lr,
|
|
|
|
|
+ "SVM": svm,
|
|
|
|
|
+ "KNN": knn
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ self.results = { name: [] for name in self.testFunctions }
|
|
|
|
|
+
|
|
|
|
|
+ # Check if the given values are in valid range.
|
|
|
|
|
+ if self.numOfSlices < 0:
|
|
|
|
|
+ raise AttributeError(f"Expected numOfSlices to be > 0 but got {self.numOfSlices}")
|
|
|
|
|
+
|
|
|
|
|
+ if self.numOfShuffles < 0:
|
|
|
|
|
+ raise AttributeError(f"Expected numOfShuffles to be > 0 but got {self.numOfShuffles}")
|
|
|
|
|
+
|
|
|
def run(self, gan, dataset):
|
|
def run(self, gan, dataset):
|
|
|
|
|
+ """
|
|
|
|
|
+ Exercise all tests for a given GAN.
|
|
|
|
|
+
|
|
|
|
|
+        *gan* is an implementation of library.interfaces.GanBaseClass.
|
|
|
|
|
+ It defines the GAN to test.
|
|
|
|
|
+
|
|
|
|
|
+ *dataset* is a library.dataset.DataSet that contains the majority class
|
|
|
|
|
+ (dataset.data0) and the minority class (dataset.data1) of data
|
|
|
|
|
+ for training and testing.
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ # Check if the given values are in valid range.
|
|
|
if len(dataset.data1) > len(dataset.data0):
|
|
if len(dataset.data1) > len(dataset.data0):
|
|
|
- raise AttributeError("Expected class 1 to be the minority class but class 1 is bigger than class 0.")
|
|
|
|
|
|
|
+ raise AttributeError(
|
|
|
|
|
+ "Expected class 1 to be the minority class but class 1 is bigger than class 0.")
|
|
|
|
|
+
|
|
|
|
|
+ # Reset results array.
|
|
|
|
|
+ self.results = { name: [] for name in self.testFunctions }
|
|
|
|
|
|
|
|
|
|
+ # Repeat numOfShuffles times
|
|
|
self.debug("### Start exercise for synthetic point generator")
|
|
self.debug("### Start exercise for synthetic point generator")
|
|
|
for shuffleStep in range(self.numOfShuffles):
|
|
for shuffleStep in range(self.numOfShuffles):
|
|
|
- stepTitle = "Step {shuffleStep + 1}/{self.numOfShuffles}"
|
|
|
|
|
|
|
+ stepTitle = f"Step {shuffleStep + 1}/{self.numOfShuffles}"
|
|
|
self.debug(f"\n====== {stepTitle} =======")
|
|
self.debug(f"\n====== {stepTitle} =======")
|
|
|
|
|
|
|
|
|
|
+            # If a shuffle function is given then shuffle the data before the next
|
|
|
|
|
+ # exercise starts.
|
|
|
if self.shuffleFunction is not None:
|
|
if self.shuffleFunction is not None:
|
|
|
self.debug("-> Shuffling data")
|
|
self.debug("-> Shuffling data")
|
|
|
dataset.shuffleWith(self.shuffleFunction)
|
|
dataset.shuffleWith(self.shuffleFunction)
|
|
|
|
|
|
|
|
|
|
+
|
|
|
|
|
+ # Split the (shuffled) data into numOfSlices slices.
|
|
|
|
|
+ # dataSlices is a list of TrainTestData instances.
|
|
|
|
|
+ #
|
|
|
|
|
+            # If numOfSlices=3 then the data will be split into D1, D2, D3.
|
|
|
|
|
+ # dataSlices will contain:
|
|
|
|
|
+ # [(train=D2+D3, test=D1), (train=D1+D3, test=D2), (train=D1+D2, test=D3)]
|
|
|
self.debug("-> Spliting data to slices")
|
|
self.debug("-> Spliting data to slices")
|
|
|
dataSlices = TrainTestData.splitDataToSlices(dataset, self.numOfSlices)
|
|
dataSlices = TrainTestData.splitDataToSlices(dataset, self.numOfSlices)
|
|
|
|
|
|
|
|
|
|
+            # Do an exercise for every slice.
|
|
|
for (sliceNr, sliceData) in enumerate(dataSlices):
|
|
for (sliceNr, sliceData) in enumerate(dataSlices):
|
|
|
- sliceTitle = "Slice {sliceNr + 1}/{self.numOfSlices}"
|
|
|
|
|
|
|
+ sliceTitle = f"Slice {sliceNr + 1}/{self.numOfSlices}"
|
|
|
self.debug(f"\n------ {stepTitle}: {sliceTitle} -------")
|
|
self.debug(f"\n------ {stepTitle}: {sliceTitle} -------")
|
|
|
self._exerciseWithDataSlice(gan, sliceData)
|
|
self._exerciseWithDataSlice(gan, sliceData)
|
|
|
|
|
+
|
|
|
self.debug("### Exercise is done.")
|
|
self.debug("### Exercise is done.")
|
|
|
|
|
|
|
|
def _exerciseWithDataSlice(self, gan, dataSlice):
|
|
def _exerciseWithDataSlice(self, gan, dataSlice):
|
|
|
|
|
+ """
|
|
|
|
|
+ Runs one test for the given gan and dataSlice.
|
|
|
|
|
+
|
|
|
|
|
+        *gan* is an implementation of library.interfaces.GanBaseClass.
|
|
|
|
|
+ It defines the GAN to test.
|
|
|
|
|
+
|
|
|
|
|
+ *dataSlice* is a library.dataset.TrainTestData instance that contains
|
|
|
|
|
+ one data slice with training and testing data.
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ # Train the gan so it can produce synthetic samples.
|
|
|
self.debug("-> Train generator for synthetic samples")
|
|
self.debug("-> Train generator for synthetic samples")
|
|
|
gan.train(dataSlice.train)
|
|
gan.train(dataSlice.train)
|
|
|
|
|
|
|
|
|
|
+        # Count how many synthetic samples are needed.
|
|
|
numOfNeededSamples = dataSlice.train.size0 - dataSlice.train.size1
|
|
numOfNeededSamples = dataSlice.train.size0 - dataSlice.train.size1
|
|
|
|
|
|
|
|
|
|
+ # Add synthetic samples (generated by the GAN) to the minority class.
|
|
|
if numOfNeededSamples > 0:
|
|
if numOfNeededSamples > 0:
|
|
|
self.debug(f"-> create {numOfNeededSamples} synthetic samples")
|
|
self.debug(f"-> create {numOfNeededSamples} synthetic samples")
|
|
|
- newSamples = np.asarray([gan.generateData() for _ in range(numOfNeededSamples)])
|
|
|
|
|
- train = DataSet(
|
|
|
|
|
|
|
+ newSamples = gan.generateData(numOfNeededSamples)
|
|
|
|
|
+ dataSlice.train = DataSet(
|
|
|
data0=dataSlice.train.data0,
|
|
data0=dataSlice.train.data0,
|
|
|
data1=np.concatenate((dataSlice.train.data1, newSamples))
|
|
data1=np.concatenate((dataSlice.train.data1, newSamples))
|
|
|
)
|
|
)
|
|
|
- else:
|
|
|
|
|
- train = dataSlice.train
|
|
|
|
|
|
|
|
|
|
- plotCloud(train.data, train.labels)
|
|
|
|
|
|
|
+ # Print out an overview of the new dataset.
|
|
|
|
|
+ plotCloud(dataSlice.train)
|
|
|
|
|
|
|
|
- results = { name: [] for name in self.testFunctions }
|
|
|
|
|
|
|
+ # Test this dataset with every given test-function.
|
|
|
|
|
+ # The results are printed out and stored to the results dictionary.
|
|
|
for testerName in self.testFunctions:
|
|
for testerName in self.testFunctions:
|
|
|
self.debug(f"-> test with '{testerName}'")
|
|
self.debug(f"-> test with '{testerName}'")
|
|
|
- testResult = (self.testFunctions[testerName])(train, dataSlice.test)
|
|
|
|
|
- testResult.print()
|
|
|
|
|
- results[testerName].append(testResult)
|
|
|
|
|
-
|
|
|
|
|
- self.debug("-> check results")
|
|
|
|
|
- self._checkResults(results, dataSlice.test.labels)
|
|
|
|
|
-
|
|
|
|
|
- def _checkResults(self, results, expectedLabels):
|
|
|
|
|
- pass
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-class TestResult:
|
|
|
|
|
- def __init__(self, title, labels, prediction, aps=None):
|
|
|
|
|
- self.title = title
|
|
|
|
|
- self.con_mat = confusion_matrix(labels, prediction)
|
|
|
|
|
- self.bal_acc = balanced_accuracy_score(labels, prediction)
|
|
|
|
|
- self.f1 = f1_score(labels, prediction)
|
|
|
|
|
- self.aps = aps
|
|
|
|
|
-
|
|
|
|
|
- def print(self):
|
|
|
|
|
- #tn, fp, fn, tp = con_mat.ravel()
|
|
|
|
|
- r = self.con_mat.ravel()
|
|
|
|
|
- print('tn, fp, fn, tp:', r)
|
|
|
|
|
-
|
|
|
|
|
- if self.aps is not None:
|
|
|
|
|
- print('average_pr_score:', self.aps)
|
|
|
|
|
-
|
|
|
|
|
- print(f'f1 score_{self.title}:', self.f1)
|
|
|
|
|
- print(f'balanced accuracy_{self.title}:', self.bal_acc)
|
|
|
|
|
- print(f'confusion matrix_{self.title}')
|
|
|
|
|
- print(self.con_mat)
|
|
|
|
|
-
|
|
|
|
|
|
|
+ testResult = (self.testFunctions[testerName])(dataSlice)
|
|
|
|
|
+ self.debug(str(testResult))
|
|
|
|
|
+ self.results[testerName].append(testResult)
|
|
|
|
|
|
|
|
|
|
|
|
|
-def lr(train, test):
|
|
|
|
|
- logreg = LogisticRegression(C=1e5, solver='lbfgs', multi_class='multinomial', class_weight={0: 1, 1: 1.3})
|
|
|
|
|
- logreg.fit(train.data, train.labels)
|
|
|
|
|
|
|
+ def saveResultsTo(self, fileName):
|
|
|
|
|
+ with open(fileName, "w") as f:
|
|
|
|
|
+ for name in self.results:
|
|
|
|
|
+ f.write(name + "\n")
|
|
|
|
|
+ isFirst = True
|
|
|
|
|
+ for result in self.results[name]:
|
|
|
|
|
+ if isFirst:
|
|
|
|
|
+ isFirst = False
|
|
|
|
|
+ f.write(result.csvHeading() + "\n")
|
|
|
|
|
+ f.write(result.toCSV() + "\n")
|
|
|
|
|
+
|
|
|
|
|
|
|
|
- prediction = logreg.predict(test.data)
|
|
|
|
|
-
|
|
|
|
|
- prob_lr = logreg.predict_proba(test.data)
|
|
|
|
|
- aps_lr = average_precision_score(test.labels, prob_lr[:,1])
|
|
|
|
|
- return TestResult("LR", test.labels, prediction, aps_lr)
|
|
|
|
|
-
|
|
|
|
|
-def svm(train, test):
|
|
|
|
|
- svm = sklearn.svm.SVC(kernel='linear', decision_function_shape='ovo', class_weight={0: 1., 1: 1.}, probability=True)
|
|
|
|
|
- svm.fit(train.data, train.labels)
|
|
|
|
|
-
|
|
|
|
|
- prediction = svm.predict(test.data)
|
|
|
|
|
- return TestResult("SVM", test.labels, prediction)
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-def knn(train, test):
|
|
|
|
|
- knn = KNeighborsClassifier(n_neighbors=10)
|
|
|
|
|
- knn.fit(train.data, train.labels)
|
|
|
|
|
-
|
|
|
|
|
- prediction = knn.predict(test.data)
|
|
|
|
|
- return TestResult("KNN", test.labels, prediction)
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-allTesters = {
|
|
|
|
|
- "LR": lr,
|
|
|
|
|
- "SVM": svm,
|
|
|
|
|
- "KNN": knn
|
|
|
|
|
- }
|
|
|
|
|
|
|
|
|
|
|
|
+def plotCloud(dataset):
|
|
|
|
|
+ """
|
|
|
|
|
+    Runs a PCA on the given data and plots the first two principal components.
|
|
|
|
|
+ """
|
|
|
|
|
+ # Normalizes the data.
|
|
|
|
|
+ data_t = StandardScaler().fit_transform(dataset.data)
|
|
|
|
|
|
|
|
-def plotCloud(data, labels):
|
|
|
|
|
- data_t = StandardScaler().fit_transform(data)
|
|
|
|
|
|
|
+ # Run the PCA analysis.
|
|
|
pca = PCA(n_components=2)
|
|
pca = PCA(n_components=2)
|
|
|
pc = pca.fit_transform(data_t)
|
|
pc = pca.fit_transform(data_t)
|
|
|
|
|
+
|
|
|
|
|
+ # Create a DataFrame for plotting.
|
|
|
result = pd.DataFrame(data=pc, columns=['PCA0', 'PCA1'])
|
|
result = pd.DataFrame(data=pc, columns=['PCA0', 'PCA1'])
|
|
|
- result['Cluster'] = labels
|
|
|
|
|
-
|
|
|
|
|
|
|
+ result['Cluster'] = dataset.labels
|
|
|
|
|
+
|
|
|
|
|
+ # Plot the analysis results.
|
|
|
sns.set( font_scale=1.2)
|
|
sns.set( font_scale=1.2)
|
|
|
- g=sns.lmplot( x="PCA0", y="PCA1",
|
|
|
|
|
- data=result,
|
|
|
|
|
- fit_reg=False,
|
|
|
|
|
|
|
+ sns.lmplot( x="PCA0", y="PCA1",
|
|
|
|
|
+ data=result,
|
|
|
|
|
+ fit_reg=False,
|
|
|
hue='Cluster', # color by cluster
|
|
hue='Cluster', # color by cluster
|
|
|
legend=False,
|
|
legend=False,
|
|
|
scatter_kws={"s": 3}, palette="Set1") # specify the point size
|
|
scatter_kws={"s": 3}, palette="Set1") # specify the point size
|