Ver Fonte

Added new classifier and tester / improved CSV-file generation / improved data plot

Kristian Schultz há 4 anos atrás
pai
commit
6da7e8b68b
4 ficheiros alterados com 205 adições e 90 exclusões
  1. 32 28
      Example Exercise.ipynb
  2. 13 11
      library/SimpleGan.py
  3. 45 13
      library/exercise.py
  4. 115 38
      library/testers.py

Diff do ficheiro suprimidas por serem muito extensas
+ 32 - 28
Example Exercise.ipynb


+ 13 - 11
library/SimpleGan.py

@@ -22,10 +22,12 @@ class SimpleGan(GanBaseClass):
     """
     A class for a simple GAN.
     """
-    def __init__(self, numOfFeatures=786, noiseSize=100):
+    def __init__(self, numOfFeatures=786, noiseSize=100, epochs=3, batchSize=128):
         self.isTrained = False
         self.noiseSize = noiseSize
         self.numOfFeatures = numOfFeatures
+        self.epochs = epochs
+        self.batchSize = batchSize
 
     def reset(self):
         """
@@ -82,41 +84,41 @@ class SimpleGan(GanBaseClass):
         discriminator.compile(loss='binary_crossentropy', optimizer=self._adamOptimizer())
         return discriminator
 
-    def train(self, dataset, epochs=1, batchSize=128):
+    def train(self, dataset):
         trainData = dataset.data1
         trainDataSize = trainData.shape[0]
 
         if trainDataSize <= 0:
             raise AttributeError("Train GAN: Expected data class 1 to contain at least one point.")
 
-        for e in range(epochs):
-            print(f"Epoch {e + 1}")
-            for _ in range(batchSize):
+        for e in range(self.epochs):
+            print(f"Epoch {e + 1}/{self.epochs}")
+            for _ in range(self.batchSize):
                 #generate  random noise as an input  to  initialize the  generator
-                noise= np.random.normal(0, 1, [batchSize, self.noiseSize])
+                noise= np.random.normal(0, 1, [self.batchSize, self.noiseSize])
 
                 # Generate fake MNIST images from noised input
                 generatedImages = self.generator.predict(noise)
 
                 # Get a random set of  real images
                 image_batch = dataset.data1[
-                    np.random.randint(low=0, high=trainDataSize, size=batchSize)
+                    np.random.randint(low=0, high=trainDataSize, size=self.batchSize)
                     ]
 
                 #Construct different batches of  real and fake data
                 X = np.concatenate([image_batch, generatedImages])
 
                 # Labels for generated and real data
-                y_dis=np.zeros(2 * batchSize)
-                y_dis[:batchSize] = 0.9
+                y_dis=np.zeros(2 * self.batchSize)
+                y_dis[:self.batchSize] = 0.9
 
                 #Pre train discriminator on  fake and real data  before starting the gan.
                 self.discriminator.trainable = True
                 self.discriminator.train_on_batch(X, y_dis)
 
                 #Tricking the noised input of the Generator as real data
-                noise = np.random.normal(0, 1, [batchSize, 100])
-                y_gen = np.ones(batchSize)
+                noise = np.random.normal(0, 1, [self.batchSize, 100])
+                y_gen = np.ones(self.batchSize)
 
                 # During the training of gan,
                 # the weights of discriminator should be fixed.

+ 45 - 13
library/exercise.py

@@ -13,7 +13,7 @@ from sklearn.preprocessing import StandardScaler
 import matplotlib.pyplot as plt
 
 from library.dataset import DataSet, TrainTestData
-from library.testers import lr, svm, knn
+from library.testers import lr, svm, knn, gb, TestResult
 
 
 class Exercise:
@@ -48,6 +48,7 @@ class Exercise:
             self.testFunctions = {
                 "LR": lr,
                 "SVM": svm,
+                "GB": gb,
                 "KNN": knn
                 }
 
@@ -117,6 +118,23 @@ class Exercise:
 
         self.debug("### Exercise is done.")
 
+        for (n, name) in enumerate(self.results):
+            stats = None
+            for (m, result) in enumerate(self.results[name]):
+                stats = result.addMinMaxAvg(stats)
+        
+            (mi, mx, avg) = TestResult.finishMinMaxAvg(stats)
+            self.debug("")
+            self.debug(f"-----[ {avg.title} ]-----")
+            self.debug("maximum:")
+            self.debug(str(mx))
+            self.debug("")
+            self.debug("average:")
+            self.debug(str(avg))
+            self.debug("")
+            self.debug("minimum:")
+            self.debug(str(mi))
+
     def _exerciseWithDataSlice(self, gan, dataSlice):
         """
         Runs one test for the given gan and dataSlice.
