NebulaGraph Source Code Explained: fbthrift

Overview

Nebula Clients provide users with APIs in multiple programming languages for interacting with NebulaGraph, and repackage the data structures returned by the server so that they are easier to use.

Currently, Nebula Clients support C++, Java, Python, Golang, and Rust.

Framework for service communication

Nebula Clients use fbthrift (https://github.com/facebook/fbthrift) as the RPC framework for communication between servers and clients, which enables cross-language interaction.

At a high level, fbthrift is:

  1. A code generator: fbthrift generates data structures in different languages from Thrift interface definitions, and these structures can be serialized with Thrift.
  2. A serialization framework: fbthrift provides a set of protocols for serializing the structures created by the code generator.
  3. An RPC framework: fbthrift provides a framework for sending messages between clients and servers, and for calling application-defined functions in different languages when messages are received.

Examples

The following takes the Golang client as an example to show how fbthrift is used in NebulaGraph.

  1. The Vertex structure is defined on the server as follows:
struct Vertex {
    Value vid;
    std::vector<Tag> tags;

    Vertex() = default;
};
  2. The corresponding data structures are defined in src/interface/common.thrift:
struct Tag {
        1: binary name,
        // List of <prop_name, prop_value>
        2: map<binary, Value> (cpp.template = "std::unordered_map") props,
} (cpp.type = "nebula::Tag")

struct Vertex {
        1: Value     vid,
        2: list<Tag> tags,
} (cpp.type = "nebula::Vertex")

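The example above defines a Vertex structure. The annotation (cpp.type = "nebula::Vertex") indicates that this structure corresponds to the server's nebula::Vertex type. Similarly, (cpp.template = "std::unordered_map") makes the generated C++ code use std::unordered_map for the props map.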

  3. fbthrift automatically generates the corresponding data structure in Golang:
// Attributes:
//  - Vid
//  - Tags
type Vertex struct {
    Vid *Value `thrift:"vid,1" db:"vid" json:"vid"`
    Tags []*Tag `thrift:"tags,2" db:"tags" json:"tags"`
}

func NewVertex() *Vertex {
    return &Vertex{}
}

...

func (p *Vertex) Read(iprot thrift.Protocol) error { // Deserialization 
    ...
}

func (p *Vertex) Write(oprot thrift.Protocol) error { // Serialization 
    ...
}
  4. When executing MATCH (v:Person) WHERE id(v) == "ABC" RETURN v, the client requests a vertex (nebula::Vertex) from the server. After finding the vertex, the server serializes it and sends it to the client through the transport of the RPC framework. When the client receives the data, it deserializes it into the corresponding client-side data structure (type Vertex struct).

Clients

This section uses nebula-go as an example to introduce the client's modules and their main interfaces.

  1. Configs provides all configuration options.
type PoolConfig struct {
    // The timeout threshold. The default value 0 means no timeout. Unit: ms
    TimeOut time.Duration
    // The maximum idle time of each connection. A connection that stays idle longer than this threshold is closed and removed. The default value 0 means connections never expire
    IdleTime time.Duration
    // The maximum number of connections in the connection pool. The default value is 10
    MaxConnPoolSize int
    // The minimum number of idle connections. The default value is 0
    MinConnPoolSize int
}
  2. Session provides the interfaces that users call directly.
// Manages the information of a specific Session
type Session struct {
    // Used for identity verification or message retry when executing commands
    sessionID  int64
    // The connection currently held
    connection *connection
    // The connection pool this Session uses
    connPool   *ConnectionPool
    // Logging tool
    log        Logger
    // Stores the time zone used by the current session
    timezoneInfo
}
  • The main interfaces are defined as follows:
    // Execute nGQL and return a ResultSet. This interface is not thread-safe
    func (session *Session) Execute(stmt string) (*ResultSet, error) {...}
    // Re-acquire a connection from the connection pool for the current Session
    func (session *Session) reConnect() error {...}
    // Sign out, release the Session ID, and return the connection to the pool
    func (session *Session) Release() {...}
  3. ConnectionPool manages all connections. The main interfaces are as follows:
// Create a new connection pool and initialize it with the given service addresses
func NewConnectionPool(addresses []HostAddress, conf PoolConfig, log Logger) (*ConnectionPool, error) {...}
// Authenticate the user and get a Session instance
func (pool *ConnectionPool) GetSession(username, password string) (*Session, error) {...}
  4. Connection wraps Thrift's network transport and provides the following interfaces:
// Establish a connection to the specified IP and port
func (cn *connection) open(hostAddress HostAddress, timeout time.Duration) error {...}
// Authenticate the username and password
func (cn *connection) authenticate(username, password string) (*graph.AuthResponse, error) {...}
// Execute a query
func (cn *connection) execute(sessionID int64, stmt string) (*graph.ExecutionResponse, error) {...}
// Use a temporary sessionID of 0 and send the query "YIELD 1" to test whether the connection is usable
func (cn *connection) ping() bool {...}
// Release the session ID on the graphd process
func (cn *connection) signOut(sessionID int64) error {...}
// Disconnect
func (cn *connection) close() {...}
  5. LoadBalance is used by the connection pool.
    • Policy: polling (round-robin)
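The polling (round-robin) policy can be sketched as follows. This is a minimal, hypothetical illustration rather than nebula-go's LoadBalance code: `roundRobin` and `Next` are names made up for this example, and addresses are plain strings (the `graphd-N:9669` hosts are placeholders). Each call to `Next` returns the next address in turn, so connections end up spread nearly evenly across the Graph services.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin is a hypothetical load balancer that hands out addresses in turn.
// addresses must be non-empty.
type roundRobin struct {
	addresses []string
	next      uint64 // counter advanced atomically so Next is safe for concurrent use
}

// Next returns the next address in round-robin order.
func (rr *roundRobin) Next() string {
	n := atomic.AddUint64(&rr.next, 1) - 1
	return rr.addresses[n%uint64(len(rr.addresses))]
}

func main() {
	rr := &roundRobin{addresses: []string{"graphd-1:9669", "graphd-2:9669", "graphd-3:9669"}}
	// Assign 7 connections: each address receives either 2 or 3 of them.
	counts := map[string]int{}
	for i := 0; i < 7; i++ {
		counts[rr.Next()]++
	}
	fmt.Println(counts)
}
```

Using an atomic counter instead of a mutex keeps address selection cheap when many goroutines open connections at once.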

Interaction of modules

  1. Connection pool
    • Initialize:
      • Before using the client, the user needs to create and initialize a connection pool. During initialization, the pool establishes connections to the Nebula service addresses specified by the user. If multiple Graph services are deployed in a cluster, the pool uses a polling (round-robin) policy to balance the load and establishes a nearly equal number of connections to each address.
    • Manage connections:
      • The connection pool maintains two queues: an idle connection queue and an active connection queue. The pool periodically detects expired idle connections and closes them. Both queues are protected by a read-write lock, which keeps them correct under multi-threaded execution when elements are added or removed.
      • When a Session requests a connection from the pool, the pool checks whether the idle connection queue holds a usable connection. If it does, that connection is returned directly to the Session. If not, and the total number of connections is below the maximum defined in the configuration, a new connection is created for the Session. If the maximum has been reached, an error is returned.
    • Generally, the connection pool needs to be closed only when the program exits. Closing it disconnects all connections in the pool.
  2. Session
    • A Session is obtained from the connection pool, and the user must provide a username and password for authentication. After authentication succeeds, the user gets a Session instance and communicates with the server through the connection held by that Session. The most commonly used interface is execute(). If an error occurs during execution, the client checks the error type: if it is a network error, the client automatically reconnects and executes the statement again.
    • Note that a Session must not be used by multiple threads at the same time. The correct approach is to create one Session per thread, so that each thread uses its own Session.
    • When a Session is released, its connection is put back into the pool's idle connection queue so that other Sessions can reuse it later.
  3. Connection
    • Every connection instance is equivalent and can be held by any Session. This design allows connections to be reused by different Sessions, which avoids repeatedly opening and closing the Transport.
    • A connection sends the client's request to the server and returns the result to the Session.
  4. Example
// log, hostList, address, port, testPoolConfig, username, password and query are assumed to be defined elsewhere

// Initialize the connection pool
pool, err := nebula.NewConnectionPool(hostList, testPoolConfig, log)
if err != nil {
    log.Fatal(fmt.Sprintf("Failed to initialize the connection pool, host: %s, port: %d, %s", address, port, err.Error()))
}
// Close all connections in the pool when the program exits
defer pool.Close()

// Create a session
session, err := pool.GetSession(username, password)
if err != nil {
    // Do not write the password to the log
    log.Fatal(fmt.Sprintf("Failed to create a new session from the connection pool, username: %s, %s",
        username, err.Error()))
}
// Release the session and return its connection to the pool when the program exits
defer session.Release()

// Execute a query; stop here on failure instead of using a nil resultSet
resultSet, err := session.Execute(query)
if err != nil {
    log.Fatal(err.Error())
}
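The pool behavior described above (idle and active queues guarded by a read-write lock, reuse of idle connections, creation of new connections up to a maximum, an error once the limit is reached, and returning connections to the idle queue on release) can be modeled with the following Go sketch. It is a simplified, hypothetical model, not nebula-go's implementation: `pool`, `conn`, `Acquire`, `Release`, and `Counts` are illustrative names, the connection is a placeholder struct with no network I/O, and the periodic expiry check for idle connections is omitted.

```go
package main

import (
	"container/list"
	"errors"
	"fmt"
	"sync"
)

// conn is a placeholder for a real network connection.
type conn struct{ id int }

// errPoolExhausted is returned when the pool has reached its maximum size.
var errPoolExhausted = errors.New("no usable connection: pool has reached its maximum size")

// pool keeps an idle queue and an active queue, guarded by a read-write lock.
type pool struct {
	mu      sync.RWMutex
	idle    *list.List // connections ready to be handed out
	active  *list.List // connections currently held by sessions
	maxSize int
	nextID  int
}

func newPool(maxSize int) *pool {
	return &pool{idle: list.New(), active: list.New(), maxSize: maxSize}
}

// Acquire returns an idle connection if one exists; otherwise it creates a new
// one while the total stays below maxSize; otherwise it returns an error.
func (p *pool) Acquire() (*conn, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if front := p.idle.Front(); front != nil {
		c := p.idle.Remove(front).(*conn)
		p.active.PushBack(c)
		return c, nil
	}
	if p.idle.Len()+p.active.Len() >= p.maxSize {
		return nil, errPoolExhausted
	}
	p.nextID++
	c := &conn{id: p.nextID}
	p.active.PushBack(c)
	return c, nil
}

// Release moves a connection from the active queue back to the idle queue.
func (p *pool) Release(c *conn) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for e := p.active.Front(); e != nil; e = e.Next() {
		if e.Value.(*conn) == c {
			p.active.Remove(e)
			p.idle.PushBack(c)
			return
		}
	}
}

// Counts reports the queue sizes; it only reads, so a read lock is enough.
func (p *pool) Counts() (idle, active int) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.idle.Len(), p.active.Len()
}

func main() {
	p := newPool(2)
	c1, _ := p.Acquire()
	_, _ = p.Acquire()
	if _, err := p.Acquire(); err != nil {
		fmt.Println(err) // the third request fails because the pool is full
	}
	p.Release(c1)
	c3, _ := p.Acquire() // reuses the connection that was just released
	fmt.Println(c3.id == c1.id)
}
```

Mutating operations take the write lock while the read-only `Counts` takes the read lock, which mirrors the read-write locking described for the two queues.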

Returned data structure

The client wraps some of the more complex data types returned by the server and adds interfaces that make them easier to use.

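| Basic type of returned results | Packaged type                               |
|--------------------------------|---------------------------------------------|
| Null                           |                                             |
| Bool                           |                                             |
| Int64                          |                                             |
| Double                         |                                             |
| String                         |                                             |
| Time                           | TimeWrapper                                 |
| Date                           |                                             |
| DateTime                       | DateTimeWrapper                             |
| List                           |                                             |
| Set                            |                                             |
| Map                            |                                             |
| Vertex                         | Node                                        |
| Edge                           | Relationship                                |
| Path                           | PathWrapper                                 |
| DataSet                        | ResultSet                                   |
| -                              | Record (for row operations on a ResultSet)  |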

In the client, nebula::Value is wrapped as ValueWrapper, which can be converted to other structures through its interfaces (e.g. ValueWrapper.AsNode()).
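The wrapper pattern can be illustrated with a minimal Go sketch. `valueWrapper`, `AsInt`, `AsString`, and `IsNull` here are hypothetical, simplified stand-ins, not the client's real types: the wrapper stores an untyped value, and each `AsXxx` method checks the underlying type before converting, returning an error on a mismatch instead of panicking.

```go
package main

import "fmt"

// valueWrapper is a hypothetical, simplified wrapper around a dynamically typed value.
type valueWrapper struct {
	value interface{}
}

// AsInt returns the wrapped value as int64, or an error if it holds another type.
func (w valueWrapper) AsInt() (int64, error) {
	if v, ok := w.value.(int64); ok {
		return v, nil
	}
	return 0, fmt.Errorf("failed to convert value %v of type %T to int64", w.value, w.value)
}

// AsString returns the wrapped value as a string, or an error if it holds another type.
func (w valueWrapper) AsString() (string, error) {
	if v, ok := w.value.(string); ok {
		return v, nil
	}
	return "", fmt.Errorf("failed to convert value %v of type %T to string", w.value, w.value)
}

// IsNull reports whether the wrapper holds no value.
func (w valueWrapper) IsNull() bool { return w.value == nil }

func main() {
	age := valueWrapper{value: int64(42)}
	if n, err := age.AsInt(); err == nil {
		fmt.Println(n) // prints: 42
	}
	if _, err := age.AsString(); err != nil {
		fmt.Println(err) // the value is an int64, so the string conversion fails
	}
}
```

Returning an error from each conversion lets the caller decide how to handle an unexpected column type, which is the same calling pattern as the `AsXxx()` conversions used on results.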

Analysis of data structure

For MATCH p= (v:player{name:"Tim Duncan"})-[]->(v2) RETURN p, the returned result is:

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| p                                                                                                                                                                                                                         |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| <("Tim Duncan" :bachelor{name: "Tim Duncan", speciality: "psychology"} :player{age: 42, name: "Tim Duncan"})<-[:teammate@0 {end_year: 2016, start_year: 2002}]-("Manu Ginobili" :player{age: 41, name: "Manu Ginobili"})> |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Got 1 rows (time spent 11550/12009 us)

The result contains one row, and its value is a path. To get the properties of the path's destination vertex (v2), you can proceed as follows:

// Execute a query (a raw string literal avoids escaping the inner double quotes)
resultSet, err := session.Execute(`MATCH p= (v:player{name:"Tim Duncan"})-[]->(v2) RETURN p`)
if err != nil {
    log.Fatal(err.Error())
}

// Get the first row of the result. The index of the first row is 0
record, err := resultSet.GetRowValuesByIndex(0)
if err != nil {
    log.Fatal(err.Error())
}

// Take the value of the cell in the first column of the first row
// At this point, valInCol0 is a ValueWrapper
valInCol0, err := record.GetValueByIndex(0)
if err != nil {
    log.Fatal(err.Error())
}

// Convert the ValueWrapper into a PathWrapper
pathWrap, err := valInCol0.AsPath()
if err != nil {
    log.Fatal(err.Error())
}

// Get the destination vertex through pathWrap.GetEndNode()
node, err := pathWrap.GetEndNode()
if err != nil {
    log.Fatal(err.Error())
}

// Get all properties through node.Properties()
// props is of type map[string]*ValueWrapper
props, err := node.Properties()
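How a path resolves to its destination vertex can be sketched with hypothetical, simplified types: `node`, `step`, `path`, and this `GetEndNode` are illustrative stand-ins, not the client's PathWrapper. The sketch assumes a path is stored as a start vertex followed by a list of steps, each holding an edge name and the vertex it leads to, so the end node is the destination of the last step. The names and ages come from the sample output above; edge direction and other properties are left out.

```go
package main

import (
	"errors"
	"fmt"
)

// node is a hypothetical, simplified vertex: an ID plus flattened properties.
type node struct {
	ID    string
	Props map[string]interface{}
}

// step is one hop of a path: the edge name and the vertex it leads to.
type step struct {
	EdgeName string
	Dst      *node
}

// path is a hypothetical stand-in for a path value: a start vertex followed by steps.
type path struct {
	Src   *node
	Steps []step
}

// GetEndNode returns the last vertex of the path, or the start vertex if there are no steps.
func (p *path) GetEndNode() (*node, error) {
	if p.Src == nil {
		return nil, errors.New("empty path")
	}
	if len(p.Steps) == 0 {
		return p.Src, nil
	}
	return p.Steps[len(p.Steps)-1].Dst, nil
}

func main() {
	p := &path{
		Src: &node{ID: "Tim Duncan", Props: map[string]interface{}{"age": int64(42)}},
		Steps: []step{{
			EdgeName: "teammate",
			Dst:      &node{ID: "Manu Ginobili", Props: map[string]interface{}{"age": int64(41)}},
		}},
	}
	end, err := p.GetEndNode()
	if err != nil {
		panic(err)
	}
	fmt.Println(end.ID, end.Props["age"]) // prints: Manu Ginobili 41
}
```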

Address of clients

The GitHub addresses of clients are as follows:

If you encounter any problems while using NebulaGraph, refer to the NebulaGraph Database Manual for troubleshooting. It documents in detail the concepts and usage of graph databases in general and of NebulaGraph in particular.

Join our Slack channel if you want to discuss with the rest of the NebulaGraph community!