Full-Text Indexing in Nebula Graph 2.0

bright-sky
2021-06-25


1 Introduction

Nebula Graph 2.0 supports full-text indexing by using an external full-text search engine. To understand this new feature, let’s review the architecture and storage model of Nebula Graph 2.0.

1.1 Architecture of Nebula Graph

Figure: Architecture of Nebula Graph Storage

As shown in the preceding figure, the Storage Service consists of three layers. The bottom layer is the Store Engine, a standalone local store engine that supports get, put, scan, and delete operations on local data. The associated interfaces are defined in the kvstore/KVEngine.h file, and users can customize local store plugins to meet their own needs. Currently, Nebula Graph provides a RocksDB-based store engine.
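To make the four operations concrete, here is a minimal in-memory sketch of a store engine that supports get, put, prefix scan, and delete. It is an illustration only: the real interface is the C++ code in kvstore/KVEngine.h, and the class and method names below are hypothetical.

```python
from bisect import bisect_left, insort
from typing import Dict, Iterator, List, Optional, Tuple


class InMemoryKVEngine:
    """Hypothetical local store engine that keeps keys sorted, like RocksDB."""

    def __init__(self) -> None:
        self._data: Dict[bytes, bytes] = {}
        self._keys: List[bytes] = []  # sorted key list used for ordered scans

    def put(self, key: bytes, value: bytes) -> None:
        if key not in self._data:
            insort(self._keys, key)
        self._data[key] = value

    def get(self, key: bytes) -> Optional[bytes]:
        return self._data.get(key)

    def remove(self, key: bytes) -> None:
        """Delete a key if it exists."""
        if key in self._data:
            del self._data[key]
            self._keys.pop(bisect_left(self._keys, key))

    def prefix(self, prefix: bytes) -> Iterator[Tuple[bytes, bytes]]:
        """Yield (key, value) pairs whose key starts with prefix, in key order."""
        i = bisect_left(self._keys, prefix)
        while i < len(self._keys) and self._keys[i].startswith(prefix):
            key = self._keys[i]
            yield key, self._data[key]
            i += 1
```

Because keys are kept in sorted order, a prefix scan only touches the matching range, which is what makes it cheap to read, for example, every key that belongs to one partition.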

Above the local store engine, the multi-group Raft consensus algorithm is implemented. Each partition, which is a data shard in Nebula Graph, corresponds to one Raft group. Nebula Graph uses hash-based sharding. For more information about how the hash functions work, see 1.2.1 Data Storage in Nebula Graph. The number of partitions must be specified when a graph space is created and cannot be changed afterward, so choose a number that leaves room for future business growth.
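The following sketch shows the general idea of hash-based sharding: a vertex ID is hashed and mapped onto a fixed number of partitions, numbered from 1 as in the SHOW LISTENER output later in this article. zlib.crc32 is only a stand-in for Nebula Graph's own hash function, and the function name is hypothetical.

```python
import zlib


def partition_of(vid: str, partition_num: int) -> int:
    """Map a vertex ID to a partition ID in the range [1, partition_num].

    zlib.crc32 is a stand-in hash used for illustration only.
    """
    if partition_num <= 0:
        raise ValueError("partition_num must be positive")
    return zlib.crc32(vid.encode("utf-8")) % partition_num + 1
```

Since the result depends on partition_num, changing the partition count would remap almost every vertex to a different partition, which illustrates why the number of partitions is fixed once a graph space exists.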

The top layer is the storage interface layer, which implements a set of graph-related APIs. API requests are translated into a set of KV operations on the corresponding partitions. This layer is what makes the Storage Service a true graph store; without it, the Storage Service would be just a KV store. Nebula Graph does not provide the KV store as a separate service, mainly because executing a WHERE clause requires a lot of computation that depends on the graph schema, and the schema is not available in the KV store layer. Keeping the two layers together makes computation pushdown easier.
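The sketch below illustrates why the schema matters for pushdown: stored values are opaque bytes, and only with the schema can the storage layer decode a property and evaluate a WHERE predicate before sending rows to the Graph Service. The schema representation, the fixed-width row layout, and the function names are assumptions for illustration.

```python
import struct
from typing import Callable, Dict, Iterable, List, Tuple

# Hypothetical schema: ordered (property name, struct format code) pairs.
Schema = List[Tuple[str, str]]


def decode_row(schema: Schema, raw: bytes) -> Dict[str, object]:
    """Decode a fixed-width, big-endian encoded row using the schema."""
    fmt = ">" + "".join(code for _, code in schema)
    values = struct.unpack(fmt, raw)
    return {name: value for (name, _), value in zip(schema, values)}


def scan_with_filter(
    rows: Iterable[Tuple[bytes, bytes]],
    schema: Schema,
    predicate: Callable[[Dict[str, object]], bool],
) -> List[bytes]:
    """Return keys of rows that satisfy predicate, evaluated on the storage side."""
    return [key for key, raw in rows if predicate(decode_row(schema, raw))]
```

Filtering next to the data means only matching rows cross the network, which is the benefit of pushdown that a schema-less KV service could not offer.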

1.2 Storage of Nebula Graph

Nebula Graph 2.0 improves the storage structure of vertices, edges, and indexes. Reviewing this structure will help you understand how data scans and index scans are implemented in Nebula Graph 2.0.

1.2.1 Data Storage in Nebula Graph

Nebula Graph stores vertices and edges based on the key-value storage model. In this section, the storage structure of the keys is introduced. The keys are composed of the following items:

  • Type: One byte. It represents the key type, such as vertex, edge, index, or system.
  • PartID: Three bytes. It represents a partition. This field makes it easy to scan the entire partition data based on the prefix when the partition is re-balanced.
  • VertexID: n bytes. For an outgoing edge, it represents the ID of the source vertex. For an incoming edge, it represents the ID of the destination vertex.
  • Edge Type: Four bytes. It represents the type of an edge. If it is greater than 0, the edge is outgoing. If it is less than 0, the edge is incoming.
  • Rank: Eight bytes. It is used to identify edges of the same edge type and with the same source and destination vertices. Users can use it to represent their own business attributes such as transaction time, transaction serial number, or a sorting weight.
  • PlaceHolder: One byte. It is currently invisible to users and is reserved for implementing distributed transactions in the future.
  • TagID: Four bytes. It represents the type of a tag.

1.2.1.1 Vertex Key Format

| Type (1 byte) | PartID (3 bytes) | VertexID (n bytes) | TagID (4 bytes) |

1.2.1.2 Edge Key Format

| Type (1 byte) | PartID (3 bytes) | VertexID (n bytes) | EdgeType (4 bytes) | Rank (8 bytes) | VertexID (n bytes) | PlaceHolder (1 byte) |
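The following sketch packs vertex and edge keys field by field according to the formats above. The type codes, the big-endian byte order, the zero-padding of VertexID, and the 8-byte VertexID length are assumptions for illustration; Nebula Graph's own encoding lives in its C++ source and may differ in these details.

```python
import struct

KEY_TYPE_VERTEX = 0x01  # hypothetical type code
KEY_TYPE_EDGE = 0x02    # hypothetical type code
VID_LEN = 8             # assumed fixed VertexID length n


def _prefix(key_type: int, part_id: int) -> bytes:
    """Type (1 byte) followed by PartID (3 bytes)."""
    return bytes([key_type]) + part_id.to_bytes(3, "big")


def _vid(vid: bytes) -> bytes:
    """Pad a VertexID with zero bytes to the fixed length n."""
    if len(vid) > VID_LEN:
        raise ValueError("VertexID exceeds the fixed length")
    return vid.ljust(VID_LEN, b"\x00")


def vertex_key(part_id: int, vid: bytes, tag_id: int) -> bytes:
    return _prefix(KEY_TYPE_VERTEX, part_id) + _vid(vid) + struct.pack(">i", tag_id)


def edge_key(part_id: int, src: bytes, edge_type: int, rank: int, dst: bytes) -> bytes:
    # EdgeType (4 bytes, signed) and Rank (8 bytes); PlaceHolder is one zero byte.
    return (_prefix(KEY_TYPE_EDGE, part_id) + _vid(src)
            + struct.pack(">iq", edge_type, rank) + _vid(dst) + b"\x00")
```

An edge is stored under two keys: an outgoing key in the source vertex's partition, such as edge_key(p_src, src, 5, 0, dst), and an incoming key in the destination vertex's partition with the edge type negated, such as edge_key(p_dst, dst, -5, 0, src).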

1.2.2 Index Storage in Nebula Graph

In addition to the fields described in 1.2.1, index keys contain the following items:

  • IndexID: Four bytes. It identifies the index.
  • props binary (n bytes): It represents the value of a property of a tag or an edge type. If the property value is NULL, 0xFF is used.
  • nullable bitset (2 bytes): It indicates whether the value of a property is NULL. Because it is two bytes (16 bits) long, an index can contain a maximum of 16 properties.

1.2.2.1 Tag Index Key Format

| Type (1 byte) | PartID (3 bytes) | IndexID (4 bytes) | props binary (n bytes) | nullable bitset (2 bytes) | VertexID (n bytes) |

1.2.2.2 Edge Index Key Format

| Type (1 byte) | PartID (3 bytes) | IndexID (4 bytes) | props binary (n bytes) | nullable bitset (2 bytes) | VertexID (n bytes) | Rank (8 bytes) | VertexID (n bytes) |
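Building on the index key fields, this sketch assembles a tag index key with the nullable bitset. It assumes that each indexed property is encoded as a fixed-width field (strings truncated or padded to the index length, such as name(20) in section 5.6), that a NULL field is filled with 0xFF bytes, and that bit i of the bitset marks property i. The type code, byte order, and function names are hypothetical.

```python
import struct
from typing import List, Optional, Tuple

KEY_TYPE_INDEX = 0x03  # hypothetical type code
MAX_INDEX_PROPS = 16   # a 2-byte bitset can flag at most 16 properties


def encode_prop(value: Optional[bytes], width: int) -> bytes:
    """Encode one property as a fixed-width field; NULL becomes 0xFF bytes."""
    if value is None:
        return b"\xff" * width
    return value[:width].ljust(width, b"\x00")


def tag_index_key(part_id: int, index_id: int,
                  props: List[Tuple[Optional[bytes], int]], vid: bytes) -> bytes:
    """props holds (value, width) pairs in index column order."""
    if len(props) > MAX_INDEX_PROPS:
        raise ValueError("an index can contain at most 16 properties")
    bitset = 0
    body = b""
    for i, (value, width) in enumerate(props):
        if value is None:
            bitset |= 1 << i  # mark property i as NULL
        body += encode_prop(value, width)
    return (bytes([KEY_TYPE_INDEX]) + part_id.to_bytes(3, "big")
            + struct.pack(">i", index_id) + body
            + struct.pack(">H", bitset) + vid)
```

Because the property bytes come right after IndexID, index keys sort by property value, so an equality or range condition on indexed properties becomes a prefix or range scan.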

1.3 Why Use an External Full-Text Search Engine?

From the preceding key formats, you can see that a fuzzy text query on a property requires a full table scan or a full index scan, after which the data is filtered row by row. This compromises query performance, and with a large amount of data, the query may even run out of memory before the scan finishes. Besides, inverted indexing goes against the original design principle of indexing in Nebula Graph, so it is not implemented for text search. After some research and discussion, we decided to introduce a third-party full-text search engine to make full-text search work well. This approach ensures query performance and reduces the development cost of the Nebula Graph kernel.

2 Objective

2.1 Functionalities

In Nebula Graph 2.0, only LOOKUP supports text search: when an external full-text search engine is available, users can run a LOOKUP statement to perform text search. Only basic interactions with the external full-text search engine, such as inserting and querying data, are implemented. Supporting more complex plain-text queries will require further work, and suggestions from the Nebula Graph community are welcome. Nebula Graph 2.0 supports the following text search expressions:

  • Fuzzy search
  • Prefix search
  • Wildcard search
  • Regular expression search
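To illustrate what each expression type matches, here is a local Python approximation applied to single strings. It is not how Elasticsearch evaluates these queries (Elasticsearch matches indexed terms and applies its own fuzziness settings); the function names and the max_edits parameter are hypothetical.

```python
import re
from fnmatch import fnmatchcase


def prefix_match(text: str, prefix: str) -> bool:
    return text.startswith(prefix)


def wildcard_match(text: str, pattern: str) -> bool:
    # '*' matches any sequence and '?' any single character
    # (fnmatch also accepts [...] classes, which Elasticsearch wildcards do not).
    return fnmatchcase(text, pattern)


def regexp_match(text: str, pattern: str) -> bool:
    # The whole string must match, not just a substring.
    return re.fullmatch(pattern, text) is not None


def edit_distance(a: str, b: str) -> int:
    """Levenshtein distance computed with a single rolling row."""
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, 1):
        cur = [i]
        for j, cb in enumerate(b, 1):
            cur.append(min(prev[j] + 1,                # deletion
                           cur[j - 1] + 1,             # insertion
                           prev[j - 1] + (ca != cb)))  # substitution
        prev = cur
    return prev[-1]


def fuzzy_match(text: str, query: str, max_edits: int = 2) -> bool:
    return edit_distance(text, query) <= max_edits
```

Applied to the player names inserted in section 5.7, prefix_match with "B" selects the same three players that the LOOKUP example in section 5.8 returns.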

2.2 Performance

This section covers two aspects: data synchronization performance and query performance.

  • Data synchronization performance: Because an external full-text search engine is used, a copy of the data must be stored in it. It has been verified that the import performance of an external full-text search engine is lower than that of Nebula Graph. Therefore, to avoid slowing down data import into Nebula Graph, we decided to synchronize data to the external full-text search engine asynchronously. For more information, see the following sections.
  • Query performance: As mentioned above, without an external full-text search engine, full-text search would be a nightmare for Nebula Graph. With an external engine, LOOKUP now supports text search, but its performance is inevitably lower than that of a native index scan in Nebula Graph, and sometimes the external engine itself responds slowly. To address this, a timeliness mechanism based on LIMIT and TIMEOUT is used to keep query performance under control. For more information, see the following sections.

3 Glossary

  • Tag: Defines the property structure of vertices. Tags are identified by tagId. Multiple tags can be attached to one vertex.
  • Edge: Defines the property structure of edges. Edge types are identified by edgetype.
  • Property: Defines the properties of a tag or an edge type. Its data type is defined in the tag or edge type.
  • Partition: The smallest logical storage unit in Nebula Graph. A Storage Engine contains multiple partitions. A partition can take the Leader or Follower role, and Raftex ensures data consistency between Leaders and Followers.
  • Graph space: An independent business graph unit with its own set of tags and edge types. A Nebula Graph cluster can have multiple graph spaces.
  • Index: In the following sections, an index means an index on the properties of vertices or edges in Nebula Graph. Its data type is determined by the tag or edge type definition.
  • TagIndex: An index on a tag. A tag can have more than one index. Indexes across multiple tags are not supported.
  • EdgeIndex: An index on an edge type. An edge type can have more than one index. Indexes across multiple edge types are not supported.
  • Scan Policy: Defines the index scan policy. Generally, a query statement can use multiple index scan policies, and Scan Policy decides which one is used.
  • Optimizer: Optimizes query conditions to improve query efficiency, for example, by sorting, splitting, and merging sub-expression nodes in the expression tree of the WHERE clause.

4 Implementation

Elasticsearch is the external full-text search engine that is supported by Nebula Graph. In this section, I will introduce how Elasticsearch works with Nebula Graph 2.0.

4.1 Storage Structure

4.1.1 DocID

| partId (10 bytes) | schemaId (10 bytes) | encoded_columnName (32 bytes) | encoded_val (max 344 bytes) |

  • partId: Corresponds to the partition ID in Nebula Graph. It is not used in Nebula Graph 2.0 and is reserved for query pushdown and Elasticsearch routing in the future.
  • schemaId: Corresponds to the tagId or edgetype in Nebula Graph.
  • encoded_columnName: Corresponds to the property name of a tag or an edge type. It is encoded with MD5 to avoid characters that are not allowed in an Elasticsearch docID.
  • encoded_val: The property value, encoded with Base64 because some visible characters in property values are not allowed in an Elasticsearch docID. Because Base64 expands the maximum raw value of 256 bytes to 344 bytes, encoded_val is at most 344 bytes. Why 256 bytes? Initially, we only wanted LOOKUP to support text search. Similar to MySQL, Nebula Graph limits the index length, and the recommended maximum is 256 bytes, so the same limit is applied to the external search engine. Full-text search on long texts is not supported yet.
  • The maximum length of an Elasticsearch docID is 512 bytes. About 100 bytes are currently left in reserve.
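The following sketch builds a docID from the four fields and checks the length arithmetic. Base64 turns every 3 input bytes into 4 characters, so a 256-byte value becomes 4 × ⌈256/3⌉ = 344 characters, and the whole ID is at most 10 + 10 + 32 + 344 = 396 bytes, leaving 116 bytes below the 512-byte limit. Zero-padding partId and schemaId to 10 decimal digits and using the hex MD5 digest are assumptions consistent with the stated field widths; the function name is hypothetical.

```python
import base64
import hashlib

MAX_VALUE_BYTES = 256  # recommended index length limit
ES_MAX_DOC_ID = 512    # Elasticsearch limit on a document _id, in bytes


def doc_id(part_id: int, schema_id: int, column_name: str, value: bytes) -> str:
    """Concatenate partId, schemaId, MD5(column name), and Base64(value)."""
    if len(value) > MAX_VALUE_BYTES:
        raise ValueError("property value exceeds 256 bytes")
    part = f"{part_id:010d}"                                         # 10 bytes
    schema = f"{schema_id:010d}"                                     # 10 bytes
    column = hashlib.md5(column_name.encode("utf-8")).hexdigest()    # 32 bytes
    val = base64.b64encode(value).decode("ascii")                    # <= 344 bytes
    return part + schema + column + val
```

Hashing the column name keeps arbitrary property names within a fixed 32 characters, so only the encoded value varies in length.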

4.1.2 Doc Fields

  • schema_id: Corresponds to tagId or edgetype in Nebula Graph.
  • column_id: Corresponds to the property code of a tag or an edge type in Nebula Graph.
  • value: Corresponds to the property value of the native index in Nebula Graph.
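As a sketch of how these fields could be shipped, the function below builds a request body for the Elasticsearch _bulk API, which expects newline-delimited JSON: an action line followed by the document, with a trailing newline at the end. The index name is hypothetical, and the field names follow the list above; note that the template in section 5.1 maps a tag_id field, so check which name your version writes.

```python
import json
from typing import Iterable, Tuple


def bulk_body(index: str, docs: Iterable[Tuple[str, int, str, str]]) -> str:
    """Build an NDJSON body for the Elasticsearch _bulk API.

    Each item in docs is (doc_id, schema_id, column_id, value).
    """
    lines = []
    for doc_id, schema_id, column_id, value in docs:
        lines.append(json.dumps({"index": {"_index": index, "_id": doc_id}}))
        lines.append(json.dumps(
            {"schema_id": schema_id, "column_id": column_id, "value": value}))
    return "\n".join(lines) + "\n"  # _bulk requires a final newline
```

Using the index action with an explicit _id means a retried write overwrites the same document instead of creating a duplicate, which suits the retry behavior described in the next section.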

4.2 Synchronizing Data

Figure: Leader & Listener

In this section, I will introduce the details of synchronizing data asynchronously. Understanding Leader and Listener in Nebula Graph will help you understand the synchronization mechanism.

  • Leader: Nebula Graph is a horizontally scalable distributed system that uses the Raft consensus protocol. In Nebula Graph, a partition can take different roles, such as Leader, Follower, and Learner. When a new record is written to Nebula Graph, the Leader initiates a WAL synchronization event and replicates it to the Followers and the Learners. When network or disk failures occur, partition roles are switched accordingly. This mechanism ensures the data safety of the distributed database. Leaders, Followers, and Learners run in the nebula-storaged process, whose parameters are set in nebula-storaged.conf.
  • Listener: Unlike Leaders, Followers, and Learners, Listeners run in a separate process whose configuration parameters are specified in nebula-storaged-listener.conf. A Listener passively receives the WAL sent by the Leader, parses it regularly, and calls the data insertion API of the external full-text search engine to synchronize the data with it. Nebula Graph 2.0 supports the PUT and BULK interfaces of Elasticsearch.

Now, let’s see how the data is synchronized:

  1. Vertices or edges are inserted via Client or Console.
  2. On the Graph Service layer, the related partition is computed based on the Vertex ID.
  3. On the Graph Service layer, the INSERT request is sent to the Leader of the related partitions via storageClient.
  4. The Leader parses the INSERT request and then synchronizes the WAL with the Listener.
  5. The Listener regularly processes the newly synchronized WAL, parses it, and obtains the STRING property values of the tags or edge types.
  6. The metadata and the property values of the tags and edge types are assembled into a data structure compatible with Elasticsearch.
  7. The data is written into Elasticsearch via the PUT or BULK interface.
  8. If the write fails, go back to Step 5 and retry the failed WAL until the write succeeds.
  9. When the write succeeds, the successful Log ID and Term ID are recorded as the starting point for synchronizing the next WAL.
  10. Go back to Step 5 to process the new WAL.
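The loop in steps 5 to 10 can be sketched as follows. The WalEntry type, the write_to_es callback, and the in-memory checkpoint are hypothetical stand-ins; instead of retrying forever inside one call, this sketch gives up after a few attempts and resumes from the last successful Log ID on the next round, which has the same effect as step 8. A real Listener also persists its progress and waits between polls.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple


@dataclass
class WalEntry:
    log_id: int
    term_id: int
    docs: List[dict]  # STRING property values already extracted from the WAL


def sync_once(wal: List[WalEntry],
              checkpoint: Optional[Tuple[int, int]],
              write_to_es: Callable[[List[dict]], bool],
              max_attempts: int = 3) -> Optional[Tuple[int, int]]:
    """Process new WAL entries in order; return the updated (log_id, term_id).

    Stops at the first entry that still fails after max_attempts, so the
    next call resumes from the last successful Log ID.
    """
    last_id = checkpoint[0] if checkpoint else 0
    for entry in wal:
        if entry.log_id <= last_id:
            continue  # already synchronized
        for _ in range(max_attempts):
            if write_to_es(entry.docs):
                break
        else:
            return checkpoint  # still failing; retry on the next round
        checkpoint = (entry.log_id, entry.term_id)  # record Log ID and Term ID
        last_id = entry.log_id
    return checkpoint
```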

In the preceding steps, if the Elasticsearch cluster or the Listener process crashes, WAL synchronization stops. When the system recovers, synchronization resumes from the last successful Log ID. We recommend that DBAs monitor the state of the Elasticsearch cluster in real time with an external monitoring tool. If the Elasticsearch cluster is unavailable for a long time, a large amount of WAL accumulates on the Listener, and text search queries cannot work properly.

4.3 Querying Data

Figure: Querying Data

From the preceding figure, we can see the key steps in text search as follows:

  1. Send Fulltext Scan Request: Generates a full-text index search request based on the query conditions, Schema ID, and Property ID; this request encapsulates the Elasticsearch CURL command.
  2. Fulltext Cluster: Sends the query request to Elasticsearch and obtains the result.
  3. Collect Constant Values: Uses the returned results as constant values to generate an internal Nebula Graph query expression. For example, if the original request queries the values of property C1 that start with “A”, and the returned result contains both “A1” and “A2”, the expression C1 == "A1" OR C1 == "A2" is generated.
  4. IndexScan Optimizer: Based on the newly generated expression, uses rule-based optimization (RBO) to choose the best native index in Nebula Graph and generate the optimal execution plan.
  5. Fulltext Cluster: The query in this step may be slow or return a massive amount of data, so LIMIT and TIMEOUT are used to interrupt the query on the Elasticsearch side in real time.
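Steps 1, 3, and 5 can be sketched as two small functions: one builds an Elasticsearch search body for a prefix query, capping the number of hits with size (the LIMIT) and the search time with timeout (the TIMEOUT); the other turns the returned values into the OR expression from step 3. The field names, the default limit and timeout values, and the function names are assumptions; size, timeout, and the bool, term, and prefix queries are standard Elasticsearch search-body options.

```python
import json
from typing import Iterable


def prefix_search_body(schema_id: int, column_id: str, prefix: str,
                       limit: int = 100, timeout_ms: int = 200) -> dict:
    """Search body for a PREFIX(column, prefix) condition on one schema."""
    return {
        "query": {
            "bool": {
                "filter": [
                    {"term": {"schema_id": schema_id}},
                    {"term": {"column_id": column_id}},
                ],
                "must": [{"prefix": {"value": prefix}}],
            }
        },
        "size": limit,                 # LIMIT: cap on the number of hits
        "timeout": f"{timeout_ms}ms",  # TIMEOUT: bound on search time
    }


def to_or_expression(column: str, values: Iterable[str]) -> str:
    """Turn full-text hits into an equality expression for the index scan."""
    return " OR ".join(f"{column} == {json.dumps(v)}" for v in values)
```

When an Elasticsearch search hits its timeout, it returns the results gathered so far and sets timed_out to true in the response, so the caller gets a partial but bounded answer rather than waiting indefinitely.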

5 Demonstration

5.1 Deploying External Elasticsearch Cluster

I assume that you are already familiar with deploying an Elasticsearch cluster, so I won’t describe it in detail. Note that after the Elasticsearch cluster starts successfully, you need to create a general index template as follows:

{
  "template": "nebula*",
  "settings": {
    "index": {
      "number_of_shards": 3,
      "number_of_replicas": 1
    }
  },
  "mappings": {
    "properties": {
      "tag_id": { "type": "long" },
      "column_id": { "type": "text" },
      "value": { "type": "keyword" }
    }
  }
}
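One way to install this template is to send it to Elasticsearch's legacy index template endpoint, PUT /_template/<name>. The sketch below only builds the request with Python's standard library; the template name nebula_index_template and the host address are assumptions, and passing the request to urllib.request.urlopen would actually send it.

```python
import json
import urllib.request

# The template shown above, as a Python dict.
TEMPLATE = {
    "template": "nebula*",
    "settings": {"index": {"number_of_shards": 3, "number_of_replicas": 1}},
    "mappings": {
        "properties": {
            "tag_id": {"type": "long"},
            "column_id": {"type": "text"},
            "value": {"type": "keyword"},
        }
    },
}


def template_request(es_url: str,
                     name: str = "nebula_index_template") -> urllib.request.Request:
    """Build (but do not send) a PUT request that installs TEMPLATE."""
    return urllib.request.Request(
        url=f"{es_url}/_template/{name}",
        data=json.dumps(TEMPLATE).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
```

For example, urllib.request.urlopen(template_request("http://127.0.0.1:9200")) would send the template to a local cluster.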

5.2 Deploying Nebula Listener

  • Modify the configuration parameters in nebula-storaged-listener.conf to match your environment.
  • Run this command to start the Listener: ./bin/nebula-storaged --flagfile ${listener_config_path}/nebula-storaged-listener.conf

5.3 Signing In to Text Search Clients

nebula> SIGN IN TEXT SERVICE (127.0.0.1:9200);
nebula> SHOW TEXT SEARCH CLIENTS;
+-------------+------+
| Host        | Port |
+-------------+------+
| "127.0.0.1" | 9200 |
+-------------+------+
| "127.0.0.1" | 9200 |
+-------------+------+
| "127.0.0.1" | 9200 |
+-------------+------+

5.4 Creating a Graph Space of Nebula Graph

nebula> CREATE SPACE basketballplayer(partition_num=3, replica_factor=1, vid_type=fixed_string(30));
nebula> USE basketballplayer;

5.5 Adding Listeners

nebula> ADD LISTENER ELASTICSEARCH 192.168.8.5:46780,192.168.8.6:46780;
nebula> SHOW LISTENER;
+--------+-----------------+-----------------------+----------+
| PartId | Type            | Host                  | Status   |
+--------+-----------------+-----------------------+----------+
| 1      | "ELASTICSEARCH" | "[192.168.8.5:46780]" | "ONLINE" |
+--------+-----------------+-----------------------+----------+
| 2      | "ELASTICSEARCH" | "[192.168.8.5:46780]" | "ONLINE" |
+--------+-----------------+-----------------------+----------+
| 3      | "ELASTICSEARCH" | "[192.168.8.5:46780]" | "ONLINE" |
+--------+-----------------+-----------------------+----------+

5.6 Creating Tags, Edge Types, and Indexes

Values of the name property should be shorter than 256 bytes. If your business allows, define the name property of the player tag as the fixed_string type with a length of less than 256 bytes.

nebula> CREATE TAG player(name string, age int);
nebula> CREATE TAG INDEX name ON player(name(20));

5.7 Inserting Data

nebula> INSERT VERTEX player(name, age) VALUES \
  "Russell Westbrook": ("Russell Westbrook", 30), \
  "Chris Paul": ("Chris Paul", 33),\
  "Boris Diaw": ("Boris Diaw", 36),\
  "David West": ("David West", 38),\
  "Danny Green": ("Danny Green", 31),\
  "Tim Duncan": ("Tim Duncan", 42),\
  "James Harden": ("James Harden", 29),\
  "Tony Parker": ("Tony Parker", 36),\
  "Aron Baynes": ("Aron Baynes", 32),\
  "Ben Simmons": ("Ben Simmons", 22),\
  "Blake Griffin": ("Blake Griffin", 30);

5.8 Querying Data

nebula> LOOKUP ON player WHERE PREFIX(player.name, "B");
+-----------------+
| _vid            |
+-----------------+
| "Boris Diaw"    |
+-----------------+
| "Ben Simmons"   |
+-----------------+
| "Blake Griffin" |
+-----------------+

6 Tracking and Solving Problems

While setting up the system environment, an error in any step may prevent the feature from working properly. Based on user feedback, I have summarized three common types of problems. Here is how to analyze and solve them:

  • Problem: The Listeners fail to start or do not work after startup.
    • Check the Listener configuration file and make sure that the IP:Port of the Listeners does not conflict with that of the existing nebula-storaged process.
    • Check the Listener configuration file and make sure that the Meta Service IP:Port is the same as that configured for the nebula-storaged process.
    • Check the Listener configuration file and make sure that the PID and log directories are separate from those of the nebula-storaged process.
    • If a configuration error is corrected after the Listeners have started successfully, and the Listeners no longer work after the restart, clear the related metadata in the Meta Service. For the commands, see Nebula Graph Database Manual.
  • Problem: Data cannot be synchronized to the Elasticsearch cluster.
    • Make sure that the Listeners have received the WAL from the Leader by checking whether there are any files in the directory specified by listener_path in the nebula-storaged-listener.conf file.
    • Enable verbose logging (vlog) by running UPDATE CONFIGS storage:v=3 and make sure that the CURL command is executed successfully. If it fails, check the Elasticsearch configuration or the compatibility between versions.
  • Problem: There is data in the Elasticsearch cluster, but no correct result is returned.
    • Enable verbose logging by running UPDATE CONFIGS graph:v=3 and check the graph logs to find out why the CURL command failed.
    • If queries match only lowercase characters and not uppercase ones, the Elasticsearch template may be incorrect. For more information, see Nebula Graph Database Manual.
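The configuration checks for the first problem can be partly automated. The sketch below parses two gflags-style files (one --name=value flag per line, other lines ignored) and reports conflicts. The flag names local_ip, port, meta_server_addrs, pid_file, and log_dir are taken from the default Nebula Graph configuration files as we understand them, so verify them against your version; the function names are hypothetical.

```python
from typing import Dict, List


def parse_flags(text: str) -> Dict[str, str]:
    """Parse gflags-style '--name=value' lines; comments and blanks are ignored."""
    flags: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if line.startswith("--") and "=" in line:
            name, value = line[2:].split("=", 1)
            flags[name.strip()] = value.strip()
    return flags


def listener_conflicts(storaged_conf: str, listener_conf: str) -> List[str]:
    """Report settings that clash between nebula-storaged and the Listener."""
    storaged = parse_flags(storaged_conf)
    listener = parse_flags(listener_conf)
    problems = []
    storaged_addr = (storaged.get("local_ip"), storaged.get("port"))
    listener_addr = (listener.get("local_ip"), listener.get("port"))
    if listener.get("port") and storaged_addr == listener_addr:
        problems.append("listener IP:Port conflicts with nebula-storaged")
    if storaged.get("meta_server_addrs") != listener.get("meta_server_addrs"):
        problems.append("meta_server_addrs differs from nebula-storaged")
    for flag in ("pid_file", "log_dir"):
        if storaged.get(flag) is not None and storaged.get(flag) == listener.get(flag):
            problems.append(f"{flag} is shared with nebula-storaged")
    return problems
```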

7 TODO

  • Creating full-text indexes on specified tags or edge types.
  • Rebuilding full-text indexes (REBUILD)

Would you like to know more about Nebula Graph? Join the Slack channel!