@@ -143,14 +161,15 @@ class Exercise:
         if numOfNeededSamples > 0:
             self.debug(f"-> create {numOfNeededSamples} synthetic samples")
             newSamples = gan.generateData(numOfNeededSamples)
+
+            # Print out an overview of the new dataset.
+            plotCloud(dataSlice.train.data0, dataSlice.train.data1, newSamples)
+
             dataSlice.train = DataSet(
                 data0=dataSlice.train.data0,
                 data1=np.concatenate((dataSlice.train.data1, newSamples))
                 )
 
-        # Print out an overview of the new dataset.
-        plotCloud(dataSlice.train)
-
         # Test this dataset with every given test-function.
         # The results are printed out and stored to the results dictionary.
         for testerName in self.testFunctions:
@@ -163,25 +182,34 @@ class Exercise:
     def saveResultsTo(self, fileName):  # Writes all collected results as ';'-separated text to *fileName*.
         with open(fileName, "w") as f:
             for (n, name) in enumerate(self.results):
-                if n == 0:
-                    f.write("---")
+                if n > 0:  # Separator line only between sections, not before the first one.
+                    f.write("---\n")
     
                 f.write(name + "\n")
                 isFirst = True
-                for result in self.results[name]:
+                stats = None  # Accumulator for the min / max / average rows of this section.
+                for (m, result) in enumerate(self.results[name]):
                     if isFirst:
                         isFirst = False
-                        f.write(result.csvHeading() + "\n")
-                    f.write(result.toCSV() + "\n")
+                        f.write("Nr.;" + result.csvHeading() + "\n")  # Heading row, written once per section.
+
+                    stats = result.addMinMaxAvg(stats)
+
+                    f.write(f"{m + 1};" + result.toCSV() + "\n")  # One row per result, numbered from 1.
             
+                (mi, mx, avg) = TestResult.finishMinMaxAvg(stats)  # Placeholder results if the section was empty.
+                f.write(f"max;" + mx.toCSV() + "\n")
+                f.write(f"avg;" + avg.toCSV() + "\n")
+                f.write(f"min;" + mi.toCSV() + "\n")
+
 
 
-def plotCloud(dataset):
+def plotCloud(data0, data1, dataNew):
     """
     Does a PCA analysis of the given data and plot the both important axis.
     """
     # Normalizes the data.
-    data_t = StandardScaler().fit_transform(dataset.data)
+    data_t = StandardScaler().fit_transform(np.concatenate([data0, data1, dataNew]))
 
     # Run the PCA analysis.
     pca = PCA(n_components=2)
@@ -189,7 +217,11 @@ def plotCloud(dataset):
 
     # Create a DataFrame for plotting.
     result = pd.DataFrame(data=pc, columns=['PCA0', 'PCA1'])
-    result['Cluster'] = dataset.labels
+    result['Cluster'] = np.concatenate([
+        np.zeros(len(data0)),
+        np.zeros(len(data1)) + 1,
+        np.zeros(len(dataNew)) + 2
+        ])
 
     # Plot the analysis results.
     sns.set( font_scale=1.2)
@@ -200,5 +232,5 @@ def plotCloud(dataset):
       legend=False,
       scatter_kws={"s": 3}, palette="Set1") # specify the point size
 
-    plt.legend(title='', loc='upper left', labels=['0', '1'])
+    plt.legend(title='', loc='upper left', labels=['0', '1', '2'])
     plt.show()

