import numpy as np
import pandas as pd

import sklearn
import sklearn.svm  # svm() below accesses sklearn.svm.SVC; load the submodule explicitly
from sklearn import metrics
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import confusion_matrix
from sklearn.metrics import average_precision_score
from sklearn.metrics import f1_score
from sklearn.metrics import balanced_accuracy_score
from sklearn.decomposition import PCA
import seaborn as sns
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

from library.dataset import DataSet, TrainTestData


class Exercise:
    """
    Runs a set of classifier tests against a synthetic-sample generator.

    For every shuffle step the dataset is (optionally) shuffled and split
    into slices; for every slice the generator is trained, the minority
    class (class 0) is topped up with generated samples and each test
    function is evaluated on the slice's test part.
    """

    def __init__(self, testFunctions, shuffleFunction=None, numOfSlices=5, numOfShuffles=5):
        """
        testFunctions: dict mapping a tester name to a callable
            ``f(train, test)`` whose result offers a ``print()`` method
            (e.g. the entries of ``allTesters``).
        shuffleFunction: passed to ``dataset.shuffleWith`` before each
            split; ``None`` disables shuffling.
        numOfSlices: number of slices requested from
            ``TrainTestData.splitDataToSlices``.
        numOfShuffles: number of shuffle/split rounds.
        """
        self.numOfSlices = numOfSlices
        self.numOfShuffles = numOfShuffles
        self.testFunctions = testFunctions
        self.shuffleFunction = shuffleFunction
        # Progress messages go through this hook; replace it to redirect
        # or silence the output.
        self.debug = print

    def run(self, gan, dataset):
        """
        Execute all shuffle rounds and slices for the generator ``gan``.

        gan: object providing ``train(dataset)`` and ``generateData()``.
        dataset: object providing ``data0``, ``data1`` and ``shuffleWith``.

        Raises AttributeError if class 0 holds more points than class 1.
        """
        if len(dataset.data0) > len(dataset.data1):
            raise AttributeError(
                "Expected class 0 to be the minority class "
                "but class 0 is bigger than class 1."
            )

        self.debug("### Start exercise for synthetic point generator")

        for shuffleStep in range(self.numOfShuffles):
            # Fix: these titles were plain strings, so the placeholders were
            # printed literally instead of the 1-based counters.
            stepTitle = f"Step {shuffleStep + 1}/{self.numOfShuffles}"
            self.debug(f"\n====== {stepTitle} =======")

            if self.shuffleFunction is not None:
                self.debug("-> Shuffling data")
                dataset.shuffleWith(self.shuffleFunction)

            self.debug("-> Spliting data to slices")
            dataSlices = TrainTestData.splitDataToSlices(dataset, self.numOfSlices)

            for sliceNr, sliceData in enumerate(dataSlices):
                sliceTitle = f"Slice {sliceNr + 1}/{self.numOfSlices}"
                self.debug(f"\n------ {stepTitle}: {sliceTitle} -------")
                self._exerciseWithDataSlice(gan, sliceData)

        self.debug("### Exercise is done.")

    def _exerciseWithDataSlice(self, gan, dataSlice):
        """
        Train ``gan`` on one slice, balance the training data and run all
        test functions on it.

        dataSlice: object with a ``train`` and a ``test`` part.
        """
        self.debug("-> Train generator for synthetic samples")
        gan.train(dataSlice.train)

        # Number of class-0 points missing to reach the size of class 1.
        numOfNeededSamples = dataSlice.train.size1 - dataSlice.train.size0

        if numOfNeededSamples > 0:
            self.debug(f"-> create {numOfNeededSamples} synthetic samples")
            # NOTE(review): assumes generateData() yields one sample per call
            # with the same shape as a row of data0 -- confirm against the generator.
            newSamples = np.asarray([gan.generateData() for _ in range(numOfNeededSamples)])
            train = DataSet(
                data0=np.concatenate((dataSlice.train.data0, newSamples)),
                data1=dataSlice.train.data1
            )
        else:
            train = dataSlice.train

        plotCloud(train.data, train.labels)

        results = {name: [] for name in self.testFunctions}
        for testerName, tester in self.testFunctions.items():
            self.debug(f"-> test with '{testerName}'")
            testResult = tester(train, dataSlice.test)
            testResult.print()
            results[testerName].append(testResult)

        self.debug("-> check results")
        self._checkResults(results, dataSlice.test.labels)

    def _checkResults(self, results, expectedLabels):
        """
        Hook for evaluating the collected results; currently does nothing.

        results: dict mapping tester name to a list of test results.
        expectedLabels: labels of the slice's test part.
        """
        pass


