-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathhypergraph_utils.py
181 lines (162 loc) · 5.68 KB
/
hypergraph_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# --------------------------------------------------------
# Utility functions for Hypergraph
#
# Author: Yifan Feng
# Date: November 2018
# --------------------------------------------------------
import numpy as np
def Eu_dis(x):
"""
Calculate the distance among each raw of x
:param x: N X D
N: the object number
D: Dimension of the feature
:return: N X N distance matrix
"""
x = np.mat(x)
aa = np.sum(np.multiply(x, x), 1)
ab = x * x.T
dist_mat = aa + aa.T - 2 * ab
dist_mat[dist_mat < 0] = 0
dist_mat = np.sqrt(dist_mat)
dist_mat = np.maximum(dist_mat, dist_mat.T)
return dist_mat
def feature_concat(*F_list, normal_col=False):
"""
Concatenate multiple modality feature. If the dimension of a feature matrix is more than two,
the function will reduce it into two dimension(using the last dimension as the feature dimension,
the other dimension will be fused as the object dimension)
:param F_list: Feature matrix list
:param normal_col: normalize each column of the feature
:return: Fused feature matrix
"""
features = None
for f in F_list:
if f is not None and f != []:
# deal with the dimension that more than two
if len(f.shape) > 2:
f = f.reshape(-1, f.shape[-1])
# normal each column
if normal_col:
f_max = np.max(np.abs(f), axis=0)
f = f / f_max
# facing the first feature matrix appended to fused feature matrix
if features is None:
features = f
else:
features = np.hstack((features, f))
if normal_col:
features_max = np.max(np.abs(features), axis=0)
features = features / features_max
return features
def hyperedge_concat(*H_list):
"""
Concatenate hyperedge group in H_list
:param H_list: Hyperedge groups which contain two or more hypergraph incidence matrix
:return: Fused hypergraph incidence matrix
"""
H = None
for h in H_list:
if h is not None and h != []:
# for the first H appended to fused hypergraph incidence matrix
if H is None:
H = h
else:
if type(h) != list:
H = np.hstack((H, h))
else:
tmp = []
for a, b in zip(H, h):
tmp.append(np.hstack((a, b)))
H = tmp
return H
def generate_G_from_H(H, variable_weight=False):
"""
calculate G from hypgraph incidence matrix H
:param H: hypergraph incidence matrix H
:param variable_weight: whether the weight of hyperedge is variable
:return: G
"""
if type(H) != list:
return _generate_G_from_H(H, variable_weight)
else:
G = []
for sub_H in H:
G.append(generate_G_from_H(sub_H, variable_weight))
return G
def _generate_G_from_H(H, variable_weight=False):
"""
calculate G from hypgraph incidence matrix H
:param H: hypergraph incidence matrix H
:param variable_weight: whether the weight of hyperedge is variable
:return: G
"""
H = np.array(H)
n_edge = H.shape[1]
# the weight of the hyperedge
W = np.ones(n_edge)
# the degree of the node
DV = np.sum(H * W, axis=1)
# the degree of the hyperedge
DE = np.sum(H, axis=0)
invDE = np.mat(np.diag(np.power(DE, -1)))
DV2 = np.mat(np.diag(np.power(DV, -0.5)))
W = np.mat(np.diag(W))
H = np.mat(H)
HT = H.T
if variable_weight:
DV2_H = DV2 * H
invDE_HT_DV2 = invDE * HT * DV2
return DV2_H, W, invDE_HT_DV2
else:
G = DV2 * H * W * invDE * HT * DV2
return G
def construct_H_with_KNN_from_distance(dis_mat, k_neig, is_probH=False, m_prob=1):
"""
construct hypregraph incidence matrix from hypergraph node distance matrix
:param dis_mat: node distance matrix
:param k_neig: K nearest neighbor
:param is_probH: prob Vertex-Edge matrix or binary
:param m_prob: prob
:return: N_object X N_hyperedge
"""
n_obj = dis_mat.shape[0]
# construct hyperedge from the central feature space of each node
n_edge = n_obj
H = np.zeros((n_obj, n_edge))
for center_idx in range(n_obj):
dis_mat[center_idx, center_idx] = 0
dis_vec = dis_mat[center_idx]
nearest_idx = np.array(np.argsort(dis_vec)).squeeze()
avg_dis = np.average(dis_vec)
if not np.any(nearest_idx[:k_neig] == center_idx):
nearest_idx[k_neig - 1] = center_idx
for node_idx in nearest_idx[:k_neig]:
if is_probH:
H[node_idx, center_idx] = np.exp(-dis_vec[node_idx] ** 2 / (m_prob * avg_dis) ** 2)
else:
H[node_idx, center_idx] = dis_vec[node_idx]
return H
def construct_H_with_KNN(X, K_neigs=[10], split_diff_scale=False, is_probH=True, m_prob=1):
"""
init multi-scale hypergraph Vertex-Edge matrix from original node feature matrix
:param X: N_object x feature_number
:param K_neigs: the number of neighbor expansion
:param split_diff_scale: whether split hyperedge group at different neighbor scale
:param is_probH: prob Vertex-Edge matrix or binary
:param m_prob: prob
:return: N_object x N_hyperedge
"""
if len(X.shape) != 2:
X = X.reshape(-1, X.shape[-1])
if type(K_neigs) == int:
K_neigs = [K_neigs]
dis_mat = X
H = []
for k_neig in K_neigs:
H_tmp = construct_H_with_KNN_from_distance(dis_mat, k_neig, is_probH, m_prob)
if not split_diff_scale:
H = hyperedge_concat(H, H_tmp)
else:
H.append(H_tmp)
return H