+ 115 - 38
library/testers.py

@@ -13,7 +13,17 @@ from sklearn.metrics import confusion_matrix
 from sklearn.metrics import average_precision_score
 from sklearn.metrics import f1_score
 from sklearn.metrics import balanced_accuracy_score
-
+from sklearn.metrics import cohen_kappa_score
+from sklearn.ensemble import GradientBoostingClassifier
+
+_tF1 = "f1 score"
+_tBalAcc = "balanced accuracy"
+_tTN = "TN"
+_tTP = "TP"
+_tFN = "FN"
+_tFP = "FP"
+_tAps = "average precision score"
+_tCks = "cohens kappa score"
 
 class TestResult:
     """
@@ -22,7 +32,7 @@ class TestResult:
     It stores its *title*, a confusion matrix (*con_mat*), the balanced accuracy score (*bal_acc*)
     and the f1 score (*f1*). If given the average precision score is also stored (*aps*).
     """
-    def __init__(self, title, labels, prediction, aps=None):
+    def __init__(self, title, labels=None, prediction=None, aps=None):
         """
         Creates an instance of this class. The stored data will be generated from the given values.
 
@@ -35,57 +45,109 @@ class TestResult:
         *aps* is a real number representing the average precision score.
         """
         self.title = title
-        self.con_mat = confusion_matrix(labels, prediction)
-        self.bal_acc = balanced_accuracy_score(labels, prediction)
-        self.f1 = f1_score(labels, prediction)
-        self.aps = aps
+        self.heading = [_tTN, _tTP, _tFN, _tFP, _tF1, _tBalAcc, _tCks]
+        if aps is not None:
+            self.heading.append(_tAps)
+        self.data = { n: 0.0 for n in self.heading }
+
+        if labels is not None and prediction is not None:
+            self.data[_tBalAcc] = balanced_accuracy_score(labels, prediction)
+            self.data[_tF1]     = f1_score(labels, prediction)
+            self.data[_tCks]    = cohen_kappa_score(labels, prediction)
+            conMat = self._enshureConfusionMatrix(confusion_matrix(labels, prediction))
+            [[tn, fp], [fn, tp]] = conMat
+            self.data[_tTN] = tn
+            self.data[_tTP] = tp
+            self.data[_tFN] = fn
+            self.data[_tFP] = fp
+
+        if aps is not None:
+            self.data[_tAps] = aps
 
     def __str__(self):
         """
         Generates a text representing this result.
         """
-        #tn, fp, fn, tp = con_mat.ravel()
-        r = self.con_mat.ravel()
-        text = f"tn, fp, fn, tp: {r}"
+        text = ""
 
-        if self.aps is not None:
-            text += f"\naverage_pr_score: {self.aps}"
+        tn = self.data[_tTN]
+        tp = self.data[_tTP]
+        fn = self.data[_tFN]
+        fp = self.data[_tFP]
+        text += f"{self.title} tn, fp: {tn}, {fp}\n"  # First row of the confusion matrix.
+        text += f"{self.title} fn, tp: {fn}, {tp}\n"  # Second row; newline keeps the scores below on their own lines.
 
-        text += f"\nf1 score_{self.title}: {self.f1}"
-        text += f"\nbalanced accuracy_{self.title}: {self.bal_acc}"
-        text += f"\nconfusion matrix_{self.title}\n {self.con_mat}"
-        return text
+        for k in self.heading:
+            if k not in [_tTP, _tTN, _tFP, _tFN]:  # The four counts were already printed above.
+                text += f"{self.title} {k}: {self.data[k]:.3f}\n"
 
+        return text
 
     def csvHeading(self):
-        r = [
-            "F1 score",
-            "balanced accuracy",
-            "TN",
-            "FP",
-            "FN",
-            "TP"
-            ]
+        return ";".join(self.heading)  # Column titles in the order of self.heading, separated by ';'.
 
-        if self.aps is not None:
-            r.append("Aps")
+    def toCSV(self):  # Returns the values in heading order as one ';'-separated row.
+        return ";".join(map(lambda k: f"{self.data[k]:0.3f}", self.heading))  # Every value is formatted with three decimals.
 
