Nebula Spark Connector Reader: Implementation and Practices
This post describes how to use Nebula Spark Connector to read data from NebulaGraph.
What is Nebula Spark Connector
Nebula Spark Connector is a custom Spark connector that enables Spark to read data from and write data to NebulaGraph. Accordingly, it is composed of two parts: Reader and Writer. This post focuses on Reader; Writer will be covered in a follow-up post.
How Nebula Spark Connector Reader is Implemented
Nebula Spark Connector Reader makes NebulaGraph an extended data source for Spark. With it, Spark can read data from NebulaGraph into a DataFrame and then execute operations such as map and reduce.
Spark SQL allows users to define custom data sources, so data sources can be extended. Data read through Spark SQL is organized into named columns as a DataFrame, a distributed dataset. Spark SQL provides many APIs for computing on and transforming DataFrames, and the DataFrame interfaces work uniformly across multiple types of data sources.
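For context, here is the generic pattern for consuming any data source, built-in or custom, through the DataFrame reader. The "csv" source below is just an illustrative built-in; a custom source would be addressed by its registered shortName instead:

// Generic DataFrame reader pattern; replace "csv" with a custom
// data source's shortName once that source is registered.
val df = spark.read
  .format("csv")
  .option("header", "true")
  .load("path/to/file.csv")
df.show()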
Spark invokes extended data sources through the org.apache.spark.sql package. Let's first look at the interfaces that Spark SQL provides for extended data sources.
Basic Interfaces
Basic interfaces include:
- BaseRelation: Represents a collection of tuples with a known schema. All subclasses that inherit BaseRelation must produce a schema in the StructType format. In other words, BaseRelation defines the format of the data that is read from the source and stored as a DataFrame in Spark SQL.
- RelationProvider: Obtains a list of parameters and returns a new BaseRelation based on the specified parameters.
- DataSourceRegister: Registers a data source. With it, users do not need the fully qualified class name of the data source, only its alias, namely the custom shortName.
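As a hedged illustration (a minimal sketch of a hypothetical source, not the Nebula implementation), a skeleton using these interfaces might look like this:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// A minimal sketch: a relation with a fixed, single-column schema.
class DemoRelation(override val sqlContext: SQLContext) extends BaseRelation {
  // Every BaseRelation must expose its schema as a StructType.
  override def schema: StructType =
    StructType(Seq(StructField("id", StringType, nullable = false)))
}

// The provider builds relations and registers the source under a short alias.
class DemoRelationProvider extends RelationProvider with DataSourceRegister {
  // Users can write spark.read.format("demo") instead of the full class name.
  override def shortName(): String = "demo"

  // Build a new relation from the user-supplied options.
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    new DemoRelation(sqlContext)
}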
Providers
Provider related interfaces include:
- RelationProvider: Generates a custom relation from the specified data source. Its createRelation() method builds a new relation based on the given parameters.
- SchemaRelationProvider: Generates a new relation based on the given parameters and a given schema.
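The difference between the two is clearest in their signatures: SchemaRelationProvider additionally receives a user-specified schema. Simplified from org.apache.spark.sql.sources:

trait RelationProvider {
  def createRelation(sqlContext: SQLContext,
                     parameters: Map[String, String]): BaseRelation
}

trait SchemaRelationProvider {
  def createRelation(sqlContext: SQLContext,
                     parameters: Map[String, String],
                     schema: StructType): BaseRelation
}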
RDD
RDD related interfaces include RDD[InternalRow]: rows are scanned from the data source and then constructed into an RDD[Row] that Spark SQL consumes.
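For example, a relation that supports a full scan mixes in the TableScan trait and implements buildScan(), which returns the scanned rows as an RDD[Row]. Again, this is a minimal self-contained sketch rather than Nebula's code:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class ScannableDemoRelation(override val sqlContext: SQLContext)
    extends BaseRelation with TableScan {
  override def schema: StructType =
    StructType(Seq(StructField("id", StringType, nullable = false)))

  // buildScan() materializes the source's rows; a real connector would
  // scan the remote storage here instead of using hard-coded rows.
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(Seq(Row("vertex-1"), Row("vertex-2")))
}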
To define an external data source for Spark, some of the preceding interfaces must be implemented according to the characteristics of the source.
In Nebula Spark Connector, NebulaGraph is defined as an external data source of Spark SQL, and sparkSession.read is used to read data from NebulaGraph. The following class diagram shows how this function is implemented.
The process is as follows:
- Define the data source NebulaRelationProvider, which inherits RelationProvider to customize the relation and inherits DataSourceRegister to register the external data source.
- Define NebulaRelation to implement the conversion of NebulaGraph schema and data. In the getSchema() method, the connector connects to the Meta Service of NebulaGraph to obtain the schema corresponding to the configured return fields.
- Define NebulaRDD to read data from NebulaGraph. The compute() method defines how to read the data: it scans the NebulaGraph data and converts each data row into a Spark InternalRow. Each InternalRow represents one record in NebulaGraph, and together these rows form the RDD. In this way, all NebulaGraph data is read out and assembled into DataFrames. A simplified skeleton of this wiring is sketched below.
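Putting the pieces together, a simplified, hypothetical skeleton of the wiring just described might look like the following. The class names mirror the post, but the bodies are illustrative; the real implementations in the nebula-spark module contain far more logic:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
import org.apache.spark.sql.types.StructType

// Hypothetical skeleton, not the real source code.
class NebulaRelationProvider extends RelationProvider with DataSourceRegister {
  // Register the source under the alias "nebula".
  override def shortName(): String = "nebula"

  // Build a NebulaRelation from the user-supplied options.
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    new NebulaRelation(sqlContext, parameters)
}

class NebulaRelation(override val sqlContext: SQLContext,
                     parameters: Map[String, String]) extends BaseRelation {
  // In the real connector, getSchema() queries NebulaGraph's Meta Service
  // for the schema of the configured return fields; elided here.
  override def schema: StructType = getSchema(parameters)

  private def getSchema(parameters: Map[String, String]): StructType = ???
}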
Practicing Spark Connector Reader
Nebula Spark Connector Reader provides a programming interface for reading data from NebulaGraph. One tag or edge type is read at a time, and the data is returned as a DataFrame.
Pull the nebula-java repository from GitHub and compile it.
git clone -b v1.0 git@github.com:vesoft-inc/nebula-java.git
cd nebula-java/tools/nebula-spark
mvn clean compile package install -Dgpg.skip -Dmaven.javadoc.skip=true
After compilation, copy the JAR package from the target directory to your local Maven repository.
An example application is as follows:
- Add the nebula-spark dependency to the pom.xml file of your Maven project.
<dependency>
<groupId>com.vesoft</groupId>
<artifactId>nebula-spark</artifactId>
<version>1.1.0</version>
</dependency>
- Read data from NebulaGraph in your Spark application.
// Read vertex data from NebulaGraph
val vertexDataset: Dataset[Row] =
spark.read
.nebula("127.0.0.1:45500", "spaceName", "100")
.loadVerticesToDF("tag", "field1,field2")
vertexDataset.show()
// Read edge data from NebulaGraph
val edgeDataset: Dataset[Row] =
spark.read
.nebula("127.0.0.1:45500", "spaceName", "100")
.loadEdgesToDF("edge", "field1,field2")
edgeDataset.show()
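Because the result is an ordinary DataFrame, all the usual Spark SQL operations apply. For instance, assuming field1 from the example above is a numeric property:

import org.apache.spark.sql.functions.col

// Filter and aggregate the vertices just loaded (field1 assumed numeric).
vertexDataset
  .filter(col("field1") > 10)
  .groupBy("field2")
  .count()
  .show()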
Here is an explanation of the parameters:
nebula(address: String, space: String, partitionNum: String)
address: Specifies the addresses of the Meta Service. Multiple addresses are separated by commas, for example, "ip1:45500,ip2:45500".
space: Specifies a graph space name in NebulaGraph.
partitionNum: Specifies the number of Spark partitions. To make sure that each Spark partition holds the data of one partition of the graph space, we recommend setting this variable to the partitionNum that you used when creating the graph space.
loadVerticesToDF(tag: String, fields: String)
tag: Specifies a tag in the graph space.
fields: Specifies the properties of the given tag. Multiple properties are separated by commas. To read all the properties of the tag, set it to *.
loadEdgesToDF(edge: String, fields: String)
edge: Specifies an edge type in the graph space.
fields: Specifies the properties of the given edge type. Multiple properties are separated by commas. To read all the properties of the edge type, set it to *.
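For example, to read every property of a tag, pass * as the fields argument; the tag name player below is illustrative:

// Read all properties of the (illustrative) tag "player".
val allPlayerProps: Dataset[Row] =
  spark.read
    .nebula("127.0.0.1:45500", "spaceName", "100")
    .loadVerticesToDF("player", "*")
allPlayerProps.show()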
GitHub repository
Here is the repository of Nebula Spark Connector Reader on GitHub: https://github.com/vesoft-inc/nebula-java/tree/v1.0/tools/nebula-spark
Many thanks to Bywin for their contribution to the Nebula Spark Connector Reader in Java.