Import data from Neo4j to NebulaGraph via Nebula Exchange: Best Practices

As data volumes surge, the demand for real-time data updates and queries keeps growing, and Neo4j struggles to keep up. The single-host Neo4j Community edition cannot scale out, so linear scalability and read/write splitting are not possible. The Community edition also limits data size. Worse, the causal cluster provided by the Neo4j Enterprise edition has a write performance bottleneck on the leader node.

By contrast, NebulaGraph's shared-nothing distributed architecture avoids the single-leader write bottleneck and lets reads and writes scale out. In practice, NebulaGraph handles very large datasets containing hundreds of billions of vertices and edges well.

In this article, I will show you how to import data from Neo4j to NebulaGraph with the official ETL tool, Nebula Exchange. I also cover the problems I ran into during the import and how to solve them.

Deployment Environment

Hardware configuration:

  • CPU name: Intel(R) Xeon(R) Silver 4114 CPU @ 2.20GHz
  • CPU cores: 40
  • Memory size: 376 GB
  • Disk: HDD
  • System: CentOS Linux release 7.4.1708 (Core)

Software information:

  • Neo4j: Version 3.4, five-node causal cluster
  • NebulaGraph:
    • Version: NebulaGraph v1.1.0, compiled from source
    • Deployment: three-node cluster deployed on one host
  • Exchange: nebula-java v1.1.0
  • Data warehouse:
    • hadoop-2.7.4
    • spark-2.3.1

NOTE: Allocate ports carefully when deploying multiple nodes on one host: each Storage Service also uses, for internal calls, the port number one higher than its configured port.

Full and Incremental Import

Full Import

Before importing data, create the schema in NebulaGraph based on the original Neo4j data. Note that properties in Neo4j are often added as the business requires, so some properties are likely to be NULL for some records. Confirm the full set of properties with your co-workers so that none are left out. The schema in NebulaGraph is similar to that in MySQL: you create the properties first and can alter them later, and all the data of a tag or edge type share the same metadata.
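As a rough aid for that confirmation step, the following Python sketch collects the union of property names over a sample of exported records and counts how often each one is missing or NULL. The helper names and the sample records are hypothetical and only illustrate the idea; they are not part of Exchange or of the original workflow.

```python
from collections import defaultdict


def collect_properties(records):
    """Return {property name: set of Python type names seen} over all records."""
    seen = defaultdict(set)
    for record in records:
        for key, value in record.items():
            types = seen[key]  # registers the key even if only NULL is ever seen
            if value is not None:
                types.add(type(value).__name__)
    return dict(seen)


def missing_counts(records, properties):
    """Count, per property, how many records lack it or hold NULL."""
    return {p: sum(1 for r in records if r.get(p) is None) for p in properties}


# Hypothetical sample of tagA nodes exported from Neo4j as dictionaries.
sample = [
    {"vid": "a1", "field-a0": "x", "field-a1": True, "field-a2": 1.5},
    {"vid": "a2", "field-a0": "y"},
    {"vid": "a3", "field-a1": False, "field-a2": None},
]

props = collect_properties(sample)
gaps = missing_counts(sample, props)
for name in sorted(props):
    print(name, sorted(props[name]), "missing in", gaps[name], "records")
```

A property with a non-zero gap count is one that will show up as NULL during the import, so it is worth deciding on its type and default before creating the schema.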

  1. Create tags and edge types in NebulaGraph
# Create a graph space with 10 partitions and 3 replicas.
CREATE SPACE test(partition_num=10,replica_factor=3);
# Use the graph space
USE test;
# Create tag tagA
CREATE TAG tagA(vid string, field-a0 string, field-a1 bool, field-a2 double);
# Create tag tagB
CREATE TAG tagB(vid string, field-b0 string, field-b1 bool, field-b2 double);
# Create edge type edgeAB
CREATE EDGE edgeAB(vid string, field-e0 string, field-e1 bool, field-e2 double);
  2. Modify the configuration file for Exchange
  • At this time, Exchange doesn't support connecting to Neo4j with bolt+routing. If you're using a causal cluster, pick one node and read from it directly over bolt to reduce the load on the cluster.
  • Our vid in Neo4j is stored as a string. However, NebulaGraph v1.x doesn't support string vids, so we use uuid() to generate vids.
  • partition: the number of partitions Exchange uses when pulling data from Neo4j.
  • batch: the batch size when importing data into NebulaGraph.
{
  # Spark related config
  spark: {
    app: {
      name: Spark Writer
    }

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    cores {
      max: 16
    }
  }

  # NebulaGraph related config
  nebula: {
    address:{
      graph:["xxx.xxx.xxx.xx:3699"]
      meta:["xxx.xxx.xxx.xx:45500"]
    }
    user: user
    pswd: password
    space: test

    connection {
      timeout: 3000
      retry: 3
    }

    execution {
      retry: 3
    }

    error: {
      max: 32
      output: /tmp/errors
    }

    rate: {
      limit: 1024
      timeout: 1000
    }
  }

  # Processing tags
  tags: [
    # Loading tag from neo4j
    {
      name: tagA
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://xxx.xxx.xxx.xxx:7687"
      user: neo4j
      password: neo4j
      exec: "match (n:tagA) where id(n) < 300000000 return n.vid as vid, n.field-a0 as field-a0, n.field-a1 as field-a1, n.field-a2 as field-a2 order by id(n)"
      fields: [vid, field-a0, field-a1, field-a2]
      nebula.fields: [vid, field-a0, field-a1, field-a2]
      vertex: {
        field: vid
        policy: "uuid"
      }
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
    }
    # Loading tag from neo4j
    {
      name: tagB
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://xxx.xxx.xxx.xxx:7687"
      user: neo4j
      password: neo4j
      exec: "match (n:tagB) where id(n) < 300000000 return n.vid as vid, n.field-b0 as field-b0, n.field-b1 as field-b1, n.field-b2 as field-b2 order by id(n)"
      fields: [vid, field-b0, field-b1, field-b2]
      nebula.fields: [vid, field-b0, field-b1, field-b2]
      vertex: {
        field: vid
        policy: "uuid"
      }
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
    }
  ]

  # Processing edges
  edges: [
   # Loading edges from neo4j
    {
      name: edgeAB
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://xxx.xxx.xxx.xxx:7687"
      user: neo4j
      password: neo4j
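      # Properties are read from the relationship r. The aliases src_vid and dst_vid are
      # illustrative names chosen so that the source and target fields below can refer to them.
      exec: "match (a:tagA)-[r:edgeAB]->(b:tagB) where id(r) < 300000000 return a.vid as src_vid, b.vid as dst_vid, r.vid as vid, r.field-e0 as field-e0, r.field-e1 as field-e1, r.field-e2 as field-e2 order by id(r)"
      fields: [vid, field-e0, field-e1, field-e2]
      nebula.fields: [vid, field-e0, field-e1, field-e2]
      source: {
        field: src_vid
        policy: "uuid"
      }
      target: {
        field: dst_vid
        policy: "uuid"
      }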
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
    }
  ]
}
  3. Run the import command
nohup spark-submit --class com.vesoft.nebula.tools.importer.Exchange --master "local" exchange-1.1.0.jar -c test.conf > test.log &
  4. Confirm the imported data
./bin/db_dump --space=test --db_path=./data/storage/nebula/ --meta_server=127.0.0.1:45500 -limit 0 --mode=stat --tags=tagA,tagB --edges=edgeAB

NOTE: In NebulaGraph 1.x, you can only count data with db_dump. In NebulaGraph 2.0, you can count data with nGQL statements.
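To make the batch setting from the configuration step more concrete, here is a minimal Python sketch of what batching means: rows are grouped into chunks of at most batch rows, and each chunk becomes one INSERT VERTEX statement whose vid is produced by uuid(). This only illustrates the idea and is not Exchange's actual implementation; the helper names are hypothetical, and string escaping is deliberately left out.

```python
def format_value(value):
    """Render a Python value as an nGQL literal (string escaping omitted for brevity)."""
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, str):
        return '"' + value + '"'
    return repr(value)


def batched(rows, batch):
    """Yield consecutive slices of rows, each at most `batch` rows long."""
    for start in range(0, len(rows), batch):
        yield rows[start:start + batch]


def insert_statements(tag, fields, rows, vid_field="vid", batch=1000):
    """Build one INSERT VERTEX statement per batch, using uuid() for the vid."""
    for chunk in batched(rows, batch):
        values = ", ".join(
            'uuid("{}"):({})'.format(
                row[vid_field], ", ".join(format_value(row[f]) for f in fields)
            )
            for row in chunk
        )
        yield "INSERT VERTEX {}({}) VALUES {};".format(tag, ", ".join(fields), values)


# Five hypothetical tagA rows written with a batch size of 2 give three statements.
fields = ["vid", "field-a0", "field-a1", "field-a2"]
rows = [
    {"vid": "v%d" % i, "field-a0": "s%d" % i, "field-a1": i % 2 == 0, "field-a2": i / 2}
    for i in range(5)
]
statements = list(insert_statements("tagA", fields, rows, batch=2))
for statement in statements:
    print(statement)
```

A larger batch means fewer, longer statements per partition, which is why raising it (within what the graph service accepts) tends to speed up the import.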

Incremental Import

Incremental import works by splitting the data on Neo4j's auto-incrementing internal id(). In the exec item of the import configuration, add an id() range restriction to the Cypher statement. Before doing so, make sure that data deletion is stopped on the business side: Neo4j reuses the id() of deleted data, so data written under a reused id() falls outside the range being queried and is lost during incremental import. If your business can write to both Neo4j and NebulaGraph at the same time, this problem does not arise.

exec: "match (n:user) where id(n) >= 300000000 and id(n) < 400000000 return xxx order by id(n)"
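If the increment is large, the same pattern can be repeated over consecutive id() windows. The Python sketch below generates one exec string per window; the function name and its parameters are hypothetical, and the return clause is passed through unchanged.

```python
def id_range_execs(label, start, stop, step, returns):
    """Build one Cypher statement per half-open id() window [lo, hi)."""
    template = (
        "match (n:{label}) where id(n) >= {lo} and id(n) < {hi} "
        "return {returns} order by id(n)"
    )
    return [
        template.format(label=label, lo=lo, hi=min(lo + step, stop), returns=returns)
        for lo in range(start, stop, step)
    ]


# Two windows of 100 million ids each, starting where the full import stopped.
execs = id_range_execs("user", 300000000, 500000000, 100000000, "n.vid as vid")
for statement in execs:
    print(statement)
```

Because the windows are half-open, adjacent windows never overlap and no id() is imported twice.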

Import Related Issues

Issue one: Exchange does not escape special characters such as line breaks and carriage returns. When string data contains a carriage return, the line break ends up inside the generated statement and the insertion fails.

A PR addressing this, https://github.com/vesoft-inc/nebula-java/pull/203, has been merged into the Exchange v1.0 branch.
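For readers who pre-process data themselves, the idea behind such a fix can be sketched in Python as below. This is not the code from the PR. It assumes that nGQL string literals accept backslash escapes, and the function name is hypothetical.

```python
# Characters that would break or corrupt a double-quoted string literal.
_ESCAPES = {
    "\\": "\\\\",
    '"': '\\"',
    "\n": "\\n",
    "\r": "\\r",
    "\t": "\\t",
}


def escape_string(text):
    """Replace special characters with backslash escapes, one character at a time."""
    return "".join(_ESCAPES.get(ch, ch) for ch in text)


raw = "line one\r\nline two"
escaped = escape_string(raw)
print('"' + escaped + '"')
```

Escaping character by character avoids the double-escaping that can happen when several replace calls are chained in the wrong order.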

Issue two: Exchange doesn't support importing the NULL data type. As mentioned previously, properties may have been added to only some records, so the properties that were left out of the others are NULL, and Exchange throws an error when importing them.

To work around this, modify com.vesoft.nebula.tools.importer.processor.Processor#extraValue to add a converted value for the NULL type:

case NullType => {
  fieldTypeMap(field) match {
    case StringType => ""
    case IntegerType => 0
    case LongType => 0L
    case DoubleType => 0.0
    case BooleanType => false
  }
}
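The same substitution can be pictured outside Exchange with a short Python sketch, for example to clean rows before they reach the importer. The type names, the NULL_DEFAULTS table, and fill_nulls are hypothetical and simply mirror the mapping above.

```python
# Default value per declared property type, mirroring the Scala match above.
NULL_DEFAULTS = {"string": "", "int": 0, "long": 0, "double": 0.0, "bool": False}


def fill_nulls(row, schema):
    """Return a copy of row in which missing or NULL properties get a typed default."""
    filled = {}
    for field, type_name in schema.items():
        value = row.get(field)
        filled[field] = NULL_DEFAULTS[type_name] if value is None else value
    return filled


schema = {"vid": "string", "field-a0": "string", "field-a1": "bool", "field-a2": "double"}
cleaned = fill_nulls({"vid": "a1", "field-a1": None}, schema)
print(cleaned)
```

Keep in mind that after this conversion a defaulted value can no longer be told apart from a real empty string, zero, or false.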

Optimize Import Efficiency

  1. Improve import efficiency by appropriately increasing the partition number and the batch value.
  2. If your vid is a string, convert it with the hash() function. NebulaGraph 2.0 supports string vids.
  3. The official recommendation is to change the --master option of the spark-submit command to yarn-cluster or, if you don't use YARN, to spark://ip:port. We instead used spark-submit --master "local[16]" to increase Spark concurrency, which raised the import rate more than four times. In our test (three nodes on one host), peak IO reached 200-300 MB/s. However, setting --master "local[16]" causes Hadoop cache issues, which you can fix by adding the HDFS configuration fs.hdfs.impl.disable.cache=true.
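As one possible way to combine these settings, the Python sketch below assembles the spark-submit command line. It passes the HDFS option through Spark's spark.hadoop. property prefix, which Spark forwards to the Hadoop configuration. This is an assumption about where to set it, since the option could equally go into the Hadoop configuration files; the function name is hypothetical.

```python
import shlex


def spark_submit_command(jar, conf_file, master="local[16]", disable_hdfs_cache=True):
    """Assemble the spark-submit argument list for an Exchange import."""
    cmd = [
        "spark-submit",
        "--class", "com.vesoft.nebula.tools.importer.Exchange",
        "--master", master,
    ]
    if disable_hdfs_cache:
        # Assumption: set the HDFS option via Spark's spark.hadoop.* passthrough.
        cmd += ["--conf", "spark.hadoop.fs.hdfs.impl.disable.cache=true"]
    cmd += [jar, "-c", conf_file]
    return cmd


command = spark_submit_command("exchange-1.1.0.jar", "test.conf")
printable = " ".join(shlex.quote(part) for part in command)
print(printable)
```

Building the command as a list and quoting each part keeps the square brackets in local[16] from being interpreted by the shell.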

Conclusion

Although I ran into some problems during the import, the community responded quickly with help. Many thanks to the community and the NebulaGraph staff. I hope the openCypher-compatible NebulaGraph 2.0 arrives soon.

References

  1. https://nebula-graph.com.cn/posts/how-to-import-data-from-neo4j-to-nebula-graph/
  2. https://github.com/vesoft-inc/nebula-java/tree/v1.0
  3. https://docs.nebula-graph.com.cn/manual-CN/2.query-language/2.functions-and-operators/uuid/
  4. http://arganzheng.life/hadoop-filesystem-closed-exception.html