-        return ";".join(r)
+    @staticmethod
+    def _enshureConfusionMatrix(c):  # Returns a 2x2 list of lists built from *c*; missing entries stay 0.0.
+        c0 = [0.0, 0.0]
+        c1 = [0.0, 0.0]
 
-    def toCSV(self):
-        r = map(str, [
-            self.f1,
-            self.bal_acc,
-            self.con_mat[0] if len(self.con_mat) > 0 else float(self.con_mat),
-            self.con_mat[1] if len(self.con_mat) > 1 else 0,
-            self.con_mat[2] if len(self.con_mat) > 2 else 0,
-            self.con_mat[3] if len(self.con_mat) > 3 else 0
-            ])
+        if len(c) > 0:  # Copy whatever exists of the first row.
+            if len(c[0]) > 0:
+                c0[0] = c[0][0]
+
+            if len(c[0]) > 1:
+                c0[1] = c[0][1]
+
+        if len(c) > 1 and len(c[1]) > 1:  # The second row is only copied when it has both entries.
+            c1[0] = c[1][0]
+            c1[1] = c[1][1]
+
+        return [c0, c1]
+
+    def copy(self):  # Returns a new TestResult with the same title and its own copies of data and heading.
+        r = TestResult(self.title)
+        r.data = self.data.copy()
+        r.heading = self.heading.copy()
+        return r
+
+
+    def addMinMaxAvg(self, mma=None):  # Folds this result into the accumulator *mma* = (count, min, max, sum).
+        if mma is None:  # First result: it is the minimum, the maximum and the running sum at once.
+            return (1, self.copy(), self.copy(), self.copy())
+
+        (n, mi, mx, a) = mma
+
+        for k in a.heading:  # Running sum; only keys present in both results are updated.
+            if k in self.heading:
+                a.data[k] += self.data[k]
 
-        if self.aps is not None:
-            r.append(str(self.aps))
+        for k in mi.heading:  # Per-key minimum.
+            if k in self.heading:
+                mi.data[k] = min(mi.data[k], self.data[k])
 
-        return ";".join(r)
+        for k in mx.heading:  # Per-key maximum.
+            if k in self.heading:
+                mx.data[k] = max(mx.data[k], self.data[k])
+
+        return (n + 1, mi, mx, a)  # The accumulator objects are updated in place and handed back.
+
+    @staticmethod
+    def finishMinMaxAvg(mma):  # Turns an accumulator from addMinMaxAvg into the tuple (min, max, average).
+        if mma is None:  # Nothing was accumulated: return three placeholder results titled "?".
+            return (TestResult("?"), TestResult("?"), TestResult("?"))
+        else:
+            (n, mi, ma, a) = mma
+            for k in a.heading:  # Divides the sums in place, so *a* holds averages afterwards.
+                if n > 0:
+                    a.data[k] = a.data[k] / n
+                else:
+                    a.data[k] = 0.0
+            return (mi, ma, a)
+
+        
 
 
 def lr(ttd):
@@ -146,6 +208,21 @@ def knn(ttd):
     return TestResult("KNN", ttd.test.labels, prediction)
 
 
+def gb(ttd):
+    """
+    Runs a test for a dataset with the gradient boosting algorithm.
+    It returns a /TestResult./
+
+    *ttd* is a /library.dataset.TrainTestData/ instance containing data to test.
+    """
+    checkType(ttd)  # NOTE(review): the return value of checkType is not used here.
+    tester = GradientBoostingClassifier()  # Classifier with its default parameters.
+    tester.fit(ttd.train.data, ttd.train.labels)
+
+    prediction = tester.predict(ttd.test.data)
+    return TestResult("GB", ttd.test.labels, prediction)
+
+
 def checkType(t):
     if str(type(t)) == "<class 'numpy.ndarray'>":
         return t.shape[0] > 0 and all(map(checkType, t))

Alguns ficheiros não foram mostrados porque muitos ficheiros mudaram neste diff