class TestResult:
    """
    Scores of one classifier run: confusion matrix, balanced accuracy,
    F1 score and (optionally) the average precision score.
    """

    def __init__(self, title, labels, prediction, aps=None):
        """
        title: short classifier name used in the printed report.
        labels: expected labels.
        prediction: labels predicted by the classifier.
        aps: precomputed average precision score, or ``None`` if unavailable.
        """
        self.title = title
        self.con_mat = confusion_matrix(labels, prediction)
        self.bal_acc = balanced_accuracy_score(labels, prediction)
        self.f1 = f1_score(labels, prediction)
        self.aps = aps

    def print(self):
        """Write the stored scores to stdout."""
        # ravel() flattens the matrix row by row; for a 2x2 confusion
        # matrix that order is tn, fp, fn, tp.
        r = self.con_mat.ravel()
        print('tn, fp, fn, tp:', r)

        if self.aps is not None:
            print('average_pr_score:', self.aps)

        print(f'f1 score_{self.title}:', self.f1)
        print(f'balanced accuracy_{self.title}:', self.bal_acc)
        print(f'confusion matrix_{self.title}')
        print(self.con_mat)


def lr(train, test):
    """
    Fit a logistic regression on ``train`` and score it on ``test``.

    Both arguments need ``data`` and ``labels`` attributes. Returns a
    TestResult titled "LR" that includes the average precision score
    computed from the predicted probability of class 1.
    """
    logreg = LogisticRegression(
        C=1e5,
        solver='lbfgs',
        multi_class='multinomial',
        class_weight={0: 1, 1: 1.3}
    )
    logreg.fit(train.data, train.labels)

    prediction = logreg.predict(test.data)

    prob_lr = logreg.predict_proba(test.data)
    aps_lr = average_precision_score(test.labels, prob_lr[:, 1])
    return TestResult("LR", test.labels, prediction, aps_lr)


def svm(train, test):
    """
    Fit a linear support vector classifier on ``train`` and score it on
    ``test``.

    Both arguments need ``data`` and ``labels`` attributes. Returns a
    TestResult titled "SVM" (without average precision score).
    """
    # The local is deliberately not called `svm`, which would shadow this function.
    classifier = sklearn.svm.SVC(
        kernel='linear',
        decision_function_shape='ovo',
        class_weight={0: 1., 1: 1.},
        probability=True
    )
    classifier.fit(train.data, train.labels)
    prediction = classifier.predict(test.data)
    return TestResult("SVM", test.labels, prediction)


def knn(train, test):
    """
    Fit a 10-nearest-neighbours classifier on ``train`` and score it on
    ``test``.

    Both arguments need ``data`` and ``labels`` attributes. Returns a
    TestResult titled "KNN" (without average precision score).
    """
    classifier = KNeighborsClassifier(n_neighbors=10)
    classifier.fit(train.data, train.labels)
    prediction = classifier.predict(test.data)
    return TestResult("KNN", test.labels, prediction)


# Ready-made tester table, usable as the `testFunctions` argument of Exercise.
allTesters = {
    "LR": lr,
    "SVM": svm,
    "KNN": knn
}


def plotCloud(data, labels):
    """
    Show a 2D scatter plot of ``data`` coloured by ``labels``.

    The data is standardised and projected onto its first two principal
    components before plotting. Displays the figure via ``plt.show()``.
    """
    data_t = StandardScaler().fit_transform(data)
    pca = PCA(n_components=2)
    pc = pca.fit_transform(data_t)
    result = pd.DataFrame(data=pc, columns=['PCA0', 'PCA1'])
    result['Cluster'] = labels

    sns.set(font_scale=1.2)
    sns.lmplot(
        x="PCA0",
        y="PCA1",
        data=result,
        fit_reg=False,
        hue='Cluster',  # color by cluster
        legend=False,
        scatter_kws={"s": 3},  # specify the point size
        palette="Set1"
    )
    plt.legend(title='', loc='upper left', labels=['0', '1'])
    plt.show()