from library.exercise import Exercise
from library.dataset import DataSet, TrainTestData
from library.generators import ProWRAS, SimpleGan, Repeater, SpheredNoise, ConvGAN, StupidToyListGan, CtGAN, CtabGan

import pickle
import numpy as np
import time
import random
import csv
import gzip
import sys
import os
from imblearn.datasets import fetch_datasets


def loadDataset(datasetName):
    """Load a two-class dataset and wrap it in a ``DataSet``.

    The source is chosen by the prefix of *datasetName*:

    * ``data_input/imblearn_<name>``: taken from ``fetch_datasets()``.
    * ``data_input/kaggle_<name>``: read from ``<datasetName>.csv.gz``; the
      first row is skipped as heading and the last column is the label.
    * anything else: read from ``<datasetName>.pickle``.

    Rows whose label equals 1 become ``data1``; all other rows become
    ``data0``.  Points that occur in both classes are only reported on
    stdout, they are not removed.

    Args:
        datasetName: Path prefix of the dataset (without file extension).

    Returns:
        ``DataSet(data0=..., data1=...)`` built from the loaded features.
    """
    def isSame(xs, ys):
        # Element-wise comparison of two rows (stops at the shorter one).
        return not any(x != y for (x, y) in zip(xs, ys))

    def isIn(ys):
        # Predicate: is the given row contained in ys?
        def f(x):
            return any(isSame(x, y) for y in ys)
        return f

    def isNotIn(ys):
        # Predicate: is the given row absent from ys?
        # Only referenced by the disabled clean-up further below.
        def f(x):
            return not any(isSame(x, y) for y in ys)
        return f

    imblearnPrefix = "data_input/imblearn_"

    print(f"Load '{datasetName}'")
    if datasetName.startswith(imblearnPrefix):
        print("from imblearn")
        ds = fetch_datasets()
        myData = ds[datasetName[len(imblearnPrefix):]]
        ds = None  # drop the reference to the full dataset collection
        features = myData["data"]
        labels = myData["target"]
    elif datasetName.startswith("data_input/kaggle_"):
        features = []
        labels = []
        # The context manager makes sure the gzip stream is closed again.
        with gzip.open(f"{datasetName}.csv.gz", "rt") as csvFile:
            for (n, row) in enumerate(csv.reader(csvFile)):
                # Skip heading
                if n > 0:
                    features.append([float(x) for x in row[:-1]])
                    labels.append(int(row[-1]))
        features = np.array(features)
        labels = np.array(labels)
    else:
        print("from pickle file")
        # NOTE(review): pickle.load can execute arbitrary code; only use
        # pickle files from a trusted source.
        with open(f"{datasetName}.pickle", "rb") as pickle_in:
            pickle_dict = pickle.load(pickle_in)
        myData = pickle_dict["folding"]
        # Only the first entry of "folding" is used.  Its items 0 and 2 are
        # joined to the features and items 1 and 3 to the labels
        # (presumably a train/test split - TODO confirm).
        k = myData[0]
        labels = np.concatenate((k[1], k[3]), axis=0).astype(float)
        features = np.concatenate((k[0], k[2]), axis=0).astype(float)

    label_1 = list(np.where(labels == 1)[0])
    label_0 = list(np.where(labels != 1)[0])
    features_1 = features[label_1]
    features_0 = features[label_0]

    # Points of class 1 that also appear in class 0.
    cut = np.array(list(filter(isIn(features_0), features_1)))
    if len(cut) > 0:
        print(f"non empty cut in {datasetName}! ({len(cut)} points)")
        # Disabled clean-up that would remove the shared points:
        # print(f"{len(features_0)}/{len(features_1)} point before")
        # features_0 = np.array(list(filter(isNotIn(cut), features_0)))
        # features_1 = np.array(list(filter(isNotIn(cut), features_1)))
        # print(f"{len(features_0)}/{len(features_1)} points after")

    ds = DataSet(data0=features_0, data1=features_1)
    print("Data loaded.")
    return ds


def getRandGen(initValue, incValue=257, multValue=101, modulus=65537):
    """Yield an endless pseudo random sequence (linear congruential generator).

    Each value is ``(multValue * previous + incValue) % modulus``, starting
    from *initValue* (which itself is not yielded).
    """
    value = initValue
    while True:
        value = ((multValue * value) + incValue) % modulus
        yield value


def genShuffler():
    """Create a reproducible shuffle function.

    The returned function draws from one shared ``getRandGen(2021)``
    sequence, so repeated calls continue that sequence instead of
    restarting it.

    Returns:
        A function that takes an iterable and returns its items as a
        shuffled ``np.array``.
    """
    randGen = getRandGen(2021)

    def shuffler(data):
        remaining = list(data)
        shuffled = []
        while remaining:
            # Pick one of the remaining items and take it out of the pool.
            p = next(randGen) % len(remaining)
            shuffled.append(remaining.pop(p))
        return np.array(shuffled)

    return shuffler


