Tools
Importing data into NebulaGraph using Nebula Importer
NebulaGraph is now a mature product with a rich ecosystem of tools and a wide range of options for data import: the large and comprehensive Nebula Exchange, the small and compact Nebula Importer, and the Nebula Spark Connector and Nebula Flink Connector for Spark and Flink integrations.
But which of these import methods is the most convenient?
Here are my takes:
Nebula Exchange
- If you need to import streaming data from Kafka and Pulsar into the NebulaGraph database
- If you need to read batch data from relational databases (e.g. MySQL) or distributed file systems (e.g. HDFS)
- If you need to generate SST files recognized by NebulaGraph from large batches of data
Nebula Importer
- Nebula Importer is best for importing local CSV files into NebulaGraph
Nebula Spark Connector
- Migrate data between different NebulaGraph clusters
- Migrate data between different graph spaces within the same NebulaGraph cluster
- Migrate data between NebulaGraph and other data sources
- Combining with Nebula Algorithm for graph computation
For more on how to import data with Spark, read: 4 different ways to work with NebulaGraph in Apache Spark
Nebula Flink Connector
- Migrate data between different NebulaGraph clusters
- Migrate data between different graph spaces within the same NebulaGraph cluster
- Migrate data between NebulaGraph and other data sources
Overall, Nebula Exchange is large and comprehensive and can import into NebulaGraph from most storage engines, but it requires a Spark environment to be deployed.
Nebula Importer is simple to use and has few dependencies, but you must generate the data files yourself in advance and configure the schema up front; it does not support resumable (breakpoint) transfers, so it suits medium data volumes.
The Spark and Flink Connectors are meant to be combined with streaming or batch data pipelines.
Choose the tool that fits your scenario. For newcomers to NebulaGraph, I recommend Nebula Importer, because it is easy to use and quick to get started with.
Using Nebula Importer
When we first adopted NebulaGraph, the ecosystem was not yet mature and only some of our businesses had migrated, so we imported data into NebulaGraph, whether full or incremental, by pushing Hive tables to Kafka and consuming Kafka to write to NebulaGraph in batches. Later, as more data and more businesses switched to NebulaGraph, import efficiency became a serious problem: import times grew until it was unacceptable to still be importing the full dataset during peak business hours.
To solve this, after trying Nebula Spark Connector and Nebula Importer, we decided, for ease of maintenance and migration, to import the full dataset via _Hive table → CSV → Nebula Server → Nebula Importer_. The overall time spent was reduced significantly.
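For reference, the Hive-to-CSV step can be an export query along these lines; the directory, table, and column names here are hypothetical, not our production schema:

-- Illustrative Hive export: dump a table as comma-separated files on HDFS;
-- the files are then copied to the Nebula server as the importer's CSV input.
INSERT OVERWRITE DIRECTORY '/tmp/nebula/relation_vertices'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
SELECT vid, name, update_time
FROM dw.relation_vertices;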
Configuring Nebula Importer
System environment
[root@nebula-server-prod-05 importer]# lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 16
On-line CPU(s) list: 0-15
Thread(s) per core: 2
Core(s) per socket: 8
Socket(s): 1
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 85
Model name: Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz
Stepping: 7
CPU MHz: 2499.998
BogoMIPS: 4999.99
Hypervisor vendor: KVM
Virtualization type: full
L1d cache: 32K
L1i cache: 32K
L2 cache: 1024K
L3 cache: 36608K
NUMA node0 CPU(s): 0-15
Disk: SSD
Memory: 128G
Cluster Environment
NebulaGraph Version: v2.6.1
Deployment Method: RPM
Cluster size: 3 replicas, 6 nodes
Data Size
+---------+------------+-----------+
| Type    | Name       | Count     |
+---------+------------+-----------+
| "Space" | "vertices" | 559191827 |
| "Space" | "edges"    | 722490436 |
+---------+------------+-----------+
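For reference, counts like these come from the graph space statistics job; a minimal nGQL sketch, assuming the stats job is allowed to finish before SHOW STATS runs:

USE Relation;
SUBMIT JOB STATS;  // collect statistics for the current graph space
SHOW STATS;        // display vertex and edge counts once the job succeeds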
Nebula Importer configuration
# NebulaGraph client version; set to v2 when connecting to NebulaGraph 2.x.
version: v2
description: Relation Space import data
# Whether to remove temporarily generated log and error data files.
removeTempFiles: false
clientSettings:
  # Number of retries for failed nGQL statement executions.
  retry: 3
  # Number of concurrent NebulaGraph clients.
  concurrency: 5
  # Size of the cache queue for each NebulaGraph client.
  channelBufferSize: 1024
  # The NebulaGraph graph space to import data into.
  space: Relation
  # Connection information.
  connection:
    user: root
    password: ******
    address: 10.0.XXX.XXX:9669,10.0.XXX.XXX:9669
  postStart:
    # Commands to run after connecting to the NebulaGraph server, before inserting data.
    commands: |
    # Interval between running the above commands and starting to insert data.
    afterPeriod: 1s
  preStop:
    # Commands to run before disconnecting from the NebulaGraph server.
    commands: |
# Path of the file to which error logs and similar messages are written.
logPath: /mnt/csv_file/prod_relation/err/test.log
....
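The .... above elides the files section, which maps each CSV file to a tag or edge type. For reference, a minimal vertex entry in the v2 importer format looks roughly like this; the path, tag, and property names are illustrative, not our production schema:

files:
  - path: /mnt/csv_file/prod_relation/company.csv      # illustrative path
    failDataPath: /mnt/csv_file/prod_relation/err/company.csv
    batchSize: 128
    type: csv
    csv:
      withHeader: false
      withLabel: false
      delimiter: ","
    schema:
      type: vertex
      vertex:
        vid:
          index: 0           # column 0 holds the vertex ID
        tags:
          - name: company    # illustrative tag name
            props:
              - name: name
                type: string
                index: 1     # column 1 holds the "name" property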
A crontab entry is set up so that after Hive generates the tables and they are transferred to the NebulaGraph server, the Nebula Importer task runs at night when traffic is low:
50 03 15 * * /mnt/csv_file/importer/nebula-importer -config /mnt/csv_file/importer/rel.yaml >> /root/rel.log
The full import completes by 6 a.m., taking about 2 hours in total.
Some of the logs are shown below; the import speed peaks at about 200,000 rows/s:
2022/05/15 03:50:11 [INFO] statsmgr.go:62: Tick: Time(10.00s), Finished(1952500), Failed(0), Read Failed(0), Latency AVG(4232us), Batches Req AVG(4582us), Rows AVG(195248.59/s)
2022/05/15 03:50:16 [INFO] statsmgr.go:62: Tick: Time(15.00s), Finished(2925600), Failed(0), Read Failed(0), Latency AVG(4421us), Batches Req AVG(4761us), Rows AVG(195039.12/s)
2022/05/15 03:50:21 [INFO] statsmgr.go:62: Tick: Time(20.00s), Finished(3927400), Failed(0), Read Failed(0), Latency AVG(4486us), Batches Req AVG(4818us), Rows AVG(196367.10/s)
2022/05/15 03:50:26 [INFO] statsmgr.go:62: Tick: Time(25.00s), Finished(5140500), Failed(0), Read Failed(0), Latency AVG(4327us), Batches Req AVG(4653us), Rows AVG(205619.44/s)
2022/05/15 03:50:31 [INFO] statsmgr.go:62: Tick: Time(30.00s), Finished(6080800), Failed(0), Read Failed(0), Latency AVG(4431us), Batches Req AVG(4755us), Rows AVG(202693.39/s)
2022/05/15 03:50:36 [INFO] statsmgr.go:62: Tick: Time(35.00s), Finished(7087200), Failed(0), Read Failed(0), Latency AVG(4461us), Batches Req AVG(4784us), Rows AVG(202489.00/s)
Then, at 7:00, Kafka is consumed again, based on timestamps, to import the incremental data produced between early morning and 7:00. This prevents the t+1 full import from overwriting the current day's incremental data.
Consuming the increment takes about 10-15 minutes.
Real-time
Incremental data identified by MD5 comparison is written to Kafka, and the Kafka data is consumed in real time, keeping data latency under 1 minute.
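As a minimal sketch of that change-detection step, assuming each row is serialized to a string and keyed by vertex ID (the snapshot store and field layout here are hypothetical, not our production pipeline):

// Minimal sketch of MD5-based change detection: only rows whose digest
// differs from the previous snapshot would be produced to Kafka.
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

// rowDigest returns the MD5 hex digest of a serialized row.
func rowDigest(row string) string {
	sum := md5.Sum([]byte(row))
	return hex.EncodeToString(sum[:])
}

func main() {
	// Previous snapshot: vertex ID -> digest of the last exported row.
	snapshot := map[string]string{
		"v1": rowDigest("v1,Acme Corp,2022-05-14"),
	}

	// Today's rows; only changed or new rows should be sent downstream.
	today := map[string]string{
		"v1": "v1,Acme Corp,2022-05-15", // changed
		"v2": "v2,Beta Ltd,2022-05-15",  // new
	}

	for id, row := range today {
		d := rowDigest(row)
		if snapshot[id] != d {
			// In the real pipeline this would be produced to a Kafka topic.
			fmt.Printf("publish %s\n", row)
			snapshot[id] = d
		}
	}
}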
In addition, there may be unanticipated data issues that go undetected by the real-time path for a long time, so the full dataset is imported every 30 days via the Nebula Importer process described above. A TTL of 35 days is then set on the tags and edge types in the space, so that any data not updated in time is filtered out and eventually reclaimed.
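For illustration, TTL in NebulaGraph is configured per tag or edge type; a minimal nGQL sketch with a hypothetical tag, where 35 days is 3,024,000 seconds:

// Hypothetical tag: rows whose update_time is more than ttl_duration
// seconds old expire and are reclaimed during compaction.
CREATE TAG company(name string, update_time timestamp)
    TTL_DURATION = 3024000, TTL_COL = "update_time";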
About the author
Reid is an engineer at Qichacha, China’s biggest corporate information platform.