exercise.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import sklearn
# needed in function lr
from sklearn import metrics
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import average_precision_score
from sklearn.metrics import balanced_accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

from library.dataset import DataSet, TrainTestData
  17. class Exercise:
  18. """
  19. Exercising a test for a minority class extension class.
  20. """
  21. def __init__(self, testFunctions, shuffleFunction=None, numOfSlices=5, numOfShuffles=5):
  22. self.numOfSlices = numOfSlices
  23. self.numOfShuffles = numOfShuffles
  24. self.testFunctions = testFunctions
  25. self.shuffleFunction = shuffleFunction
  26. self.debug = print
  27. def run(self, gan, dataset):
  28. if len(dataset.data1) > len(dataset.data0):
  29. raise AttributeError("Expected class 1 to be the minority class but class 1 is bigger than class 0.")
  30. self.debug("### Start exercise for synthetic point generator")
  31. for shuffleStep in range(self.numOfShuffles):
  32. stepTitle = "Step {shuffleStep + 1}/{self.numOfShuffles}"
  33. self.debug(f"\n====== {stepTitle} =======")
  34. if self.shuffleFunction is not None:
  35. self.debug("-> Shuffling data")
  36. dataset.shuffleWith(self.shuffleFunction)
  37. self.debug("-> Spliting data to slices")
  38. dataSlices = TrainTestData.splitDataToSlices(dataset, self.numOfSlices)
  39. for (sliceNr, sliceData) in enumerate(dataSlices):
  40. sliceTitle = "Slice {sliceNr + 1}/{self.numOfSlices}"
  41. self.debug(f"\n------ {stepTitle}: {sliceTitle} -------")
  42. self._exerciseWithDataSlice(gan, sliceData)
  43. self.debug("### Exercise is done.")
  44. def _exerciseWithDataSlice(self, gan, dataSlice):
  45. self.debug("-> Train generator for synthetic samples")
  46. gan.train(dataSlice.train)
  47. numOfNeededSamples = dataSlice.train.size0 - dataSlice.train.size1
  48. if numOfNeededSamples > 0:
  49. self.debug(f"-> create {numOfNeededSamples} synthetic samples")
  50. newSamples = np.asarray([gan.generateData() for _ in range(numOfNeededSamples)])
  51. train = DataSet(
  52. data0=dataSlice.train.data0,
  53. data1=np.concatenate((dataSlice.train.data1, newSamples))
  54. )
  55. else:
  56. train = dataSlice.train
  57. plotCloud(train.data, train.labels)
  58. results = { name: [] for name in self.testFunctions }
  59. for testerName in self.testFunctions:
  60. self.debug(f"-> test with '{testerName}'")
  61. testResult = (self.testFunctions[testerName])(train, dataSlice.test)
  62. testResult.print()
  63. results[testerName].append(testResult)
  64. self.debug("-> check results")
  65. self._checkResults(results, dataSlice.test.labels)
  66. def _checkResults(self, results, expectedLabels):
  67. pass
  68. class TestResult:
  69. def __init__(self, title, labels, prediction, aps=None):
  70. self.title = title
  71. self.con_mat = confusion_matrix(labels, prediction)
  72. self.bal_acc = balanced_accuracy_score(labels, prediction)
  73. self.f1 = f1_score(labels, prediction)
  74. self.aps = aps
  75. def print(self):
  76. #tn, fp, fn, tp = con_mat.ravel()
  77. r = self.con_mat.ravel()
  78. print('tn, fp, fn, tp:', r)
  79. if self.aps is not None:
  80. print('average_pr_score:', self.aps)
  81. print(f'f1 score_{self.title}:', self.f1)
  82. print(f'balanced accuracy_{self.title}:', self.bal_acc)
  83. print(f'confusion matrix_{self.title}')
  84. print(self.con_mat)
  85. def lr(train, test):
  86. logreg = LogisticRegression(C=1e5, solver='lbfgs', multi_class='multinomial', class_weight={0: 1, 1: 1.3})
  87. logreg.fit(train.data, train.labels)
  88. prediction = logreg.predict(test.data)
  89. prob_lr = logreg.predict_proba(test.data)
  90. aps_lr = average_precision_score(test.labels, prob_lr[:,1])
  91. return TestResult("LR", test.labels, prediction, aps_lr)
  92. def svm(train, test):
  93. svm = sklearn.svm.SVC(kernel='linear', decision_function_shape='ovo', class_weight={0: 1., 1: 1.}, probability=True)
  94. svm.fit(train.data, train.labels)
  95. prediction = svm.predict(test.data)
  96. return TestResult("SVM", test.labels, prediction)
  97. def knn(train, test):
  98. knn = KNeighborsClassifier(n_neighbors=10)
  99. knn.fit(train.data, train.labels)
  100. prediction = knn.predict(test.data)
  101. return TestResult("KNN", test.labels, prediction)
# Registry of the stock test functions, keyed by display name.
# Pass this (or a subset) as `testFunctions` when constructing Exercise.
allTesters = {
    "LR": lr,
    "SVM": svm,
    "KNN": knn
}
  107. def plotCloud(data, labels):
  108. data_t = StandardScaler().fit_transform(data)
  109. pca = PCA(n_components=2)
  110. pc = pca.fit_transform(data_t)
  111. result = pd.DataFrame(data=pc, columns=['PCA0', 'PCA1'])
  112. result['Cluster'] = labels
  113. sns.set( font_scale=1.2)
  114. g=sns.lmplot( x="PCA0", y="PCA1",
  115. data=result,
  116. fit_reg=False,
  117. hue='Cluster', # color by cluster
  118. legend=False,
  119. scatter_kws={"s": 3}, palette="Set1") # specify the point size
  120. plt.legend(title='', loc='upper left', labels=['0', '1'])
  121. plt.show()