Source-code
NebulaGraph Source Code Explained: Scheduler & Executor
You may have learned the optimizer of NebulaGraph's query engine in the last article. In this article, we will introduce how the Scheduler and the Executor, the last two modules of the query engine, are implemented.
Overview
In the execution phase, the execution engine uses the Scheduler to transform a physical execution plan, generated by the Planner, into a series of Executors to drive their execution. Each PlanNode in a physical execution plan has a corresponding Executor.
Structure of Source Files
The source code of the Scheduler is under the src/scheduler
directory.
src/scheduler
├── AsyncMsgNotifyBasedScheduler.cpp
├── AsyncMsgNotifyBasedScheduler.h
├── CMakeLists.txt
├── Scheduler.cpp
└── Scheduler.h
The Scheduler abstract class defines the common interfaces of the schedulers, which can inherit the features from the class to implement various types of schedulers. The AsyncMsgNotifyBasedScheduler
scheduler has been implemented. By using the asynchronous message communication and breadth-first search algorithm, it can be prevented from stack overflow errors.
The source code of the Executor is under the src/executor
directory.
src/executor
├── admin
├── algo
├── CMakeLists.txt
├── ExecutionError.h
├── Executor.cpp
├── Executor.h
├── logic
├── maintain
├── mutate
├── query
├── StorageAccessExecutor.cpp
├── StorageAccessExecutor.h
└── test
Process
First, the Scheduler starts the traversal of the entire execution plan from its root node by using the breadth-first search algorithm and builds their notification mechanism according to the dependencies between nodes. During the execution phase, each node will be scheduled to be executed after being notified that all the nodes it depends on have been executed successfully. For a node, once executed, it will notify its dependent nodes until the entire plan is executed successfully.
void AsyncMsgNotifyBasedScheduler::runExecutor(
std::vector<folly::Future<Status>>&& futures,
Executor* exe,
folly::Executor* runner,
std::vector<folly::Promise<Status>>&& promises) const {
folly::collect(futures).via(runner).thenTry(
[exe, pros = std::move(promises), this](auto&& t) mutable {
if (t.hasException()) {
return notifyError(pros, Status::Error(t.exception().what()));
}
auto status = std::move(t).value();
auto depStatus = checkStatus(std::move(status));
if (!depStatus.ok()) {
return notifyError(pros, depStatus);
}
// Execute in current thread.
std::move(execute(exe)).thenTry(
[pros = std::move(pros), this](auto&& exeTry) mutable {
if (exeTry.hasException()) {
return notifyError(pros, Status::Error(exeTry.exception().what()));
}
auto exeStatus = std::move(exeTry).value();
if (!exeStatus.ok()) {
return notifyError(pros, exeStatus);
}
return notifyOK(pros);
});
});
}
Each Executor goes through four phases: "create", "open", "execute", and then "close".
create
In the "create" phase, an appropriate Executor will be generated according to the node type.
open
In the "open" phase, before the execution starts, the Executor is initialized, the slow queries are terminated, and the memory watermark is checked. When using NebulaGraph, you can use kill
to terminate a query, so the status of the current execution plan must be checked before the execution of each Executor. If the plan is in the killed
status, the execution will be terminated. Before the execution of each query Executor, it is necessary to check whether the amount of free memory has fallen below the watermark. If the watermark is reached, the execution will be terminated, which may avoid OOM.
Status Executor::open() {
if (qctx_->isKilled()) {
VLOG(1) << "Execution is being killed. session: " << qctx()->rctx()->session()->id()
<< "ep: " << qctx()->plan()->id()
<< "query: " << qctx()->rctx()->query();
return Status::Error("Execution had been killed");
}
auto status = MemInfo::make();
NG_RETURN_IF_ERROR(status);
auto mem = std::move(status).value();
if (node_->isQueryNode() && mem->hitsHighWatermark(FLAGS_system_memory_high_watermark_ratio)) {
return Status::Error(
"Used memory(%ldKB) hits the high watermark(%lf) of total system memory(%ldKB).",
mem->usedInKB(),
FLAGS_system_memory_high_watermark_ratio,
mem->totalInKB());
}
numRows_ = 0;
execTime_ = 0;
totalDuration_.reset();
return Status::OK();
}
execute
The input and output of a query Executor are in the form of tables (DataSet). The execution of an Executor is based on the iterator model, which means that for each calculation, the next()
method of the iterator of the input table is called to retrieve a row of data and then the calculation is performed. Such a process is repeated until the traversal of the entire input table is done. The results of the calculations are constructed into a new table and output to the next Executor as its input.
folly::Future<Status> ProjectExecutor::execute() {
SCOPED_TIMER(&execTime_);
auto* project = asNode<Project>(node());
auto columns = project->columns()->columns();
auto iter = ectx_->getResult(project->inputVar()).iter();
DCHECK(!!iter);
QueryExpressionContext ctx(ectx_);
VLOG(1) << "input: " << project->inputVar();
DataSet ds;
ds.colNames = project->colNames();
ds.rows.reserve(iter->size());
for (; iter->valid(); iter->next()) {
Row row;
for (auto& col : columns) {
Value val = col->expr()->eval(ctx(iter.get()));
row.values.emplace_back(std::move(val));
}
ds.rows.emplace_back(std::move(row));
}
VLOG(1) << node()->outputVar() << ":" << ds;
return finish(ResultBuilder().value(Value(std::move(ds))).finish());
}
If the input table of the current Executor cannot be used by the other Executors as their input, the memory occupied by the table will be dropped in the execution phase to reduce memory usage.
void Executor::drop() {
for (const auto &inputVar : node()->inputVars()) {
if (inputVar != nullptr) {
// Make sure use the variable happened-before decrement count
if (inputVar->userCount.fetch_sub(1, std::memory_order_release) == 1) {
// Make sure drop happened-after count decrement
CHECK_EQ(inputVar->userCount.load(std::memory_order_acquire), 0);
ectx_->dropResult(inputVar->name);
VLOG(1) << "Drop variable " << node()->outputVar();
}
}
}
}
close
After the execution of an Executor is done, some collected execution information, such as execution time and the number of rows in the output table, is added to the profiling statistics. You can run a PROFILE
statement and then view the statistics in the returned result.
Execution Plan (optimize time 141 us)
-----+------------------+--------------+-----------------------------------------------------+--------------------------------------
| id | name | dependencies | profiling data | operator info |
-----+------------------+--------------+-----------------------------------------------------+--------------------------------------
| 2 | Project | 3 | ver: 0, rows: 56, execTime: 147us, totalTime: 160us | outputVar: [ |
| | | | | { |
| | | | | "colNames": [ |
| | | | | "VertexID", |
| | | | | "player.age" |
| | | | | ], |
| | | | | "name": "__Project_2", |
| | | | | "type": "DATASET" |
| | | | | } |
| | | | | ] |
| | | | | inputVar: __TagIndexFullScan_1 |
| | | | | columns: [ |
| | | | | "$-.VertexID AS VertexID", |
| | | | | "player.age" |
| | | | | ] |
-----+------------------+--------------+-----------------------------------------------------+--------------------------------------
| 3 | TagIndexFullScan | 0 | ver: 0, rows: 56, execTime: 0us, totalTime: 6863us | outputVar: [ |
| | | | | { |
| | | | | "colNames": [ |
| | | | | "VertexID", |
| | | | | "player.age" |
| | | | | ], |
| | | | | "name": "__TagIndexFullScan_1", |
| | | | | "type": "DATASET" |
| | | | | } |
| | | | | ] |
| | | | | inputVar: |
| | | | | space: 318 |
| | | | | dedup: false |
| | | | | limit: 9223372036854775807 |
| | | | | filter: |
| | | | | orderBy: [] |
| | | | | schemaId: 319 |
| | | | | isEdge: false |
| | | | | returnCols: [ |
| | | | | "_vid", |
| | | | | "age" |
| | | | | ] |
| | | | | indexCtx: [ |
| | | | | { |
| | | | | "columnHints": [], |
| | | | | "index_id": 325, |
| | | | | "filter": "" |
| | | | | } |
| | | | | ] |
-----+------------------+--------------+-----------------------------------------------------+--------------------------------------
| 0 | Start | | ver: 0, rows: 0, execTime: 1us, totalTime: 19us | outputVar: [ |
| | | | | { |
| | | | | "colNames": [], |
| | | | | "type": "DATASET", |
| | | | | "name": "__Start_0" |
| | | | | } |
| | | | | ] |
-----+------------------+--------------+-----------------------------------------------------+--------------------------------------
So far, the explanation of the query engine source code has been completed. Next time we will explain the implementation of some features of NebulaGraph.
If you encounter any problems in the process of using NebulaGraph, please refer to NebulaGraph Database Manual to troubleshoot the problem. It records in detail the knowledge points and specific usage of the graph database and the graph database NebulaGraph.
Join our Slack channel if you want to discuss with the rest of the NebulaGraph community!