def showTime(t):
    """Format a duration in seconds as ``HH:MM:SS``.

    If the duration is at least one day, ``"<d> days "`` is prepended.
    Fractions of a second are cut off.
    """
    m, s = divmod(int(t), 60)
    h, m = divmod(m, 60)
    d, h = divmod(h, 24)

    if d > 0:
        return f"{d} days {h:02d}:{m:02d}:{s:02d}"
    return f"{h:02d}:{m:02d}:{s:02d}"


def mkDirIfNotExists(name):
    """Create the directory *name*; do nothing if the path already exists."""
    try:
        os.mkdir(name)
    except FileExistsError:
        pass


def runExercise(datasetName, resultList, ganName, ganCreator, skipIfCsvExists=True):
    """Run one generator on one dataset and store the results.

    Output produced during the run is redirected to
    ``data_result/<ganName>/<datasetName>.log``.  The original stdout and
    stderr are restored afterwards, even if the run fails.

    Args:
        datasetName: Name of the dataset below ``data_input/``.
        resultList: Optional dict; if given, the value returned by
            ``Exercise.run`` is stored under *datasetName*.
        ganName: Name of the generator; used as result folder name.
        ganCreator: Function that gets the loaded dataset and returns the
            generator to test.
        skipIfCsvExists: If true and *resultList* is None, nothing is
            computed when the result csv file already exists.
    """
    import traceback

    print(f"* Running {ganName} on {datasetName}")

    oldStdOut = sys.stdout
    oldStdErr = sys.stderr

    resultsFileName = f"data_result/{ganName}"

    # Prepare Folder for result data
    mkDirIfNotExists("data_result")
    mkDirIfNotExists(resultsFileName)

    resultsFileName += f"/{datasetName}"

    try:
        os.stat(f"{resultsFileName}.csv")
        if skipIfCsvExists and resultList is None:
            print(" Resultfile exists => skip calculation.")
            return
    except FileNotFoundError:
        pass

    with open(resultsFileName + ".log", "w") as logFile:
        sys.stdout = logFile
        sys.stderr = logFile
        try:
            twStart = time.time()
            tpStart = time.process_time()
            print()
            print()
            print("///////////////////////////////////////////")
            print(f"// Running {ganName} on {datasetName}")
            print("///////////////////////////////////////////")
            print()
            data = loadDataset(f"data_input/{datasetName}")
            gan = ganCreator(data)
            random.seed(2021)
            shuffler = genShuffler()
            exercise = Exercise(shuffleFunction=shuffler, numOfShuffles=5, numOfSlices=5)
            avg = exercise.run(gan, data, resultsFileName=resultsFileName)
            tpEnd = time.process_time()
            twEnd = time.time()

            if resultList is not None:
                resultList[datasetName] = avg
        except Exception:
            # Keep the failure in the log file before passing it on.
            traceback.print_exc()
            raise
        finally:
            # Always undo the redirection, otherwise all later output
            # would end up in this (then closed) log file.
            sys.stdout = oldStdOut
            sys.stderr = oldStdErr

    print(f" wall time: {showTime(twEnd - twStart)}s, process time: {showTime(tpEnd - tpStart)}")


def runExerciseForSimpleGAN(datasetName, resultList=None):
    """Run ``runExercise`` with a ``SimpleGan`` sized to the dataset's features."""
    runExercise(datasetName, resultList, "SimpleGAN", lambda data: SimpleGan(numOfFeatures=data.data0.shape[1]))


def runExerciseForRepeater(datasetName, resultList=None):
    """Run ``runExercise`` with a ``Repeater``."""
    runExercise(datasetName, resultList, "Repeater", lambda _data: Repeater())


def runExerciseForSpheredNoise(datasetName, resultList=None):
    """Run ``runExercise`` with a ``SpheredNoise`` generator."""
    runExercise(datasetName, resultList, "SpheredNoise", lambda _data: SpheredNoise())


def runExerciseForCtGAN(datasetName, resultList=None, debug=False):
    """Run ``runExercise`` with a ``CtGAN``; *debug* is passed to the generator."""
    runExercise(datasetName, resultList, "ctGAN", lambda data: CtGAN(data.data0.shape[1], debug=debug))


def runExerciseForConvGAN(datasetName, resultList=None, neb=5, debug=False):
    """Run ``runExercise`` with a ``ConvGAN`` using *neb* for both ``neb`` and ``gen``."""
    runExercise(datasetName, resultList, "convGAN", lambda data: ConvGAN(data.data0.shape[1], neb=neb, gen=neb, debug=debug))


