Nebula Spark Connector Reader: Implementation and Practices
This post describes how to use Nebula Spark Connector to read data from Nebula Graph.
What is Nebula Spark Connector
Nebula Spark Connector is a custom Spark connector that enables Spark to read data from and write data to Nebula Graph. Accordingly, Nebula Spark Connector is composed of a Reader and a Writer. This post focuses on the Reader; the Writer will be introduced in a follow-up post.
How Nebula Spark Connector Reader is Implemented
Nebula Spark Connector Reader turns Nebula Graph into an extended data source for Spark. With it, Spark can read data from Nebula Graph into a DataFrame and then perform operations on it such as map and reduce.
Spark SQL allows users to customize data sources and supports extended data sources. The data read by Spark SQL is organized into named columns as a DataFrame, a distributed dataset. Spark SQL provides many APIs that make it convenient to compute on and transform DataFrames, and the same DataFrame interfaces can be used to manipulate many types of data sources.
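For example, here is a minimal sketch of reading a built-in data source into a DataFrame (the file name people.json is just a placeholder):

```scala
import org.apache.spark.sql.SparkSession

// Any registered data source is read through the same spark.read entry
// point and lands in a DataFrame with a known schema.
val spark = SparkSession.builder()
  .appName("datasource-demo")
  .master("local[*]")
  .getOrCreate()

val df = spark.read.json("people.json") // placeholder sample file
df.printSchema()
df.select("name").show()
```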
An extended data source is invoked through the org.apache.spark.sql package. Let's first look at the interfaces that Spark SQL provides for extended data sources.
Basic interfaces include:
- BaseRelation: Represents a collection of tuples with a known schema. All subclasses that inherit BaseRelation must generate a schema in the StructType format. In other words, BaseRelation defines the format of the data that is read out of the source and stored as a DataFrame in Spark SQL.
- RelationProvider: Takes a list of parameters and returns a new BaseRelation based on the specified parameters.
- DataSourceRegister: Registers a data source. After registration, you do not need to use the fully qualified class name of the data source; you can refer to it by its alias, namely the string returned by the custom shortName() method.
Provider related interfaces include:
- RelationProvider: Generates a custom relation from the specified data source. Its createRelation() method builds a new relation based on the given parameters.
- SchemaRelationProvider: Generates a new relation based on the given parameters and a given schema.

RDD related interfaces include:
- RDD[InternalRow]: After the data is scanned from the data source, it is constructed into an RDD[InternalRow].
To define an external data source for Spark, some of the preceding interfaces must be implemented according to the source.
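To make these interfaces concrete, here is a minimal, hedged sketch of a read-only custom data source against the Spark 2.x DataSource V1 API. The names DemoRelationProvider, DemoRelation, and demo-source are illustrative, not part of Nebula Spark Connector:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider, TableScan}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Registers the source under an alias so callers can use
// spark.read.format("demo-source") instead of the fully qualified class name.
class DemoRelationProvider extends RelationProvider with DataSourceRegister {
  override def shortName(): String = "demo-source"

  // Builds a new relation from the user-supplied options map.
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    new DemoRelation(sqlContext)
}

// A relation exposes its schema as a StructType; TableScan adds a full scan.
class DemoRelation(val sqlContext: SQLContext) extends BaseRelation with TableScan {
  override def schema: StructType =
    StructType(Seq(StructField("id", LongType), StructField("name", StringType)))

  // A real connector would scan the external system here; this one is hard-coded.
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(Seq(Row(1L, "a"), Row(2L, "b")))
}
```

In the V1 API, TableScan.buildScan() returns an RDD[Row] that Spark converts into RDD[InternalRow] internally; a connector can also build the RDD[InternalRow] itself, which is what the process described below does.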
In Nebula Spark Connector, Nebula Graph is defined as an external data source of Spark SQL, and sparkSession.read is used to read data from Nebula Graph. The following class diagram shows how this function is implemented.
The process is as follows:
- Define the data source NebulaRelationProvider. It inherits RelationProvider to create the custom relation and DataSourceRegister to register an external data source.
- Define NebulaRelation to implement the conversion of the schema and data of Nebula Graph. In the getSchema() method, the Meta Service of Nebula Graph is connected to obtain the schema corresponding to the configured return fields.
- Define NebulaRDD to read data from Nebula Graph. The compute() method defines how to read data from Nebula Graph, which mainly involves scanning the Nebula Graph data and converting the rows that are read into Spark's InternalRow. Each InternalRow forms a row of the RDD and represents a row of records in Nebula Graph; a sketch of such a conversion follows this list. In this way, all data of Nebula Graph is read out and assembled into a DataFrame.
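The row conversion in compute() is the core of the read path. Here is a minimal, hedged sketch of how one scanned record could be turned into an InternalRow; the helper name toInternalRow and the column layout are illustrative assumptions, not the connector's actual code:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

// Convert one scanned record (a vertex ID plus its property values) into
// Spark's internal row format. Catalyst stores strings as UTF8String, so
// plain Scala strings must be wrapped before the row is built.
def toInternalRow(vertexId: Long, props: Seq[Any]): InternalRow = {
  val values = vertexId +: props.map {
    case s: String => UTF8String.fromString(s) // catalyst string type
    case other     => other                    // longs, doubles, booleans pass through
  }
  InternalRow.fromSeq(values)
}
```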
Practicing Spark Connector Reader
Nebula Spark Connector Reader provides a programming interface for reading data from Nebula Graph. One tag or edge type is read at a time, and the data is returned as a DataFrame.
Clone the nebula-java repository from GitHub and compile it:
```bash
git clone -b v1.0 git@github.com:vesoft-inc/nebula-java.git
cd nebula-java/tools/nebula-spark
mvn clean compile package install -Dgpg.skip -Dmaven.javadoc.skip=true
```
After compilation, copy the JAR package in the target directory to your local Maven repository.
An example application is as follows:
- Add the nebula-spark dependency to the POM file of your Maven project.
```xml
<dependency>
  <groupId>com.vesoft</groupId>
  <artifactId>nebula-spark</artifactId>
  <version>1.1.0</version>
</dependency>
```
- Read data from Nebula Graph in your Spark application.
```scala
// Read vertex data from Nebula Graph
val vertexDataset: Dataset[Row] = spark.read
  .nebula("127.0.0.1:45500", "spaceName", "100")
  .loadVerticesToDF("tag", "field1,field2")
vertexDataset.show()

// Read edge data from Nebula Graph
val edgeDataset: Dataset[Row] = spark.read
  .nebula("127.0.0.1:45500", "spaceName", "100")
  .loadEdgesToDF("edge", "field1,field2")
edgeDataset.show()
```
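Once the data is read out, the usual DataFrame operations apply. A small hedged example, assuming each requested property appears as a column in the returned DataFrame:

```scala
// Count the vertices that were read, then aggregate on a returned property.
// Assumes field1 is a column of the vertexDataset read above.
println(vertexDataset.count())
vertexDataset.groupBy("field1").count().show()
```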
Here is the explanation of the parameters:
nebula(address: String, space: String, partitionNum: String)
- address: Specifies the IP addresses and ports of the Meta Service. Multiple addresses are separated with commas, for example, "ip1:45500, ip2:45500".
- space: Specifies a graph space name of Nebula Graph.
- partitionNum: Specifies the number of Spark partitions. To make sure that each Spark partition holds the data of one partition of the specified graph space, we recommend that you set this parameter to the partition_num that you used when creating the graph space.
loadVerticesToDF(tag: String, fields: String)
- tag: Specifies a tag in the specified graph space.
- fields: Specifies the properties of the given tag. Multiple properties are separated with commas. To read all the properties of the tag, set it to *.
loadEdgesToDF(edge: String, fields: String)
- edge: Specifies an edge type in the specified graph space.
- fields: Specifies the properties of the given edge type. Multiple properties are separated with commas. To read all the properties of the edge type, set it to *.
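For example, here is a hedged sketch that reads every property of a tag from a cluster with two Meta services; the space name nba and tag name player are assumptions for illustration:

```scala
// Read all properties ("*") of the tag "player" from the space "nba",
// listing two Meta service addresses and using 100 Spark partitions.
val players: Dataset[Row] = spark.read
  .nebula("192.168.8.1:45500,192.168.8.2:45500", "nba", "100")
  .loadVerticesToDF("player", "*")
players.printSchema()
players.show()
```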
Here is the repository of Nebula Spark Connector Reader on GitHub: https://github.com/vesoft-inc/nebula-java/tree/v1.0/tools/nebula-spark
Many thanks to Bywin for their contribution to the Nebula Spark Connector Reader in Java.