fdc.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. # --[ Known to be used ]----
  2. import numpy as np
  3. from numba import jit
  4. import umap.umap_ as umap
  5. # --[ Known to be used but can we avoid it? ]----
  6. import pandas as pd
  7. from fdc.visualize import plotMapping
  def value(v, defaultValue):
      """Return ``v``, or ``defaultValue`` when ``v`` is ``None``.

      Only ``None`` triggers the fallback; other falsy values such as ``0``,
      ``False`` or an empty list are returned unchanged.
      """
      return defaultValue if v is None else v
  def feature_clustering(UMAP_neb, min_dist_UMAP, metric, data, visual=False):
      """Embed ``data`` in two dimensions and return the result as a DataFrame.

      A ``Clustering`` is built from the three UMAP settings and fitted on
      ``data``.

      Parameters
      ----------
      UMAP_neb
          Forwarded to ``Clustering`` as ``UMAP_neb``.
      min_dist_UMAP
          Forwarded to ``Clustering`` as ``min_dist_UMAP``.
      metric
          Forwarded to ``Clustering`` as ``metric``.
      data
          Input handed to ``Clustering.fit``.
      visual
          When truthy, the resulting frame is passed to ``plotMapping``
          before it is returned.

      Returns
      -------
      pandas.DataFrame
          The embedding with the columns ``UMAP_0`` and ``UMAP_1``.
      """
      reducer = Clustering(metric=metric, UMAP_neb=UMAP_neb, min_dist_UMAP=min_dist_UMAP)
      embedding_frame = pd.DataFrame(data=reducer.fit(data), columns=['UMAP_0', 'UMAP_1'])

      if not visual:
          return embedding_frame

      plotMapping(embedding_frame)
      return embedding_frame
  @jit(nopython=True)
  def canberra_modified(a,b):
      """Return a Canberra-like distance between the vectors ``a`` and ``b``.

      With ``x_i = |a_i - b_i| + 1`` the result is
      ``sqrt(sum_i |1 - x_i| / (1 + |x_i|))``.

      The function is compiled with numba in nopython mode; in this file it is
      handed to ``Clustering`` as the metric for ordinal features.
      """
      # Element-wise absolute difference, shifted by one.
      shifted = np.abs(a - b) + 1.0
      # One bounded contribution per coordinate.
      terms = np.array([np.abs(1.0 - s) / (1.0 + np.abs(s)) for s in shifted])
      return np.sqrt(np.sum(terms))
  class Clustering:
      """Reduce a feature table to a standardized two-component UMAP embedding.

      Parameters
      ----------
      metric : str or callable
          Distance handed to ``umap.UMAP(metric=...)``. The historic
          misspelling ``'euclidian'`` is still accepted and translated to
          ``'euclidean'``, the spelling UMAP documents.
      UMAP_neb
          Handed to UMAP as ``n_neighbors``.
      min_dist_UMAP
          Handed to UMAP as ``min_dist``.
      """

      # Misspelled metric names seen in older callers, mapped to the spelling
      # UMAP documents.
      _METRIC_ALIASES = {'euclidian': 'euclidean'}

      def __init__(self, metric='euclidean', UMAP_neb=30, min_dist_UMAP=0.1):
          self.metric = self._canonical_metric(metric)
          self.UMAP_neb = UMAP_neb
          self.min_dist_UMAP = min_dist_UMAP

      @classmethod
      def _canonical_metric(cls, metric):
          """Return ``metric`` with known misspelled metric names corrected."""
          # Callable metrics (e.g. numba-compiled functions) pass through as-is;
          # they may not even be hashable, so they never touch the alias table.
          if isinstance(metric, str):
              return cls._METRIC_ALIASES.get(metric, metric)
          return metric

      @staticmethod
      def _standardize(column):
          """Center ``column`` on zero and scale it to unit standard deviation."""
          centered = column - np.mean(column)
          spread = np.std(column)
          # A constant column has zero spread; dividing would yield NaN/inf, so
          # it is only centered.
          if spread == 0:
              return centered
          return centered / spread

      def fit(self, data):
          """Embed ``data`` with UMAP and standardize both embedding components.

          NumPy's global random generator is seeded with 42 and UMAP is given
          ``random_state=42`` so repeated calls use the same seeds.

          Parameters
          ----------
          data
              Input handed unchanged to ``umap.UMAP.fit_transform``.

          Returns
          -------
          The array returned by ``fit_transform`` (two components), with each
          column standardized in place.
          """
          np.random.seed(42)

          data_embedded = umap.UMAP(
              n_neighbors=self.UMAP_neb,
              min_dist=self.min_dist_UMAP,
              n_components=2,
              # Re-check here so an alias assigned to ``self.metric`` after
              # construction is corrected as well.
              metric=self._canonical_metric(self.metric),
              random_state=42,
          ).fit_transform(data)

          for component in (0, 1):
              data_embedded[:, component] = self._standardize(data_embedded[:, component])

          return data_embedded
  class FDC:
      """Build one low-dimensional embedding from mixed-type tabular data.

      Continuous, ordinal and nominal columns are each reduced to two
      dimensions by their own clustering object, and the per-type embeddings
      are concatenated column-wise. Optionally the concatenation is reduced
      once more to a single two-dimensional UMAP embedding.

      Parameters
      ----------
      clustering_cont, clustering_ord, clustering_nom
          Objects with a ``fit(data)`` method used for continuous, ordinal and
          nominal columns. When ``None``, a ``Clustering`` with the metric
          ``'euclidean'``, ``canberra_modified`` or ``'hamming'`` respectively
          (30 neighbours, ``min_dist`` 0.1) is created.
      drop_nominal
          When true, only the first component of the nominal embedding is kept.
      visual
          Default for ``normalize(visual=...)``: whether the final mapping is
          plotted.
      with_2d_embedding
          Default for ``normalize(with_2d_embedding=...)``: whether the final
          two-dimensional embedding is returned as well.
      use_pandas_output
          When true, results are returned as ``pandas.DataFrame`` objects
          instead of arrays.
      """

      def __init__(self, clustering_cont=None, clustering_ord=None, clustering_nom=None, drop_nominal=True, visual=False, with_2d_embedding=False, use_pandas_output=False):
          # Used clusterings. ``is None`` (not ``or``) so that an object supplied
          # by the caller is never replaced merely because it evaluates as falsy.
          if clustering_cont is None:
              clustering_cont = Clustering('euclidean', 30, 0.1)
          if clustering_ord is None:
              clustering_ord = Clustering(canberra_modified, 30, 0.1)
          if clustering_nom is None:
              clustering_nom = Clustering('hamming', 30, 0.1)
          self.clustering_cont = clustering_cont
          self.clustering_ord = clustering_ord
          self.clustering_nom = clustering_nom

          # Control of data output
          self.use_pandas_output = use_pandas_output
          self.with_2d_embedding = with_2d_embedding
          self.drop_nominal = drop_nominal

          # Control if a graph is shown
          self.visual = visual

          # Lists to select columns for continuous, nominal and ordinal data.
          # They serve as defaults when ``normalize`` is called without the
          # corresponding argument.
          self.cont_list = None
          self.nom_list = None
          self.ord_list = None

      def calc_embedding(self, clustering, data, column_list):
          """Fit ``clustering`` on the selected columns of ``data``.

          Parameters
          ----------
          clustering
              Object whose ``fit`` method produces the embedding.
          data
              Table indexed as ``data[column_list]``.
          column_list
              Column selection, or ``None`` / an empty selection to skip this
              feature type.

          Returns
          -------
          The value of ``clustering.fit(data[column_list])``, or ``None`` when
          there is nothing to embed.
          """
          # An empty selection is treated like a missing one, so that the
          # "at least one non empty column list" check in ``normalize`` applies
          # instead of fitting on a table without columns.
          if column_list is None or len(column_list) == 0:
              return None
          return clustering.fit(data[column_list])

      def normalize(self, data, cont_list=None, nom_list=None, ord_list=None, with_2d_embedding=None, visual=None):
          """Embed ``data`` per feature type and concatenate the embeddings.

          Parameters
          ----------
          data
              Table whose columns are selected with the lists below.
          cont_list, nom_list, ord_list
              Columns holding continuous, nominal and ordinal features. Each
              falls back to the attribute of the same name when ``None``.
          with_2d_embedding
              Whether to also return the two-dimensional UMAP embedding of the
              concatenated result. ``None`` falls back to the value given to
              the constructor.
          visual
              Whether to plot the two-dimensional embedding with
              ``plotMapping``. ``None`` falls back to the value given to the
              constructor.

          Returns
          -------
          The concatenated embedding, with columns in the order
          ``CONT_UMAP_0``, ``CONT_UMAP_1``, ``ORD_UMAP_0``, ``ORD_UMAP_1``,
          ``NOM_UMAP_0`` and, unless ``drop_nominal`` is set, ``NOM_UMAP_1``
          (only for the feature types that were supplied). When
          ``with_2d_embedding`` is true, a tuple of that embedding and the
          two-dimensional one is returned. Both are DataFrames when
          ``use_pandas_output`` is set, arrays otherwise.

          Raises
          ------
          ValueError
              If no feature type has any columns selected.
          """
          np.random.seed(42)
          visual = value(visual, self.visual)
          # The constructor option used to be stored but never consulted here.
          with_2d_embedding = value(with_2d_embedding, self.with_2d_embedding)

          concat_column_names = []
          concat_lists = []

          # Reducing continuous features into 2dim
          cont_emb = self.calc_embedding(self.clustering_cont, data, value(cont_list, self.cont_list))
          if cont_emb is not None:
              concat_lists.append(cont_emb)
              concat_column_names.extend(['CONT_UMAP_0', 'CONT_UMAP_1'])

          # Reducing ordinal features into 2dim
          ord_emb = self.calc_embedding(self.clustering_ord, data, value(ord_list, self.ord_list))
          if ord_emb is not None:
              concat_lists.append(ord_emb)
              concat_column_names.extend(['ORD_UMAP_0', 'ORD_UMAP_1'])

          # Reducing nominal features into 2dim (or 1dim with drop_nominal)
          nom_emb = self.calc_embedding(self.clustering_nom, data, value(nom_list, self.nom_list))
          if nom_emb is not None:
              concat_column_names.append('NOM_UMAP_0')
              if self.drop_nominal:
                  # Keep only the first component, as a single-column 2-D array.
                  nom_emb = nom_emb[:, 0].reshape((nom_emb.shape[0], 1))
              else:
                  concat_column_names.append('NOM_UMAP_1')
              concat_lists.append(nom_emb)

          # Merge results
          if not concat_lists:
              raise ValueError("Expected at least one non empty column list.")
          result_concat = np.concatenate(concat_lists, axis=1)

          # Create 2d embedding of the concatenated per-type embeddings
          result_reduced = None
          if with_2d_embedding or visual:
              result_reduced = umap.UMAP(
                  n_neighbors=30,
                  min_dist=0.001,
                  n_components=2,
                  metric='euclidean',
                  random_state=42,
              ).fit_transform(result_concat)
              reduced_frame = pd.DataFrame(data=result_reduced, columns=['UMAP_0', 'UMAP_1'])

              # Show mapping if needed; plotting always receives a DataFrame.
              if visual:
                  plotMapping(reduced_frame)

              if self.use_pandas_output:
                  result_reduced = reduced_frame

          # Return the results
          if self.use_pandas_output:
              result_concat = pd.DataFrame(data=result_concat, columns=concat_column_names)

          if with_2d_embedding:
              return result_concat, result_reduced
          return result_concat
  112. else:
  113. return result_concat #returns 5D embedding only