Nebula Spark Connector Writer: Implementation and Practices


In "Nebula Spark Connector Reader: Implementation and Practices", it is known that Nebula Spark Connector is a connector for Spark reading data from or writing data into NebulaGraph. It is composed of Reader and Writer. In this article, I will introduce how Nebula Spark Connector makes writing data from other data sources into NebulaGraph possible.

How Nebula Spark Connector Writer is Implemented

Spark SQL supports custom data sources, which makes it possible for Spark to integrate with third-party data sources.

Writing single records into NebulaGraph with Nebula Spark Connector is implemented based on DataSourceV2. Here is how it works:

  1. Implement the WriteSupport interface and override the createWriter method to create a custom DataSourceWriter.
  2. Implement the DataSourceWriter interface to create the NebulaDataSourceVertexWriter class and the NebulaDataSourceEdgeWriter class. Override the createWriterFactory method to return a custom DataWriterFactory, the commit method to commit the entire transaction, and the abort method to roll the transaction back. Please note that NebulaGraph 1.x does not support transactions, so the commit and abort methods in this implementation are never actually called.
  3. Implement the DataWriterFactory interface to create the NebulaVertexWriterFactory class and the NebulaEdgeWriterFactory class. Override the createDataWriter method to return a custom DataWriter.
  4. Implement the DataWriter interface to create the NebulaVertexWriter class and the NebulaEdgeWriter class. Override the write method to write data out, plus the commit and abort methods as in step 2; again, they are never actually called because NebulaGraph 1.x does not support transactions.
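
The following is a simplified skeleton of this hierarchy for the vertex path, written against the Spark 2.4 DataSourceV2 APIs. The class bodies are illustrative sketches, not the exact nebula-spark source:

import java.util.Optional

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, WriteSupport}
import org.apache.spark.sql.types.StructType

// Step 1: WriteSupport creates the custom DataSourceWriter.
class NebulaDataSource extends DataSourceV2 with WriteSupport {
  override def createWriter(writeUUID: String,
                            schema: StructType,
                            mode: SaveMode,
                            options: DataSourceOptions): Optional[DataSourceWriter] =
    Optional.of(new NebulaDataSourceVertexWriter(schema, options))
}

// Step 2: the DataSourceWriter returns the writer factory.
class NebulaDataSourceVertexWriter(schema: StructType, options: DataSourceOptions)
    extends DataSourceWriter {
  override def createWriterFactory(): DataWriterFactory[InternalRow] =
    new NebulaVertexWriterFactory(schema, options)
  // NebulaGraph 1.x has no transactions, so commit/abort are never really called.
  override def commit(messages: Array[WriterCommitMessage]): Unit = {}
  override def abort(messages: Array[WriterCommitMessage]): Unit = {}
}

// Step 3: the DataWriterFactory creates one DataWriter per partition.
class NebulaVertexWriterFactory(schema: StructType, options: DataSourceOptions)
    extends DataWriterFactory[InternalRow] {
  override def createDataWriter(partitionId: Int,
                                taskId: Long,
                                epochId: Long): DataWriter[InternalRow] =
    new NebulaVertexWriter(schema, options)
}

// Step 4: the DataWriter writes each record out.
class NebulaVertexWriter(schema: StructType, options: DataSourceOptions)
    extends DataWriter[InternalRow] {
  override def write(record: InternalRow): Unit = {
    // Build and execute an INSERT VERTEX statement for this row (see the next section).
  }
  override def commit(): WriterCommitMessage = new WriterCommitMessage {}
  override def abort(): Unit = {}
}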

The class diagram of the implementation of Nebula Spark Connector Writer is as follows.

[Figure: class diagram of Nebula Spark Connector Writer]

Writing a single record into NebulaGraph is implemented in the write method of NebulaVertexWriter and NebulaEdgeWriter, which performs the following steps:

  1. Creates a client and connects it to the Graph Service of NebulaGraph.
  2. Specifies a graph space to write data into.
  3. Constructs nGQL statements to insert data into NebulaGraph.
  4. Commits the nGQL statements and executes them.
  5. Defines callback functions to receive the execution results.
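
Below is a minimal sketch of these five steps for the vertex case. The GraphClient trait is a hypothetical stand-in for the nebula-java graph client, and the player tag with name and age properties, together with the vertexId, name, and age columns, are assumptions borrowed from the demo data used later in this article:

import org.apache.spark.sql.Row

// Hypothetical stand-in for the nebula-java graph client (for illustration only).
trait GraphClient {
  def connect(user: String, password: String): Int
  def execute(statement: String): Int
}

def writeVertex(client: GraphClient, row: Row, onResult: Int => Unit): Unit = {
  client.connect("user", "password")      // 1. connect to the Graph Service
  client.execute("USE nb;")               // 2. specify the graph space
  val vid = row.getAs[Long]("vertexId")   // 3. construct the INSERT VERTEX statement
  val stmt =
    s"""INSERT VERTEX player(name, age) VALUES $vid: ("${row.getAs[String]("name")}", ${row.getAs[Long]("age")});"""
  val code = client.execute(stmt)         // 4. submit and execute the statement
  onResult(code)                          // 5. pass the result to the callback
}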

For batch writing, Nebula Spark Connector Writer maps over the DataFrame, accumulates the transformed records, and commits them to NebulaGraph batch by batch, which is similar to the implementation of Nebula Exchange.
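
Here is a sketch of that idea, reusing the hypothetical GraphClient trait from the previous sketch: the rows of each partition are grouped, and every group is flushed to NebulaGraph as one INSERT statement.

import org.apache.spark.sql.{DataFrame, Row}

// Simplified illustration of batch writing: flush every `batch` rows of a
// partition as a single INSERT VERTEX statement.
def batchInsertVertices(df: DataFrame, batch: Int, makeClient: () => GraphClient): Unit = {
  df.foreachPartition { rows: Iterator[Row] =>
    val client = makeClient()             // one client per partition
    client.connect("user", "password")
    client.execute("USE nb;")
    rows.grouped(batch).foreach { chunk =>
      val values = chunk.map { row =>
        s"""${row.getAs[Long]("vertexId")}: ("${row.getAs[String]("name")}", ${row.getAs[Long]("age")})"""
      }.mkString(", ")
      client.execute(s"INSERT VERTEX player(name, age) VALUES $values;")
    }
  }
}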

Practices of Nebula Spark Connector Writer

Nebula Spark Connector Writer provides two interfaces for writing data into NebulaGraph: one writes single records and the other writes data in batches. In both cases, the source of the data is a DataFrame.

Run these commands to pull the source code of Nebula Spark Connector from GitHub, compile it, and install the package into your local Maven repository.

git clone -b v1.0 https://github.com/vesoft-inc/nebula-java.git
cd nebula-java
mvn clean install -Dgpg.skip -Dmaven.javadoc.skip=true -Dmaven.test.skip=true

The following example shows how to use Nebula Spark Connector Writer.

  1. Add the nebula-spark dependency into the POM file of your Maven project.
<dependency>
  <groupId>com.vesoft</groupId>
  <artifactId>nebula-spark</artifactId>
  <version>1.0.1</version>
</dependency>
  2. In the Spark application, use an interface of Nebula Spark Connector Writer to write the data of a DataFrame into NebulaGraph.
  • 2.1 Write a single record into NebulaGraph:
// Constructs DataFrames for vertices and edges.
// The demo data in the nebula-java/examples/src/main/resources directory of the v1.0 branch of the nebula-java repository is used.
// You can find the demo data in the local examples/src/main/resources directory.

val vertexDF = spark.read.json("examples/src/main/resources/vertex")
vertexDF.show()
val edgeDF = spark.read.json("examples/src/main/resources/edge")
edgeDF.show()

// Writes vertices
vertexDF.write
  .nebula("127.0.0.1:3699", "nb", "100")
  .writeVertices("player", "vertexId", "hash")

// Writes edges
edgeDF.write
  .nebula("127.0.0.1:3699", "nb", "100")
  .writeEdges("follow", "source", "target")

Explanation of the parameters:

  • nebula(address: String, space: String, partitionNum: String)

    • address: Specifies the Graph server addresses and their ports. Multiple addresses are supported and must be separated with commas. The default port is 3699. For example, "ip1:3699,ip2:3699".
    • space: Specifies the graph space name of NebulaGraph.
    • partitionNum: Specifies the partitionNum value that was set during the creation of the graph space. If no value was set, the default value 100 is used.
  • writeVertices(tag: String, vertexField: String, policy: String = "")

    • tag: Specifies a tag name created in the graph space.
    • vertexField: Specifies a column of the DataFrame as the source of the vertex IDs. For example, if the DataFrame is composed of three columns, a, b, and c, and column a is used as the source of the vertex IDs, set this parameter to a.
    • policy: If the data in the vertexField column is not of the int type, a mapping policy is necessary: set policy to "hash". Otherwise, leave it blank.
  • writeEdges(edge: String, srcVertexField: String, dstVertexField: String, policy: String = "")

    • edge: Specifies an edge type name created in the graph space.
    • srcVertexField: Specifies a column of the DataFrame as the source of the source vertex IDs of the edges.
    • dstVertexField: Specifies a column of the DataFrame as the source of the destination vertex IDs of the edges.
    • policy: If the data in the srcVertexField column and/or the dstVertexField column is not of the int type, a mapping policy is necessary: set policy to "hash". Otherwise, leave it blank.
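
As a usage note, whether to pass policy depends only on the type of the ID columns. For instance, assuming a hypothetical DataFrame df with a string-typed id column and an int-typed uid column:

// String IDs: the "hash" policy maps them to int vertex IDs.
df.write
  .nebula("127.0.0.1:3699", "nb", "100")
  .writeVertices("player", "id", "hash")

// Int IDs: no policy is needed.
df.write
  .nebula("127.0.0.1:3699", "nb", "100")
  .writeVertices("player", "uid")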

  • 2.2 Batch write data into NebulaGraph:
// Constructs DataFrames for vertices and edges.
// The demo data in the nebula-java/examples/src/main/resources directory of the v1.0 branch of the nebula-java repository is used.
// You can find the demo data in the local examples/src/main/resources directory.

val vertexDF = spark.read.json("examples/src/main/resources/vertex")
vertexDF.show()
val edgeDF = spark.read.json("examples/src/main/resources/edge")
edgeDF.show()

// Batch writes vertices
new NebulaBatchWriterUtils()
      .batchInsert("127.0.0.1:3699", "nb", 2000)
      .batchToNebulaVertex(vertexDF, "player", "vertexId")

// Batch writes edges
new NebulaBatchWriterUtils()
      .batchInsert("127.0.0.1:3699", "nb", 2000)
      .batchToNebulaEdge(edgeDF, "follow", "source", "target")

Explanation of the parameters:

  • batchInsert(address: String, space: String, batch: Int = 2000)
    • address: Specifies the Graph server addresses and their ports. Multiple addresses are supported and must be separated with commas. The default port is 3699. For example, "ip1:3699,ip2:3699".
    • space: Specifies the graph space name of NebulaGraph.
    • batch: Specifies the number of records to be written into NebulaGraph in one batch. Optional. The default value is 2000.
  • batchToNebulaVertex(data: DataFrame, tag: String, vertexField: String, policy: String = "")
    • data: Specifies the DataFrame to be written into NebulaGraph.
    • tag: Specifies a tag name created in the graph space.
    • vertexField: Specifies a column of the DataFrame as the source of the vertex IDs.
    • policy: If the data in the vertexField column is not of the int type, a mapping policy is necessary: set policy to "hash". Otherwise, leave it blank.
  • batchToNebulaEdge(data: DataFrame, edge: String, srcVertexField: String, dstVertexField: String, rankField: String = "", policy: String = "")
    • data: Specifies the DataFrame to be written into NebulaGraph.
    • edge: Specifies an edge type name created in the graph space.
    • srcVertexField: Specifies a column of the DataFrame as the source of the source vertex IDs of the edges.
    • dstVertexField: Specifies a column of the DataFrame as the source of the destination vertex IDs of the edges.
    • rankField: Specifies a column of the DataFrame as the source of the rank values of the edges. Optional.
    • policy: If the data in the srcVertexField column and/or the dstVertexField column is not of the int type, a mapping policy is necessary: set policy to "hash". Otherwise, leave it blank.
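
For instance, to batch-write edges together with rank values, assuming edgeDF also has a degree column to serve as the edge rank and string-typed ID columns that need the hash policy:

// Batch writes edges with a rank column and the hash policy.
new NebulaBatchWriterUtils()
  .batchInsert("127.0.0.1:3699", "nb", 1000)
  .batchToNebulaEdge(edgeDF, "follow", "source", "target", "degree", "hash")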

This concludes the introduction to Nebula Spark Connector Writer. You are welcome to pull the source code from the GitHub repository (https://github.com/vesoft-inc/nebula-java/tree/v1.0/tools/nebula-spark) and try it out.

If you encounter any problems while trying it out, please feel free to ask questions and start discussions under the nebula-spark-connector repository.