Skip to content

Commit f0df70d

Browse files
authored
Merge pull request eugenp#7953 from norbertoritzmann/master
[BAEL-3086] Changes related to Apache Spark GraphX article
2 parents c7238ee + 4c50b30 commit f0df70d

File tree

5 files changed

+210
-0
lines changed

5 files changed

+210
-0
lines changed

apache-spark/pom.xml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,18 @@
2727
<version>${org.apache.spark.spark-sql.version}</version>
2828
<scope>provided</scope>
2929
</dependency>
30+
<dependency>
31+
<groupId>org.apache.spark</groupId>
32+
<artifactId>spark-graphx_2.11</artifactId>
33+
<version>${org.apache.spark.spark-graphx.version}</version>
34+
<scope>provided</scope>
35+
</dependency>
36+
<dependency>
37+
<groupId>graphframes</groupId>
38+
<artifactId>graphframes</artifactId>
39+
<version>${graphframes.version}</version>
40+
<scope>provided</scope>
41+
</dependency>
3042
<dependency>
3143
<groupId>org.apache.spark</groupId>
3244
<artifactId>spark-streaming_2.11</artifactId>
@@ -82,9 +94,17 @@
8294
<org.apache.spark.spark-sql.version>2.3.0</org.apache.spark.spark-sql.version>
8395
<org.apache.spark.spark-streaming.version>2.3.0</org.apache.spark.spark-streaming.version>
8496
<org.apache.spark.spark-mllib.version>2.3.0</org.apache.spark.spark-mllib.version>
97+
<org.apache.spark.spark-graphx.version>2.3.0</org.apache.spark.spark-graphx.version>
98+
<graphframes.version>0.7.0-spark2.4-s_2.11</graphframes.version>
8599
<org.apache.spark.spark-streaming-kafka.version>2.3.0</org.apache.spark.spark-streaming-kafka.version>
86100
<com.datastax.spark.spark-cassandra-connector.version>2.3.0</com.datastax.spark.spark-cassandra-connector.version>
87101
<com.datastax.spark.spark-cassandra-connector-java.version>1.5.2</com.datastax.spark.spark-cassandra-connector-java.version>
88102
</properties>
89103

