Nebula Spark Connector Reader: Implementation and Practices

This post describes how to use Nebula Spark Connector to read data from NebulaGraph.

What is Nebula Spark Connector

Nebula Spark Connector is a custom Spark connector that enables Spark to read data from and write data to NebulaGraph. Accordingly, it consists of two parts: Reader and Writer. This post focuses on Reader. Writer will be covered in a later post.

How Nebula Spark Connector Reader is Implemented

Nebula Spark Connector Reader makes NebulaGraph available to Spark as an extended data source. With it, Spark can read data from NebulaGraph into a DataFrame and then run operations such as map and reduce on it.

Spark SQL lets users define custom data sources. The data read by Spark SQL is organized into a DataFrame, a distributed dataset with named columns. Spark SQL provides many APIs for computing on and transforming a DataFrame, and the same DataFrame interfaces work across multiple types of data sources.
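To make "named columns" concrete, here is a minimal sketch of DataFrame operations on a small in-memory dataset. The object name DataFrameSketch, the helper namesOlderThan, and the sample data are assumptions made for illustration; nothing here comes from Nebula Spark Connector.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Illustrative only: the names and sample data below are made up.
object DataFrameSketch {
  // Builds a two-column DataFrame and returns the names of rows whose
  // age is greater than minAge, sorted alphabetically.
  def namesOlderThan(spark: SparkSession, minAge: Int): Seq[String] = {
    import spark.implicits._ // enables .toDF and .as[String]
    val players = Seq(("Tim", 42), ("Tony", 36), ("Manu", 41)).toDF("name", "age")
    players
      .filter(col("age") > minAge) // row-level filter on a named column
      .select("name")              // projection by column name
      .as[String]
      .collect()
      .toSeq
      .sorted
  }
}
```

The same filter and select calls would apply to a DataFrame loaded from any other source, which is the point of going through the data source interfaces.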

Spark calls an extended data source through the org.apache.spark.sql package, and the interfaces that a data source implements are defined in org.apache.spark.sql.sources. Let's first look at the interfaces that Spark SQL provides for extended data sources.

Basic Interfaces

Basic interfaces include:

  • BaseRelation: Indicates a collection of tuples with a known schema. All subclasses that inherit BaseRelation must generate a schema in the StructType format. In other words, BaseRelation defines the format of the data that is read out of the source and stored as DataFrame in Spark SQL.
  • RelationProvider: Obtains a list of parameters and returns a new BaseRelation based on the specified parameters.
  • DataSourceRegister: Registers a data source under an alias. When using the data source, you can refer to it by this alias (the custom shortName) instead of its fully qualified class name.

Providers

Provider-related interfaces include:

  • RelationProvider: Generates a custom relation from the specified data source. createRelation() generates a new relation based on the given parameters.
  • SchemaRelationProvider: Generates a new relation based on the given parameters and a user-supplied schema.

RDD

RDD-related interfaces include RDD[InternalRow]: the data scanned from the data source must then be constructed as RDD[Row].

To define an external data source for Spark, some of the preceding methods must be customized according to the source.
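As a rough illustration of how these interfaces fit together, here is a minimal sketch of a toy data source that serves a fixed in-memory list. ToyRelationProvider, ToyRelation, the "toy" alias, and the "prefix" option are hypothetical names invented for this example. The sketch also uses Spark's TableScan trait for the scan method, and it is not the connector's actual code.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider, TableScan}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical provider: creates the relation from user-supplied options.
class ToyRelationProvider extends RelationProvider with DataSourceRegister {
  // Alias for the source. It only resolves via format("toy") if this class is
  // listed in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister.
  override def shortName(): String = "toy"

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    new ToyRelation(sqlContext, parameters.getOrElse("prefix", ""))
}

// Hypothetical relation: declares the schema and produces the rows.
class ToyRelation(override val sqlContext: SQLContext, prefix: String)
    extends BaseRelation with TableScan {

  // The schema of the DataFrame that Spark SQL builds from this relation.
  override def schema: StructType = StructType(Seq(
    StructField("id", LongType, nullable = false),
    StructField("name", StringType, nullable = false)))

  // A real source would scan external storage here; this one returns fixed rows.
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(Seq(
      Row(1L, prefix + "alice"),
      Row(2L, prefix + "bob")))
}
```

Without the service registration, the source can still be loaded by class name, for example spark.read.format(classOf[ToyRelationProvider].getName).option("prefix", "user_").load().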

In Nebula Spark Connector, NebulaGraph is defined as an external data source of Spark SQL, and sparkSession.read is used to read data from NebulaGraph. The following class diagram shows how this function is implemented.

[Figure: class diagram of Nebula Spark Connector Reader]

The process is as follows:

  1. Define the data source NebulaRelationProvider. It extends RelationProvider to customize the relation and DataSourceRegister to register the external data source.
  2. Define NebulaRelation to convert the schema and data of NebulaGraph. Its getSchema() method connects to the Meta Service of NebulaGraph to obtain the schema of the configured return fields.
  3. Define NebulaRDD to read data from NebulaGraph. Its compute() method defines how the data is read: it scans the NebulaGraph data and converts each data row into a Spark InternalRow. Each InternalRow represents one record in NebulaGraph, and together these rows form the RDD. In this way, the data in NebulaGraph is read out and assembled into a DataFrame.
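The role of compute() in step 3 can be sketched with a small custom RDD. This is a simplified illustration, not the connector's NebulaRDD: ScanSketchRDD and PartPartition are hypothetical names, the rows are generated rather than scanned from NebulaGraph, and Row is used in place of InternalRow to keep the example short.

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Hypothetical partition descriptor: one Spark partition per storage partition.
case class PartPartition(index: Int) extends Partition

// Hypothetical RDD that stands in for a storage scan. A real reader would
// scan the external store inside compute() instead of generating rows.
class ScanSketchRDD(sc: SparkContext, numParts: Int, rowsPerPart: Int)
    extends RDD[Row](sc, Nil) {

  // Tells Spark how many partitions exist and how to identify each one.
  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numParts)(i => PartPartition(i))

  // Runs once per partition on an executor and yields that partition's rows.
  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    val part = split.index
    Iterator.tabulate(rowsPerPart) { i =>
      Row(part.toLong * rowsPerPart + i, s"v_${part}_$i")
    }
  }
}
```

An RDD[Row] like this can be turned into a DataFrame with spark.createDataFrame(rdd, schema), where schema is a StructType that matches the rows.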

Practicing Spark Connector Reader

Nebula Spark Connector Reader provides a programming interface for reading data from NebulaGraph. It reads one tag or edge type at a time and returns the data as a DataFrame.

Pull the nebula-java repository from GitHub and compile it.

git clone -b v1.0 git@github.com:vesoft-inc/nebula-java.git
cd nebula-java/tools/nebula-spark
mvn clean compile package install -Dgpg.skip -Dmaven.javadoc.skip=true

After compilation, copy the JAR package from the target directory to the local Maven repository.

An example application is as follows:

  1. Add the nebula-spark dependency to the POM file of the Maven project.
<dependency>
  <groupId>com.vesoft</groupId>
  <artifactId>nebula-spark</artifactId>
  <version>1.1.0</version>
</dependency>
  2. Use the Spark application to read data from NebulaGraph.
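import org.apache.spark.sql.{Dataset, Row, SparkSession}
// .nebula(...) is not part of Spark itself. It is added by the nebula-spark
// dependency, so the connector's reader extension must also be imported.

val spark = SparkSession.builder().appName("NebulaReaderExample").getOrCreate()

// Read vertex data from NebulaGraph
val vertexDataset: Dataset[Row] =
  spark.read
    .nebula("127.0.0.1:45500", "spaceName", "100")
    .loadVerticesToDF("tag", "field1,field2")
vertexDataset.show()

// Read edge data from NebulaGraph
val edgeDataset: Dataset[Row] =
  spark.read
    .nebula("127.0.0.1:45500", "spaceName", "100")
    .loadEdgesToDF("edge", "field1,field2")
edgeDataset.show()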

Here is the explanation of the parameters:

  • nebula(address: String, space: String, partitionNum: String)
      address: Specifies the addresses of the Meta Service in ip:port format. Multiple addresses are separated with commas. For example, "ip1:45500, ip2:45500".
      space: Specifies the name of a graph space in NebulaGraph.
      partitionNum: Specifies the number of Spark partitions. So that each Spark partition holds the data of one partition of the graph space, we recommend that you set this parameter to the partition_num value that you used to create the graph space.
  • loadVerticesToDF(tag: String, fields: String)
      tag: Specifies a tag in the graph space.
      fields: Specifies the properties of the tag to read. Multiple properties are separated with commas. To read all the properties of the tag, set it to *.
  • loadEdgesToDF(edge: String, fields: String)
      edge: Specifies an edge type in the graph space.
      fields: Specifies the properties of the edge type to read. Multiple properties are separated with commas. To read all the properties of the edge type, set it to *.
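To illustrate the string formats described above, here is a small sketch that splits an address string into host and port pairs and a fields string into property names. ParamSketch, parseMetaAddresses, and parseFields are hypothetical helpers written for this example. They are not part of the connector, and the connector's own parsing may differ.

```scala
// Hypothetical helpers for illustration only; not the connector's API.
object ParamSketch {
  // Parses "ip1:45500, ip2:45500" into List(("ip1", 45500), ("ip2", 45500)).
  def parseMetaAddresses(address: String): List[(String, Int)] =
    address
      .split(",")
      .map(_.trim)
      .filter(_.nonEmpty)
      .map { hostPort =>
        val idx = hostPort.lastIndexOf(':')
        require(idx > 0 && idx < hostPort.length - 1,
          s"expected host:port, got '$hostPort'")
        (hostPort.substring(0, idx), hostPort.substring(idx + 1).toInt)
      }
      .toList

  // Parses "field1,field2" into Some(List("field1", "field2")).
  // Returns None for "*", which this sketch uses to mean "all properties".
  def parseFields(fields: String): Option[List[String]] =
    if (fields.trim == "*") None
    else Some(fields.split(",").map(_.trim).filter(_.nonEmpty).toList)
}
```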

GitHub repository

Here is the repository of Nebula Spark Connector Reader on GitHub: https://github.com/vesoft-inc/nebula-java/tree/v1.0/tools/nebula-spark

Many thanks to Bywin for their contribution to the Nebula Spark Connector Reader in Java.