def runExerciseForConvGANfull(datasetName, resultList=None, debug=False):
    """Run ``runExercise`` with a ``ConvGAN`` whose ``neb`` and ``gen`` equal the feature count."""
    runExercise(datasetName, resultList, "convGAN-full", lambda data: ConvGAN(data.data0.shape[1], neb=data.data0.shape[1], gen=data.data0.shape[1], debug=debug))


def runSpeedTestForConvGan(datasetName, ganGenerator):
    """Measure preparation and run time of a generator on one dataset.

    Args:
        datasetName: Name of the dataset below ``data_input/``.
        ganGenerator: Function that gets the number of features and returns
            the generator to test.

    Returns:
        ``((total, preparation, test), gan)`` where the three times are
        wall-clock seconds and *gan* is the tested generator.
    """
    ganName = "convGAN"
    print()
    print()
    print("///////////////////////////////////////////")
    print(f"// Running speed test for {ganName} on {datasetName}")
    print("///////////////////////////////////////////")
    print()

    t1 = time.time()
    data = loadDataset(f"data_input/{datasetName}")
    gan = ganGenerator(data.data0.shape[1])
    random.seed(2021)
    shuffler = genShuffler()
    exercise = Exercise(shuffleFunction=shuffler, numOfShuffles=3, numOfSlices=3)
    # Replace the debug hook of the exercise with a no-op.
    exercise.debug = (lambda _x: None)
    t2 = time.time()
    exercise.run(gan, data)
    t3 = time.time()

    d = (t3 - t1, t2 - t1, t3 - t2)

    print(f"Total Time: {d[0]}")
    print(f"Preparation Time: {d[1]}")
    print(f"Test Time: {d[2]}")

    return d, gan


# Names of the datasets below data_input/ that are used by default.
testSets = [
    "folding_abalone_17_vs_7_8_9_10",
    "folding_abalone9-18",
    "folding_car_good",
    "folding_car-vgood",
    "folding_flare-F",
    "folding_hypothyroid",
    "folding_kddcup-guess_passwd_vs_satan",
    "folding_kr-vs-k-three_vs_eleven",
    "folding_kr-vs-k-zero-one_vs_draw",
    "folding_shuttle-2_vs_5",
    "folding_winequality-red-4",
    "folding_yeast4",
    "folding_yeast5",
    "folding_yeast6",
    "imblearn_webpage",
    "imblearn_mammography",
    "imblearn_protein_homo",
    "imblearn_ozone_level",
    "kaggle_creditcard"
    ]


def runAllTestSets(dataSetList):
    """Run the standard generators on every dataset in *dataSetList*.

    Args:
        dataSetList: Iterable of dataset names.  If None, the module level
            ``testSets`` list is used.
    """
    if dataSetList is None:
        dataSetList = testSets

    for dataset in dataSetList:
        runExerciseForRepeater(dataset)
        runExerciseForSpheredNoise(dataset)
        runExerciseForSimpleGAN(dataset)
        runExerciseForConvGAN(dataset)
        runExerciseForConvGANfull(dataset)


# Maps a generator name to a function that creates the generator for a
# loaded dataset.
# NOTE(review): the keys "convGAN-proximary-5" and "convGAN-proxymary-full"
# are spelled differently; they are kept as they are because the names may be
# used elsewhere (e.g. as folder names).
generators = {
    "ProWRAS": lambda _data: ProWRAS(),
    "Repeater": lambda _data: Repeater(),
    # "SpheredNoise": lambda _data: SpheredNoise(),
    "SimpleGAN": lambda data: SimpleGan(numOfFeatures=data.data0.shape[1]),
    "ctGAN": lambda data: CtGAN(data.data0.shape[1]),
    "CTAB-GAN": lambda _data: CtabGan(),
    "convGAN-old-5": lambda data: ConvGAN(data.data0.shape[1], neb=5, gen=5),
    "convGAN-old-full": lambda data: ConvGAN(data.data0.shape[1], neb=data.data0.shape[1], gen=data.data0.shape[1]),
    "convGAN-majority-5": lambda data: ConvGAN(data.data0.shape[1], neb=5, gen=5),
    "convGAN-majority-full": lambda data: ConvGAN(data.data0.shape[1], neb=None),
    "convGAN-proximary-5": lambda data: ConvGAN(data.data0.shape[1], neb=5, gen=5, withMajorhoodNbSearch=True),
    "convGAN-proxymary-full": lambda data: ConvGAN(data.data0.shape[1], neb=None, withMajorhoodNbSearch=True),
    }