NebulaGraph Source Code Explained: fbthrift
Overview
Nebula Clients provide APIs in multiple programming languages for interacting with NebulaGraph, and repackage the data structures returned by the server so that they are easier to use.
Currently, Nebula Clients support C++, Java, Python, Golang, and Rust.
Framework for service communication
Nebula Clients use fbthrift (https://github.com/facebook/fbthrift) as the RPC framework for service communication between servers and clients, which enables cross-language interaction.
At a high level, fbthrift is:
- A code generator: fbthrift has a code generator that generates data structures that can be serialized using Thrift in different languages.
- A serialization framework: fbthrift has a set of protocols to serialize the structures generated by the code generator.
- An RPC framework: fbthrift has a framework to send messages between clients and servers and to call application-defined functions when receiving messages in different languages.
Examples
Take the Golang client as an example to show the application of fbthrift in NebulaGraph.
- The definition of the Vertex structure on the server:
struct Vertex {
Value vid;
std::vector<Tag> tags;
Vertex() = default;
};
- Some data structures are defined in src/interface/common.thrift:
struct Tag {
1: binary name,
// List of <prop_name, prop_value>
2: map<binary, Value> (cpp.template = "std::unordered_map") props,
} (cpp.type = "nebula::Tag")
struct Vertex {
1: Value vid,
2: list<Tag> tags,
} (cpp.type = "nebula::Vertex")
In the above example, we define a Vertex structure. The annotation (cpp.type = "nebula::Vertex") indicates that this structure corresponds to nebula::Vertex on the server.
- fbthrift will automatically generate the data structure in Golang:
// Attributes:
// - Vid
// - Tags
type Vertex struct {
Vid *Value `thrift:"vid,1" db:"vid" json:"vid"`
Tags []*Tag `thrift:"tags,2" db:"tags" json:"tags"`
}
func NewVertex() *Vertex {
return &Vertex{}
}
...
func (p *Vertex) Read(iprot thrift.Protocol) error { // Deserialization
...
}
func (p *Vertex) Write(oprot thrift.Protocol) error { // Serialization
...
}
- For the statement MATCH (v:Person) WHERE id(v) == "ABC" RETURN v, the client requests a vertex (nebula::Vertex) from the server. After the server finds this vertex, it serializes it and sends it to the client through the transport of the RPC communication framework. When the client receives the data, it deserializes it into the corresponding data structure (type Vertex struct) defined in the client.
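The round trip described above can be illustrated without fbthrift itself. The following is a minimal sketch that uses Go's standard encoding/gob package as a stand-in for the Thrift protocol. The simplified Tag and Vertex types and the encodeVertex and decodeVertex helpers are hypothetical and are not the generated nebula-go code.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Tag and Vertex are simplified, hypothetical stand-ins for the generated types.
type Tag struct {
	Name  string
	Props map[string]string
}

type Vertex struct {
	Vid  string
	Tags []Tag
}

// encodeVertex plays the role of the server-side Write (serialization).
// gob is used here only as a stand-in for a Thrift protocol.
func encodeVertex(v Vertex) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeVertex plays the role of the client-side Read (deserialization).
func decodeVertex(data []byte) (Vertex, error) {
	var v Vertex
	err := gob.NewDecoder(bytes.NewReader(data)).Decode(&v)
	return v, err
}

func main() {
	sent := Vertex{
		Vid:  "ABC",
		Tags: []Tag{{Name: "Person", Props: map[string]string{"name": "Alice"}}},
	}
	// "Server" side: serialize the vertex into bytes for the transport.
	wire, err := encodeVertex(sent)
	if err != nil {
		panic(err)
	}
	// "Client" side: rebuild the structure from the received bytes.
	got, err := decodeVertex(wire)
	if err != nil {
		panic(err)
	}
	fmt.Println(got.Vid, got.Tags[0].Name, got.Tags[0].Props["name"])
}
```

With fbthrift, the generated Read and Write methods take the place of these two helpers, and the byte layout is defined by the chosen Thrift protocol rather than by gob.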
Clients
In this section, we will take nebula-go as an example to introduce different modules of the client and their main interfaces.
- Configs provides all configuration options.
type PoolConfig struct {
// Set the timeout threshold. The default value 0 means it does not time out. Unit: ms
TimeOut time.Duration
// The maximum idle time of each connection. When the idle time exceeds this threshold, the connection will be disconnected and deleted. The default value 0 means permanently idle and the connection will not be disconnected
IdleTime time.Duration
// max_connection_pool_size: Set the maximum number of connections in the connection pool. The default value is 10
MaxConnPoolSize int
// The minimum number of idle connections. The default value is 0
MinConnPoolSize int
}
- Session provides an interface for users to call directly.
// Manage the specific information of Session
type Session struct {
// Used for identity verification or message retry when executing commands
sessionID int64
// Currently held connections
connection *connection
// Currently used connection pools
connPool *ConnectionPool
// Log tools
log Logger
// Used to save the time zone used by the current session
timezoneInfo
}
- The definition of interfaces is as follows:
// Execute nGQL. The return data type is ResultSet. This interface is non-thread-safe
func (session *Session) Execute(stmt string) (*ResultSet, error) {...}
// Re-acquire a connection from the connection pool for the current Session
func (session *Session) reConnect() error {...}
// Sign out, release the Session ID, and return the connection to the pool
func (session *Session) Release() {...}
- ConnectionPool manages all connections. The main interfaces are as follows:
// Create a new connection pool and complete the initialization with the entered service address
func NewConnectionPool(addresses []HostAddress, conf PoolConfig, log Logger) (*ConnectionPool, error) {...}
// Validate the credentials and get a Session instance
func (pool *ConnectionPool) GetSession(username, password string) (*Session, error) {...}
- Connection wraps the Thrift network layer and provides the following interfaces:
// Establish a connection with the specified ip and port
func (cn *connection) open(hostAddress HostAddress, timeout time.Duration) error {...}
// Authenticate the username and password
func (cn *connection) authenticate(username, password string) (*graph.AuthResponse, error) {...}
// Execute query
func (cn *connection) execute(sessionID int64, stmt string) (*graph.ExecutionResponse, error) {...}
// Generate a temp sessionID 0 and send the query "YIELD 1" to test if the connection is usable.
func (cn *connection) ping() bool {...}
// Release sessionId to the graphd process.
func (cn *connection) signOut(sessionID int64) error {...}
// Disconnect.
func (cn *connection) close() {...}
- LoadBalance is used in the connection pool.
- Policy: Polling
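The polling policy can be pictured as a simple round-robin over the configured addresses. The following is a minimal sketch, not the nebula-go implementation: HostAddress is redefined locally (its Host and Port fields are an assumption made for this example), and roundRobin and distribute are hypothetical helpers.

```go
package main

import "fmt"

// HostAddress is redefined here so the sketch is self-contained.
// The Host and Port fields are assumptions for this example.
type HostAddress struct {
	Host string
	Port int
}

// roundRobin hands out addresses in turn. It assumes at least one address.
type roundRobin struct {
	addrs []HostAddress
	next  int
}

// pick returns the next address in the rotation.
func (r *roundRobin) pick() HostAddress {
	addr := r.addrs[r.next%len(r.addrs)]
	r.next++
	return addr
}

// distribute counts how many of n connections each address would receive.
func distribute(addrs []HostAddress, n int) map[HostAddress]int {
	rr := &roundRobin{addrs: addrs}
	counts := make(map[HostAddress]int)
	for i := 0; i < n; i++ {
		counts[rr.pick()]++
	}
	return counts
}

func main() {
	addrs := []HostAddress{{"10.0.0.1", 9669}, {"10.0.0.2", 9669}, {"10.0.0.3", 9669}}
	// Ten connections over three addresses: the counts differ by at most one.
	for addr, c := range distribute(addrs, 10) {
		fmt.Printf("%s:%d -> %d connections\n", addr.Host, addr.Port, c)
	}
}
```

Because the rotation restarts only after every address has been used once, no address ever holds more than one connection above any other.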
Interaction of modules
- Connection pool
- Initialize:
- To use the client, the user first creates and initializes a connection pool. During initialization, the connection pool establishes connections to the addresses of the Nebula service specified by the user. If multiple Graph services are deployed in a cluster, the connection pool uses a polling policy to balance the load and establishes a nearly equal number of connections to each address.
- Manage connections:
- The connection pool maintains two queues: an idle connection queue and an active connection queue. The pool periodically detects expired idle connections and closes them. Both queues are protected by a read-write lock to ensure correctness under multi-threaded execution when elements are added or removed.
- When a Session requests a connection from the connection pool, the pool checks whether the idle connection queue contains a usable connection. If it does, that connection is returned directly to the Session. If there is no usable connection and the current total number of connections does not exceed the maximum defined in the configuration, a new connection is created for the Session. If the maximum number of connections has been reached, an error is returned.
- Generally, the connection pool needs to be closed only when you close the program. All connections in the pool will be disconnected when the program is closed.
- Session
- A Session is generated through the connection pool. The user provides a username and password for authentication. After authentication succeeds, the user gets a Session instance and communicates with the server through the connection held by the Session. The most commonly used interface is execute(). If an error occurs during execution, the client checks the error type. If it is a network error, the client automatically reconnects and tries to execute the statement again.
- Note that a Session cannot be used by multiple threads at the same time. The correct approach is for each thread to request and use its own Session.
- When the Session is released, the connection held by it is put back into the idle connection queue of the connection pool so that it can be reused by other Sessions later.
- Connection
- All connection instances are equivalent and can be held by any Session. This design allows connections to be reused by different Sessions, which reduces the overhead of repeatedly opening and closing the Transport.
- The connection will send the client's request to the server and return the result to the Session.
- Example
// Initialize connection pool
pool, err := nebula.NewConnectionPool(hostList, testPoolConfig, log)
if err != nil {
log.Fatal(fmt.Sprintf("Fail to initialize the connection pool, host: %s, port: %d, %s", address, port, err.Error()))
}
// Close all connections in the pool when program exits
defer pool.Close()
// Create session
session, err := pool.GetSession(username, password)
if err != nil {
log.Fatal(fmt.Sprintf("Fail to create a new session from connection pool, username: %s, password: %s, %s",
username, password, err.Error()))
}
// Release session and return connection back to connection pool when program exits
defer session.Release()
// Execute a query
resultSet, err := session.Execute(query)
if err != nil {
fmt.Print(err.Error())
}
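The idle and active queue bookkeeping described under "Manage connections" can be sketched as follows. This is a simplified illustration under stated assumptions, not the nebula-go code: the conn and pool types and their methods are hypothetical, a plain mutex is used in place of the read-write lock, and the periodic cleanup of expired idle connections is left out.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for a real network connection.
type conn struct{ id int }

// pool is a simplified sketch of the idle/active bookkeeping.
type pool struct {
	mu      sync.Mutex
	idle    []*conn            // idle connection queue (FIFO)
	active  map[*conn]struct{} // connections currently held by sessions
	maxSize int
	nextID  int
}

var errPoolExhausted = errors.New("connection pool: maximum number of connections reached")

func newPool(maxSize int) *pool {
	return &pool{active: make(map[*conn]struct{}), maxSize: maxSize}
}

// get reuses an idle connection, creates a new one if the pool is
// under its limit, or returns an error when the limit is reached.
func (p *pool) get() (*conn, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(p.idle) > 0 {
		c := p.idle[0]
		p.idle = p.idle[1:]
		p.active[c] = struct{}{}
		return c, nil
	}
	// No idle connections left, so every existing connection is active.
	if len(p.active) >= p.maxSize {
		return nil, errPoolExhausted
	}
	p.nextID++
	c := &conn{id: p.nextID}
	p.active[c] = struct{}{}
	return c, nil
}

// release moves a connection from the active set back to the idle queue.
func (p *pool) release(c *conn) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, ok := p.active[c]; !ok {
		return // not owned by this pool, or already released
	}
	delete(p.active, c)
	p.idle = append(p.idle, c)
}

func main() {
	p := newPool(2)
	a, _ := p.get()
	b, _ := p.get()
	_, err := p.get() // third request exceeds the limit of 2
	fmt.Println("exhausted:", err != nil)
	p.release(a)
	c, _ := p.get() // the released connection is handed out again
	fmt.Println("reused:", c == a, "other id:", b.id)
}
```

Releasing a connection never closes it in this sketch. That matches the reuse behaviour described above, where a released Session returns its connection to the idle queue for later Sessions.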
Returned data structure
The client wraps some of the complex result types returned by the server and adds interfaces that make them more convenient to use.
Basic type of returned results | Packaged type |
---|---|
Null | |
Bool | |
Int64 | |
Double | |
String | |
Time | TimeWrapper |
Date | |
DateTime | DateTimeWrapper |
List | |
Set | |
Map | |
Vertex | Node |
Edge | Relationship |
Path | PathWrapper |
DataSet | ResultSet |
- | Record (For row operations of ResultSet) |
nebula::Value is packaged as ValueWrapper in the client and can be converted to other structures through interfaces (e.g. node = ValueWrapper.asNode()).
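The wrapper idea can be sketched as a tagged union hidden behind typed accessors. The following is a minimal illustration only: the value, valueWrapper, and Node types and the asNode and asInt methods are hypothetical simplifications written for this example, not the nebula-go definitions.

```go
package main

import (
	"errors"
	"fmt"
)

// Node is a simplified, hypothetical stand-in for the client's Node type.
type Node struct{ ID string }

// value is a hypothetical tagged union: at most one field is set.
type value struct {
	iVal *int64
	vVal *Node
}

// valueWrapper hides the union behind typed accessors.
type valueWrapper struct{ v value }

// isVertex reports whether the wrapped value holds a vertex.
func (w valueWrapper) isVertex() bool { return w.v.vVal != nil }

// asNode returns the wrapped vertex, or an error if the value holds another type.
func (w valueWrapper) asNode() (*Node, error) {
	if !w.isVertex() {
		return nil, errors.New("value is not a vertex")
	}
	return w.v.vVal, nil
}

// asInt returns the wrapped integer, or an error if the value holds another type.
func (w valueWrapper) asInt() (int64, error) {
	if w.v.iVal == nil {
		return 0, errors.New("value is not an int")
	}
	return *w.v.iVal, nil
}

func main() {
	w := valueWrapper{v: value{vVal: &Node{ID: "Tim Duncan"}}}
	node, err := w.asNode()
	fmt.Println(node.ID, err)
	// Asking for the wrong type yields an error instead of a zero value.
	_, err = w.asInt()
	fmt.Println(err)
}
```

Returning an error from each accessor lets the caller detect a type mismatch explicitly, which is why the conversions in the walkthrough that follows each return a value together with err.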
Analysis of data structure
For MATCH p= (v:player{name:"Tim Duncan"})-[]->(v2) RETURN p, the returned result is:
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| p |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| <("Tim Duncan" :bachelor{name: "Tim Duncan", speciality: "psychology"} :player{age: 42, name: "Tim Duncan"})<-[:teammate@0 {end_year: 2016, start_year: 2002}]-("Manu Ginobili" :player{age: 41, name: "Manu Ginobili"})> |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Got 1 rows (time spent 11550/12009 us)
We can see that the returned result contains one row, and its type is a path. You can then run the following code to get the properties of the destination vertex (v2) of the path.
// Execute a query
resultSet, _ := session.Execute("MATCH p= (v:player{name:\"Tim Duncan\"})-[]->(v2) RETURN p")
// Get the first row of the result. The index of the first row is 0
record, err := resultSet.GetRowValuesByIndex(0)
if err != nil {
t.Fatalf(err.Error())
}
// Take the value of the cell in the first column from the first row
// At this time, the type of valInCol0 is ValueWrapper
valInCol0, err := record.GetValueByIndex(0)
// Convert ValueWrapper into PathWrapper objects.
pathWrap, err := valInCol0.AsPath()
// Get the destination vertex through pathWrap.GetEndNode()
node, err := pathWrap.GetEndNode()
// Get all properties through node.Properties()
// The type of props is map[string]*ValueWrapper
props, err := node.Properties()
Address of clients
The GitHub addresses of clients are as follows:
- https://github.com/vesoft-inc/nebula-cpp
- https://github.com/vesoft-inc/nebula-java
- https://github.com/vesoft-inc/nebula-python
- https://github.com/vesoft-inc/nebula-go
- https://github.com/vesoft-inc/nebula-rust
If you encounter any problems while using NebulaGraph, refer to the NebulaGraph Database Manual for troubleshooting. It documents graph database concepts and the specific usage of NebulaGraph in detail.
Join our Slack channel if you want to discuss with the rest of the NebulaGraph community!