@@ -15,7 +15,7 @@ import scala.concurrent.duration.durationToPair
15
15
*/
16
16
17
17
case class Vertex (id : Int , degree: Int )
18
- case class Edge (vertex1: Vertex ,vertex2: Vertex , var truss : Int = 1 , var triangleCount : Int = - 1 )
18
+ case class Edge (vertex1: Vertex ,vertex2: Vertex , var triangleCount : Int = - 1 )
19
19
case class Triangle (edge1: Edge , edge2: Edge , edge3: Edge )
20
20
21
21
@@ -26,23 +26,21 @@ object Truss {
26
26
val splitted = line.split(seperator)
27
27
val f = new Vertex (splitted(0 ).toInt, 1 )
28
28
val s = new Vertex (splitted(1 ).toInt, 1 )
29
- createEdge(f, s, 1 )
29
+ createEdge(f, s)
30
30
}
31
31
).name(" convert Graph" )
32
32
convertedGraph
33
33
}
34
34
35
- def createEdge (vert1: Vertex , vert2: Vertex , truss : Int ): Edge = {
36
- if (vert1.degree < vert2.degree) new Edge (vert1, vert2, truss )
35
+ def createEdge (vert1: Vertex , vert2: Vertex ): Edge = {
36
+ if (vert1.degree < vert2.degree) new Edge (vert1, vert2)
37
37
else
38
38
if (vert1.degree == vert2.degree && vert1.id < vert2.id)
39
- new Edge (vert1, vert2, truss )
40
- else new Edge (vert2, vert1, truss )
39
+ new Edge (vert1, vert2)
40
+ else new Edge (vert2, vert1)
41
41
}
42
42
43
43
def addDegrees (graph: DataSet [Edge ]): DataSet [Edge ] = {
44
- val truss = 1
45
-
46
44
val degrees = graph
47
45
.flatMap{e => List (e.vertex1, e.vertex2)}
48
46
.groupBy(0 )
@@ -52,10 +50,10 @@ object Truss {
52
50
val degreedGraph = graph
53
51
.map(e => (e.vertex1.id, e.vertex2.id, e))
54
52
.join(degrees, JoinHint .BROADCAST_HASH_SECOND ).where(0 ).equalTo(0 ) { // Repartition
55
- (e, v) => (e._2, new Edge (v._2, e._3.vertex2, truss ))
53
+ (e, v) => (e._2, new Edge (v._2, e._3.vertex2))
56
54
}
57
55
.join(degrees, JoinHint .BROADCAST_HASH_SECOND ).where(0 ).equalTo(0 ) {
58
- (e, v) => createEdge(e._2.vertex1, v._2, truss )
56
+ (e, v) => createEdge(e._2.vertex1, v._2)
59
57
}.name(" join Degrees" )
60
58
61
59
degreedGraph
@@ -75,7 +73,7 @@ object Truss {
75
73
val allEdges = graph.map(edge => (edge, edge)).name(" triangleCalculation: map singlge edges" )
76
74
77
75
// TODO Parameterization of #nodes?
78
- val joinStrategy = if (k >= 8 ) JoinHint .BROADCAST_HASH_SECOND else JoinHint .REPARTITION_HASH_SECOND
76
+ val joinStrategy = if (k >= 8 ) JoinHint .BROADCAST_HASH_SECOND else JoinHint .BROADCAST_HASH_SECOND // REPARTITION_HASH_SECOND
79
77
80
78
val triangles = triads.join(allEdges, joinStrategy).where(0 ).equalTo(0 ) {
81
79
(triadPart, edgePart) => Triangle (edgePart._2, triadPart._2, triadPart._3)
@@ -85,7 +83,7 @@ object Truss {
85
83
}
86
84
87
85
def getOuterTriangleVertices (edge1: Edge , edge2: Edge ): Edge = {
88
- createEdge(edge1.vertex2, edge2.vertex2, edge1.truss )
86
+ createEdge(edge1.vertex2, edge2.vertex2)
89
87
}
90
88
91
89
@@ -99,6 +97,7 @@ object Truss {
99
97
val triangles = getTriangles(filteredGraph, k)
100
98
101
99
val filteredTriangles = triangles.iterateWithTermination(Int .MaxValue )({triangles =>
100
+
102
101
val singleEdges = triangles.flatMap(triangle => List (triangle.edge1, triangle.edge2, triangle.edge3)).map((_, 1 )).name(" prepare triangle count per edge" )
103
102
104
103
val triangleCountPerEdge = singleEdges.groupBy(0 ).reduce{
@@ -108,10 +107,11 @@ object Truss {
108
107
edgeInt._1.triangleCount = edgeInt._2
109
108
edgeInt._1
110
109
}.name(" give edge triangle count" )
111
- .filter(edge => edge.triangleCount >= k- 2 ).name(" filter edge with too small triangleCount" )
112
110
113
111
val removableEdges = graph.filter(edge => edge.triangleCount < k- 2 )
114
112
113
+ graph = graph.filter(edge => edge.triangleCount >= k- 2 ).name(" filter edge with too small triangleCount" )
114
+
115
115
var joinedtriangles = triangles.join(graph).where({triangle => (triangle.edge1.vertex1, triangle.edge1.vertex2)}).equalTo(" vertex1" , " vertex2" ){
116
116
(triangle, edge) =>
117
117
triangle.edge1.triangleCount = edge.triangleCount
0 commit comments