| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- import numpy as np
- import pandas as pd
- import sklearn
- # needed in function lr
- from sklearn import metrics
- from sklearn.neighbors import KNeighborsClassifier
- from sklearn.linear_model import LogisticRegression
- from sklearn.metrics import confusion_matrix
- from sklearn.metrics import average_precision_score
- from sklearn.metrics import f1_score
- from sklearn.metrics import balanced_accuracy_score
- from sklearn.decomposition import PCA
- import seaborn as sns
- from sklearn.preprocessing import StandardScaler
- import matplotlib.pyplot as plt
- from library.dataset import DataSet, TrainTestData
class Exercise:
    """
    Exercising a test for a minority class extension class.

    Repeatedly shuffles a dataset, splits it into train/test slices,
    trains a generator (``gan``) on each slice, balances the training
    set with synthetic minority-class samples, and runs every supplied
    test function on the result.
    """

    def __init__(self, testFunctions, shuffleFunction=None, numOfSlices=5, numOfShuffles=5):
        # testFunctions: mapping name -> callable(train, test) returning a TestResult
        # shuffleFunction: optional callable handed to dataset.shuffleWith before each step
        self.numOfSlices = numOfSlices
        self.numOfShuffles = numOfShuffles
        self.testFunctions = testFunctions
        self.shuffleFunction = shuffleFunction
        self.debug = print  # logging hook; reassign to silence or redirect output

    def run(self, gan, dataset):
        """Run the full exercise for ``gan`` over ``dataset``.

        Raises:
            AttributeError: if class 0 is not the minority class.
        """
        if len(dataset.data0) > len(dataset.data1):
            raise AttributeError("Expected class 0 to be the minority class but class 0 is bigger than class 1.")
        self.debug("### Start exercise for synthetic point generator")
        for shuffleStep in range(self.numOfShuffles):
            # BUGFIX: this title (and sliceTitle below) was a plain string
            # missing the f-prefix, so the {...} placeholders were printed
            # literally instead of being interpolated.
            stepTitle = f"Step {shuffleStep + 1}/{self.numOfShuffles}"
            self.debug(f"\n====== {stepTitle} =======")
            if self.shuffleFunction is not None:
                self.debug("-> Shuffling data")
                dataset.shuffleWith(self.shuffleFunction)
            self.debug("-> Spliting data to slices")
            dataSlices = TrainTestData.splitDataToSlices(dataset, self.numOfSlices)
            for (sliceNr, sliceData) in enumerate(dataSlices):
                sliceTitle = f"Slice {sliceNr + 1}/{self.numOfSlices}"
                self.debug(f"\n------ {stepTitle}: {sliceTitle} -------")
                self._exerciseWithDataSlice(gan, sliceData)
        self.debug("### Exercise is done.")

    def _exerciseWithDataSlice(self, gan, dataSlice):
        """Train the generator on one slice, oversample the minority class,
        plot the resulting training cloud, and evaluate all test functions."""
        self.debug("-> Train generator for synthetic samples")
        gan.train(dataSlice.train)
        # number of synthetic class-0 points needed to match class 1
        numOfNeededSamples = dataSlice.train.size1 - dataSlice.train.size0
        if numOfNeededSamples > 0:
            self.debug(f"-> create {numOfNeededSamples} synthetic samples")
            newSamples = np.asarray([gan.generateData() for _ in range(numOfNeededSamples)])
            train = DataSet(
                data0=np.concatenate((dataSlice.train.data0, newSamples)),
                data1=dataSlice.train.data1
            )
        else:
            # training set is already balanced (or class 0 is larger)
            train = dataSlice.train
        plotCloud(train.data, train.labels)
        results = {name: [] for name in self.testFunctions}
        for testerName in self.testFunctions:
            self.debug(f"-> test with '{testerName}'")
            testResult = (self.testFunctions[testerName])(train, dataSlice.test)
            testResult.print()
            results[testerName].append(testResult)
        self.debug("-> check results")
        self._checkResults(results, dataSlice.test.labels)

    def _checkResults(self, results, expectedLabels):
        # Hook for subclasses: validate the aggregated test results.
        pass
class TestResult:
    """Container for the evaluation metrics of a single classifier run."""

    def __init__(self, title, labels, prediction, aps=None):
        self.title = title
        self.aps = aps  # average precision score; optional, may be None
        self.con_mat = confusion_matrix(labels, prediction)
        self.bal_acc = balanced_accuracy_score(labels, prediction)
        self.f1 = f1_score(labels, prediction)

    def print(self):
        """Dump all stored metrics to stdout."""
        print('tn, fp, fn, tp:', self.con_mat.ravel())
        if self.aps is not None:
            print('average_pr_score:', self.aps)
        print(f'f1 score_{self.title}:', self.f1)
        print(f'balanced accuracy_{self.title}:', self.bal_acc)
        print(f'confusion matrix_{self.title}')
        print(self.con_mat)
def lr(train, test):
    """Logistic-regression tester: fit on ``train``, evaluate on ``test``."""
    model = LogisticRegression(C=1e5, solver='lbfgs', multi_class='multinomial', class_weight={0: 1, 1: 1.3})
    model.fit(train.data, train.labels)
    predicted = model.predict(test.data)
    # average precision is computed from the positive-class probability
    probabilities = model.predict_proba(test.data)
    score = average_precision_score(test.labels, probabilities[:, 1])
    return TestResult("LR", test.labels, predicted, score)
def svm(train, test):
    """SVM tester: linear-kernel SVC fit on ``train``, evaluated on ``test``."""
    # BUGFIX: the file only does `import sklearn`, which does NOT make the
    # `sklearn.svm` submodule available, so `sklearn.svm.SVC` raised
    # AttributeError at call time. Import the class explicitly instead.
    from sklearn.svm import SVC
    # Local renamed from `svm` so it no longer shadows this function's name.
    classifier = SVC(kernel='linear', decision_function_shape='ovo', class_weight={0: 1., 1: 1.}, probability=True)
    classifier.fit(train.data, train.labels)
    prediction = classifier.predict(test.data)
    return TestResult("SVM", test.labels, prediction)
def knn(train, test):
    """k-nearest-neighbours tester (k=10): fit on ``train``, evaluate on ``test``."""
    model = KNeighborsClassifier(n_neighbors=10)
    model.fit(train.data, train.labels)
    return TestResult("KNN", test.labels, model.predict(test.data))
# Registry of every available tester function, keyed by display name.
allTesters = {
    "LR": lr,
    "SVM": svm,
    "KNN": knn
}
def plotCloud(data, labels):
    """Scatter-plot ``data`` projected onto its first two PCA components,
    coloured by ``labels``. Blocks until the plot window is closed."""
    standardized = StandardScaler().fit_transform(data)
    components = PCA(n_components=2).fit_transform(standardized)
    frame = pd.DataFrame(data=components, columns=['PCA0', 'PCA1'])
    frame['Cluster'] = labels

    sns.set(font_scale=1.2)
    sns.lmplot(
        x="PCA0", y="PCA1",
        data=frame,
        fit_reg=False,
        hue='Cluster',          # color by cluster
        legend=False,
        scatter_kws={"s": 3},   # specify the point size
        palette="Set1")
    plt.legend(title='', loc='upper left', labels=['0', '1'])
    plt.show()
|