104+
<repositories>
105+
<repository>
106+
<id>SparkPackagesRepo</id>
107+
<url>http://dl.bintray.com/spark-packages/maven</url>
108+
</repository>
109+
</repositories>
90110
</project>
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package com.baeldung.graphframes;
2+
3+
import org.apache.log4j.Level;
4+
import org.apache.log4j.Logger;
5+
import org.apache.spark.api.java.function.VoidFunction;
6+
import org.apache.spark.graphx.Edge;
7+
import org.apache.spark.graphx.Graph;
8+
import org.apache.spark.graphx.VertexRDD;
9+
import org.graphframes.GraphFrame;
10+
import scala.Tuple2;
11+
12+
import java.io.IOException;
13+
import java.util.HashMap;
14+
import java.util.Map;
15+
16+
public class GraphExperiments {
17+
public static Map<Long, User> USERS = new HashMap<>();
18+
19+
public static void main(String[] args) throws IOException {
20+
Logger.getLogger("org").setLevel(Level.OFF);
21+
GraphLoader loader = new GraphLoader();
22+
GraphFrame graph = loader.getGraphFrameUserRelationship();
23+
24+
GraphExperiments experiments = new GraphExperiments();
25+
experiments.doGraphFrameOperations(graph);
26+
experiments.doGraphFrameAlgorithms(graph);
27+
}
28+
29+
private void doGraphFrameOperations(GraphFrame graph) {
30+
graph.vertices().show();
31+
graph.edges().show();
32+
33+
graph.vertices().filter("name = 'Martin'").show();
34+
35+
graph.filterEdges("type = 'Friend'")
36+
.dropIsolatedVertices().vertices().show();
37+
38+
graph.degrees().show();
39+
graph.inDegrees().show();
40+
graph.outDegrees().show();
41+
}
42+
43+
private void doGraphFrameAlgorithms(GraphFrame graph) {
44+
45+
graph.pageRank().maxIter(20).resetProbability(0.15).run().vertices().show();
46+
47+
graph.connectedComponents().run().show();
48+
49+
graph.triangleCount().run().show();
50+
}
51+
52+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package com.baeldung.graphframes;
2+
3+
import org.apache.spark.SparkConf;
4+
import org.apache.spark.api.java.JavaSparkContext;
5+
import org.apache.spark.sql.Dataset;
6+
import org.apache.spark.sql.Row;
7+
import org.apache.spark.sql.SparkSession;
8+
import org.graphframes.GraphFrame;
9+
10+
import java.io.IOException;
11+
import java.nio.file.Files;
12+
import java.nio.file.Path;
13+
import java.util.ArrayList;
14+
import java.util.List;
15+
16+
public class GraphLoader {
17+
18+
public JavaSparkContext getSparkContext() throws IOException {
19+
Path temp = Files.createTempDirectory("sparkGraphFrames");
20+
SparkConf sparkConf = new SparkConf().setAppName("SparkGraphX").setMaster("local[*]");
21+
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
22+
javaSparkContext.setCheckpointDir(temp.toString());
23+
return javaSparkContext;
24+
}
25+
26+
public GraphFrame getGraphFrameUserRelationship() throws IOException {
27+
Path temp = Files.createTempDirectory("sparkGraphFrames");
28+
SparkSession session = SparkSession.builder()
29+
.appName("SparkGraphFrameSample")
30+
.config("spark.sql.warehouse.dir", temp.toString())
31+
.sparkContext(getSparkContext().sc())
32+
.master("local[*]")
33+
.getOrCreate();
34+
List<User> users = loadUsers();
35+
36+
Dataset<Row> userDataset = session.createDataFrame(users, User.class);
37+
38+
List<Relationship> relationshipsList = getRelations();
39+
Dataset<Row> relationshipDataset = session.createDataFrame(relationshipsList, Relationship.class);
40+
41+
GraphFrame graphFrame = new GraphFrame(userDataset, relationshipDataset);
42+
43+
return graphFrame;
44+
}
45+
46+
public List<Relationship> getRelations() {
47+
List<Relationship> relationships = new ArrayList<>();
48+
relationships.add(new Relationship("Friend", "1", "2"));
49+
relationships.add(new Relationship("Following", "1", "4"));
50+
relationships.add(new Relationship("Friend", "2", "4"));
51+
relationships.add(new Relationship("Relative", "3", "1"));
52+
relationships.add(new Relationship("Relative", "3", "4"));
53+
54+
return relationships;
55+
}
56+
57+
private List<User> loadUsers() {
58+
User john = new User(1L, "John");
59+
User martin = new User(2L, "Martin");
60+
User peter = new User(3L, "Peter");
61+
User alicia = new User(4L, "Alicia");
62+
63+
List<User> users = new ArrayList<>();
64+
65+
users.add(new User(1L, "John"));
66+
users.add(new User(2L, "Martin"));
67+
users.add(new User(3L, "Peter"));
68+
users.add(new User(4L, "Alicia"));
69+
70+
return users;
71+
}
72+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.baeldung.graphframes;
2+
3+
import java.io.Serializable;
4+
import java.util.UUID;
5+
6+
public class Relationship implements Serializable {
7+
private String type;
8+
private String src;
9+
private String dst;
10+
private UUID id;
11+
12+
public Relationship(String type, String src, String dst) {
13+
this.type = type;
14+
this.src = src;
15+
this.dst = dst;
16+
this.id = UUID.randomUUID();
17+
}
18+
19+
public String getId() {
20+
return id.toString();
21+
}
22+
23+
public String getType() {
24+
return type;
25+
}
26+
27+
public String getSrc() {
28+
return src;
29+
}
30+
31+
public String getDst() {
32+
return dst;
33+
}
34+
35+
@Override
36+
public String toString() {
37+
return getSrc() + " -- " + getType() + " --> " + getDst();
38+
}
39+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.baeldung.graphframes;
2+
3+
import java.io.Serializable;
4+
5+
public class User implements Serializable {
6+
7+
private Long id;
8+
private String name;
9+
10+
public User(long id, String name) {
11+
this.id = id;
12+
this.name = name;
13+
}
14+
15+
public String getId() {
16+
return id.toString();
17+
}
18+
19+
public String getName() {
20+
return name;
21+
}
22+
23+
@Override
24+
public String toString() {
25+
return "<" + id + "," + name + ">";
26+
}
27+
}

0 commit comments

Comments
 (0)