Offline CSV Import of Billions of Records | Best Practice Using NebulaGraph Spark Connector
Based on our actual requirements and future scalability, we chose the NebulaGraph database. The first step was to verify NebulaGraph's batch import performance in our real business scenarios.
This article shares the best practice of using NebulaGraph Spark Connector to run import jobs as distributed Spark on YARN tasks, with CSV files on HDFS as the data source.
About NebulaGraph Spark Connector
The following figure shows the use cases and advantages of NebulaGraph Spark Connector. For more details, see NebulaGraph Spark Connector.
Environment Information
- Hardware environment
Name | Value | Recommended |
---|---|---|
Local Disk (SSD) | 2 T | At least 2 T |
CPU | 16 C * 4 | 128 C |
Memory | 128 GB | 128 GB |
- Software environment
Name | Version |
---|---|
NebulaGraph | 3.0.0 |
NebulaGraph Spark Connector | 3.0.0 |
Hadoop | 2.7.2U17-10 |
Spark | 2.4.5U5 |
- Data size
Name | Value |
---|---|
Data size | 200 GB |
Vertex | 930 million |
Edge | 970 million |
Deployment Plan
- Method: Distributed, 3 nodes
- See Deploy a NebulaGraph cluster with RPM/DEB package on multiple servers
There are roughly three steps for the deployment.
- Download the RPM package of NebulaGraph and install it.
- Modify configuration files in batch.
- Start the cluster service.
The above-mentioned three steps are performed as the `root` user. For non-root users, prefix the commands with `sudo`.
Download and Install the NebulaGraph RPM Package
Execute the following commands:
wget https://oss-cdn.nebula-graph.com.cn/package/3.0.0/nebula-graph-3.0.0.el7.x86_64.rpm
wget https://oss-cdn.nebula-graph.com.cn/package/3.0.0/nebula-graph-3.0.0.el7.x86_64.rpm.sha256sum.txt
rpm -ivh nebula-graph-3.0.0.el7.x86_64.rpm
Note: The default installation path is /usr/local/nebula. Make sure the memory resources are sufficient.
Modify Configuration Files in Batch
# Run the commands in the configuration directory (/usr/local/nebula/etc by default) of each node.

# On node 172.1x.1x.149:
sed -i 's?--meta_server_addrs=127.0.0.1:9559?--meta_server_addrs=172.1x.x.15:9559,172.1x.x.176:9559,172.1x.1x.149:9559?g' *.conf
sed -i 's?--local_ip=127.0.0.1?--local_ip=172.1x.1x.149?g' *.conf

# On node 172.1x.x.15:
sed -i 's?--meta_server_addrs=127.0.0.1:9559?--meta_server_addrs=172.1x.x.15:9559,172.1x.x.176:9559,172.1x.1x.149:9559?g' *.conf
sed -i 's?--local_ip=127.0.0.1?--local_ip=172.1x.x.15?g' *.conf

# On node 172.1x.x.176:
sed -i 's?--meta_server_addrs=127.0.0.1:9559?--meta_server_addrs=172.1x.x.15:9559,172.1x.x.176:9559,172.1x.1x.149:9559?g' *.conf
sed -i 's?--local_ip=127.0.0.1?--local_ip=172.1x.x.176?g' *.conf
Note: The IP addresses in the above commands are internal IP addresses, used for communication between nodes.
Start the NebulaGraph Cluster Service
/usr/local/nebula/scripts/nebula.service start all
The above command starts the cluster service. Check the status of the service with the following command.
ps aux|grep nebula
On each node, the returned result should show three processes, one for each service.
/usr/local/nebula/bin/nebula-metad --flagfile /usr/local/nebula/etc/nebula-metad.conf
/usr/local/nebula/bin/nebula-graphd --flagfile /usr/local/nebula/etc/nebula-graphd.conf
/usr/local/nebula/bin/nebula-storaged --flagfile /usr/local/nebula/etc/nebula-storaged.conf
Note: If the returned result shows fewer than 3 processes, run /usr/local/nebula/scripts/nebula.service start all a few more times, or run /usr/local/nebula/scripts/nebula.service restart all to restart the service.
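Alternatively, the bundled service script provides a status check (assuming the default installation path):

```bash
/usr/local/nebula/scripts/nebula.service status all
```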
Visualization Service
NebulaGraph Studio is used to visualize the NebulaGraph cluster. Visit http://n01v:7001 to log into NebulaGraph Studio. (This address belongs to my internal network and is not accessible from the Internet.)
- Host: 10.x.x.1 (the IP of any node):9669
- Username/Password: root/nebula
Refer to Use nGQL (CRUD) for commonly used nGQL statements.
Use NebulaGraph
Add the Storage nodes to the cluster.
ADD HOSTS 172.x.x.121:9779, 172.16.11.218:9779,172.16.12.12:9779;
List all nodes and check whether their status is ONLINE by running `SHOW HOSTS` or `SHOW HOSTS META`.
Create a graph space, equivalent to a database in relational databases.
CREATE SPACE mylove (partition_num = 15, replica_factor = 3, vid_type = FIXED_STRING(256));
// The number of partitions is recommended to be 5 times the number of nodes. The number of replicas should be an odd number, generally 3. If the VID type is a string, set the length to a reasonable value; otherwise it takes up too much disk space.
Create a Tag, equivalent to an entity:
CREATE TAG entity (name string NULL, version string NULL);
Create an edge, equivalent to a relationship.
CREATE EDGE relation (name string NULL);
Add `limit` when querying data; otherwise the query may take a long time.
match (v) return v limit 100;
Read and Import CSV Files with Spark Connector
References:
- NebulaSparkWriterExample (Scala, JSON) from the official repository
- NebulaSparkWriterExample (Java, JSON) from a community expert
Code example of NebulaSparkWriterExample
```scala
import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.{
  NebulaConnectionConfig,
  WriteMode,
  WriteNebulaEdgeConfig,
  WriteNebulaVertexConfig
}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

object NebulaSparkWriter {
  private val LOG = LoggerFactory.getLogger(this.getClass)
  var ip = ""

  def main(args: Array[String]): Unit = {
    // args(0): "1" writes vertices, "2" writes edges; args(1): NebulaGraph host IP
    val part = args(0)
    ip = args(1)

    val sparkConf = new SparkConf
    sparkConf
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
    val spark = SparkSession
      .builder()
      .master("local")
      .config(sparkConf)
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    if ("1".equalsIgnoreCase(part)) writeVertex(spark)
    if ("2".equalsIgnoreCase(part)) writeEdge(spark)

    spark.close()
  }

  def getNebulaConnectionConfig(): NebulaConnectionConfig = {
    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress(ip + ":9559")
        .withGraphAddress(ip + ":9669")
        .withTimeout(Integer.MAX_VALUE)
        .withConenctionRetry(5)
        .build()
    config
  }

  def writeVertex(spark: SparkSession): Unit = {
    LOG.info("start to write nebula vertices: 1 entity")
    val df = spark.read
      .option("sep", "\t")
      .csv("/home/2022/project/origin_file/csv/tag/entity/")
      .toDF("id", "name", "version")

    val config = getNebulaConnectionConfig()
    val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
      .builder()
      .withSpace("mywtt")
      .withTag("entity")
      .withVidField("id")
      .withVidAsProp(false)
      .withUser("root")
      .withPasswd("nebula")
      .withBatch(1800)
      .build()
    df.coalesce(1400).write.nebula(config, nebulaWriteVertexConfig).writeVertices()
  }

  def writeEdge(spark: SparkSession): Unit = {
    LOG.info("start to write nebula edges: 2 entityRel")
    val df = spark.read
      .option("sep", "\t")
      .csv("/home/2022/project/origin_file/csv/out/rel/relation/")
      .toDF("src", "dst", "name")

    val config = getNebulaConnectionConfig()
    val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig
      .builder()
      .withSpace("mywtt")
      .withEdge("relation")
      .withSrcIdField("src")
      .withDstIdField("dst")
      .withSrcAsProperty(false)
      .withDstAsProperty(false)
      .withUser("root")
      .withPasswd("nebula")
      .withBatch(1800)
      .build()
    df.coalesce(1400).write.nebula(config, nebulaWriteEdgeConfig).writeEdges()
  }
}
```
Details on NebulaSparkWriterExample
The following are the details of the key functions and options.
- `spark.sparkContext.setLogLevel("WARN")`: Sets the log level and suppresses the INFO logs.
- `withTimeout(Integer.MAX_VALUE)`: Sets the connection timeout. The timeout should be set as large as possible; the default value is 1 minute. When the number of timeouts exceeds the number of retries, the Spark task fails.
- `option("sep", "\t")`: Specifies the separator of the CSV files; otherwise, each row is parsed as a single column.
- `toDF("src", "dst", "name")`: Specifies the schema of the dataset, i.e., converts `Dataset<Row>` to `DataFrame`; otherwise, the `VidField` cannot be specified.
- `withVidField("id")`: This function only works when the column name is set, so a schema must be specified.
- `withVidAsProp(false)`: With the ID used as the VID, the value is not written again as a property, so it does not take up extra disk space.
- `withSrcIdField("src")`: Sets the `IdField` of the source vertex.
- `withDstIdField("dst")`: Sets the `IdField` of the destination vertex.
- `withSrcAsProperty(false)`: Saves disk space.
- `withDstAsProperty(false)`: Saves disk space.
- `withBatch(1000)`: The number of rows imported per batch. For `WriteMode.UPDATE` the value must be <= 512; for `WriteMode.INSERT` it can be larger (Gigabit NIC / 5 Gbps bandwidth / local SSD = 1500); see the write-mode sketch after this list.
- `coalesce(1500)`: Adjustable according to the number of concurrent tasks. Too much data in a single partition can easily cause executor OOM exceptions.
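The example above writes with the default INSERT semantics. As a minimal sketch only, assuming the connector version in use exposes the `withWriteMode` builder option (the `WriteMode` enum is already imported in the example), an update-style writer config might look like this:

```scala
// Hypothetical variant of the vertex writer config from the example above.
// Assumes WriteNebulaVertexConfig.builder() supports withWriteMode in this connector version.
val updateVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
  .builder()
  .withSpace("mywtt")
  .withTag("entity")
  .withVidField("id")
  .withVidAsProp(false)
  .withWriteMode(WriteMode.UPDATE) // default is WriteMode.INSERT
  .withUser("root")
  .withPasswd("nebula")
  .withBatch(512)                  // UPDATE mode caps the batch size at 512
  .build()
```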
Submit Tasks to the Spark Cluster
```bash
nohup spark-submit --master yarn --deploy-mode client \
  --class com.xxx.nebula.connector.NebulaSparkWriter \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.memoryOverhead=10g \
  --conf spark.blacklist.enabled=false \
  --conf spark.default.parallelism=1000 \
  --driver-memory 10G --executor-memory 12G \
  --executor-cores 4 --num-executors 180 \
  ./example-3.0-SNAPSHOT.jar > run-csv-nebula.log 2>&1 &
```
Monitoring with iotop
The `iotop` command can be used to monitor the disk I/O of the cluster nodes during the import.
Total DISK READ : 26.61 K/s | Total DISK WRITE : 383.77 M/s
Actual DISK READ: 26.61 K/s | Actual DISK WRITE: 431.75 M/s
Monitoring with top
The `top` command can be used to monitor the CPU and memory usage of the cluster nodes during the import.
top - 16:03:01 up 8 days, 28 min, 1 user, load average: 6.16, 6.53, 4.58
Tasks: 205 total, 1 running, 204 sleeping, 0 stopped, 0 zombie
%Cpu(s): 28.3 us, 14.2 sy, 0.0 ni, 56.0 id, 0.6 wa, 0.0 hi, 0.4 si, 0.5 st
KiB Mem : 13186284+total, 1135004 free, 31321240 used, 99406592 buff/cache
KiB Swap: 0 total, 0 free, 0 used. 99641296 avail Mem
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
27979 root 20 0 39.071g 0.026t 9936 S 564.6 20.8 83:22.03 nebula-storaged
27920 root 20 0 2187476 804036 7672 S 128.2 0.6 17:13.75 nebula-graphd
27875 root 20 0 6484644 1.990g 8588 S 58.5 1.6 14:14.22 nebula-metad
Other Resource Monitoring
Service Optimization
Configuration Optimization of nebula-storaged.conf
Here, I modified the following parameters in the nebula-storaged.conf file.
# Default reserved bytes for a batch operation.
--rocksdb_batch_size=4096
# Default block cache size used in BlockBasedTable.
# The unit is MB. It is usually set to one third of the memory value. The memory is 128 GB.
--rocksdb_block_cache=44024
############## rocksdb Options ##############
--rocksdb_disable_wal=true
# In JSON, the name and value of each option of rocksdb DBOptions is a string. Such as “option_name”:“option_value”. Use a comma to separate each option.
--rocksdb_db_options={"max_subcompactions":"3","max_background_jobs":"3"}
# In JSON, the name and value of each option of rocksdb ColumnFamilyOptions is a string. Such as “option_name”:“option_value”. Use a comma to separate each option.
--rocksdb_column_family_options={"disable_auto_compactions":"false","write_buffer_size":"67108864","max_write_buffer_number":"4","max_bytes_for_level_base":"268435456"}
# In JSON, the name and value of each option of rocksdb BlockBasedTableOptions is a string. Such as “option_name”:“option_value”. Use a comma to separate each option.
--rocksdb_block_based_table_options={"block_size":"8192"}
# The number of processors per request.
--max_handlers_per_req=10
# The heartbeat interval between the services in the cluster.
--heartbeat_interval_secs=10
--raft_rpc_timeout_ms=5000
--raft_heartbeat_interval_secs=10
--wal_ttl=14400
# The max value for batch import.
--max_batch_size=1800
# Reduces memory usage when the value is true.
--enable_partitioned_index_filter=true
# Limits the number of edges returned per vertex; the data is filtered at the storage layer to guard against super nodes in the production environment.
--max_edge_returned_per_vertex=10000
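Note that changes to nebula-storaged.conf take effect only after the Storage service is restarted, for example (assuming the default installation path):

```bash
/usr/local/nebula/scripts/nebula.service restart storaged
```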
Optimizations of the Linux System
# Allow unlimited core dump size and raise the open-file limit.
ulimit -c unlimited
ulimit -n 130000
# Keep the TCP congestion window from being reset after an idle period.
sysctl -w net.ipv4.tcp_slow_start_after_idle=0
# Enlarge the connection and packet backlog queues.
sysctl -w net.core.somaxconn=2048
sysctl -w net.ipv4.tcp_max_syn_backlog=2048
sysctl -w net.core.netdev_max_backlog=3000
# Append the PID to core dump file names.
sysctl -w kernel.core_uses_pid=1
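These `sysctl -w` changes do not survive a reboot. As an optional, environment-specific addition (not part of the original tuning steps), they can also be persisted to `/etc/sysctl.conf`:

```bash
# Persist the kernel settings and reload them (assumes settings go in /etc/sysctl.conf).
cat >> /etc/sysctl.conf <<'EOF'
net.ipv4.tcp_slow_start_after_idle = 0
net.core.somaxconn = 2048
net.ipv4.tcp_max_syn_backlog = 2048
net.core.netdev_max_backlog = 3000
kernel.core_uses_pid = 1
EOF
sysctl -p
```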
Verify Import Results
SUBMIT JOB STATS;   # Start a statistics job for the graph space.
SHOW JOB ${ID};     # Check the progress of the job by its ID.
SHOW STATS;         # View the vertex and edge counts after the job succeeds.
The entity insertion rate is approximately 27,837 records/s (figures are specific to this import).
The relationship insertion rate is about 26,276 records/s (figures are specific to this import).
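As a rough back-of-the-envelope estimate based on these figures, writing the 930 million vertices takes about 930,000,000 / 27,837 ≈ 9.3 hours, and writing the 970 million edges about 970,000,000 / 26,276 ≈ 10.3 hours.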
Import performance would be better with a higher server specification. In addition, bandwidth, whether the traffic crosses data centers, disk I/O, and even network fluctuations all affect performance.
Performance Tests
- Query for the specified entities based on properties.
MATCH (v:entity) WHERE v.entity.name == 'Lifespan' RETURN v;
It takes 0.002558 (s).
- One-hop queries.
MATCH (v1:entity)-[e:propertiesRel]->(v2:attribute) WHERE id(v1) == '70da43c5e46f56c634547c7aded3639aa8a1565975303218e2a92af677a7ee3a' RETURN v2 limit 100;
It takes 0.003571 (s)
- Two-hop queries.
MATCH p=(v1:entity)-[e:propertiesRel*1..2]->(v2) WHERE id(v1) == '70da43c5e46f56c634547c7aded3639aa8a1565975303218e2a92af677a7ee3a' RETURN p;
It takes 0.005143 (s)
- Query for all the properties of an edge.
FETCH PROP ON propertiesRel '70da43c5e46f56c634547c7aded3639aa8a1565975303218e2a92af677a7ee3a' -> '0000002d2e88d7ba6659db83893dedf3b8678f3f80de4ffe3f8683694b63a256' YIELD properties(edge);
It takes 0.001304 (s)
- Multi-hop queries from a vertex matched by property (fixed number of hops).
match p=(v:entity{name:"张三"})-[e:entityRel|propertiesRel*1]->(v2) return p;
It takes 0.02986 (s)
match p=(v:entity{name:"张三"})-[e:entityRel|propertiesRel*2]->(v2) return p;
It takes 0.07937 (s)
match p=(v:entity{name:"张三"})-[e:entityRel|propertiesRel*3]->(v2) return p;
It takes 0.269 (s)
match p=(v:entity{name:"张三"})-[e:entityRel|propertiesRel*4]->(v2) return p;
It takes 3.524859 (s)
- Variable-length queries over a hop range (1..n).
match p=(v:entity{name:"张三"})-[e:entityRel|propertiesRel*1..2]->(v2) return p;
It takes 0.072367 (s)
match p=(v:entity{name:"张三"})-[e:entityRel|propertiesRel*1..3]->(v2) return p;
It takes 0.279011 (s)
match p=(v:entity{name:"张三"})-[e:entityRel|propertiesRel*1..4]->(v2) return p;
It takes 3.728018 (s)
- Query for the properties of the vertices and edges on the shortest path between `A_vid` and `B_vid` in both directions.
FIND SHORTEST PATH WITH PROP FROM "70da43c5e46f56c634547c7aded3639aa8a1565975303218e2a92af677a7ee3a" TO "0000002d2e88d7ba6659db83893dedf3b8678f3f80de4ffe3f8683694b63a256" OVER * BIDIRECT YIELD path AS p;
It takes 0.003096 (s)
- Query for all paths between the two vertices, filtering on an edge property.
FIND ALL PATH FROM "70da43c5e46f56c634547c7aded3639aa8a1565975303218e2a92af677a7ee3a" TO "0000002d2e88d7ba6659db83893dedf3b8678f3f80de4ffe3f8683694b63a256" OVER * WHERE propertiesRel.name is not EMPTY or propertiesRel.name >=0 YIELD path AS p;
It takes 0.003656 (s)
Problems Encountered
Guava Version Conflict
Caused by: java.lang.NoSuchMethodError: com.google.common.base.Stopwatch.createStarted()Lcom/google/common/base/Stopwatch;
After troubleshooting, I found that one of our dependencies used Guava 22.0, while the Spark cluster shipped with Guava 14.0, resulting in a conflict that prevented the job from running. For tasks running on the Spark cluster, the Guava package that ships with Spark is loaded with higher priority than the one bundled in the task jar.
Our dependency uses methods that were added in Guava 22.0 and do not exist in 14.0. Since neither the cluster nor the dependency code could be modified, the following options were available:
1. Upgrade the Guava package on the Spark cluster, which is risky and may cause unknown problems.
2. Relocate (rename) the Guava package inside our own jar with a Maven plugin.
The second option is used here: the maven-shade-plugin (https://maven.apache.org/plugins/maven-shade-plugin/) relocates the Guava classes to a different package, which solves the conflict.
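As a minimal sketch of that approach, the relocation section of the pom.xml might look like the following; the shaded package name `shaded.google.common` is an illustrative choice, not taken from the original project:

```xml
<!-- Relocates the Guava classes bundled in our jar so they no longer clash
     with the Guava version that ships with the Spark cluster. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.2.4</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>com.google.common</pattern>
            <shadedPattern>shaded.google.common</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```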
Spark Blacklist Mechanism
Blacklisting behavior can be configured via the `spark.blacklist.*` properties. The default value of `spark.blacklist.enabled` is `false`. If it is set to `true`, Spark will no longer schedule tasks on executors that have been blacklisted due to too many task failures. The blacklisting algorithm can be further tuned with the other `spark.blacklist.*` settings.
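For reference, a minimal sketch of how these properties can be passed to spark-submit; the threshold values below are illustrative, not settings taken from this import job:

```bash
# Option 1: disable blacklisting entirely (as done in the spark-submit command above).
spark-submit --conf spark.blacklist.enabled=false ...

# Option 2: keep blacklisting enabled but loosen the failure thresholds.
spark-submit \
  --conf spark.blacklist.enabled=true \
  --conf spark.blacklist.task.maxTaskAttemptsPerExecutor=2 \
  --conf spark.blacklist.task.maxTaskAttemptsPerNode=2 \
  ...
```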
*This article was written by Enqiu Jia and first published on the WeChat Official Account 只要5分钟不要996. It is translated and published with permission.*