The CQ9 Beat

介绍SmallGrid
Written
Topic
Published

Sep 15, 2021

Introduction

CQ9ers process real-time price and volume data for over a million symbols across global markets every day. 如果我们想让人类有机会跟上, 我们需要用表格格式有效地表示这些数据! SmallGrid是一个允许运行进程发布实时表格数据的平台. SmallGrid数据更新可以更改表中的一行,也可以更改单个单元格. SmallGrid’s system design allows automatic deduction of the affected downstream aggregations that depend on the updated data, 并且是提供动态市场数据视图的支柱, 以及各种系统指标. During his time at CQ9, Summer Core Intern Will Gulian worked on implementing the initial SmallGrid backend that listens for and distributes data updates. In the future, 我们计划实现优化的聚合更新,以释放SmallGrid的扩展潜力.

张恒初,算法工程师

SmallGrid

For anyone exploring data, the interactivity provided by a Jupyter notebook and pandas is incredible. 你可以看到任何你想要的数据,因为它的工作方式通过笔记本. Unfortunately, we lose the ability to introspect all the data when productionizing a notebook. There’s no interactivity so the best you can do is log some of the current state and check the logs later, but markets are dynamic, they change constantly. 过去行之有效的策略可能会突然失效. There could be a bug, 也许其他人也发现了同样的策略, 或者,也许永远动荡的市场现在的表现不同了. In any case, 我们真的很想看看什么是策略“思考”生活和干净的方式, with minimal overhead.

为实时数据提供更好的可见性, my project for the 2nd half of the internship at CQ9 was to work on a data publishing API called SmallGrid. SmallGrid有几个可移动的部分,因为数据需要有一个简单的发布API, but the API could be called from reasonably hot code so it needs to be efficient if necessary as well. 为了最小化使用SmallGrid需要执行的工作代码, 还有一个SmallGrid守护进程处理实际发布表数据. We use Redis 我们正在广泛调查 Apache Kafka 对于某些用例,所以SmallGrid支持同时向两者发布 Redis Streams and Kafka topics.

Writing Data

Algo developers at CQ9 use Python and C++ so the SmallGrid API needs to be convenient to use in either language. The API I settled on has global tables accessible by name and cells can be written to by row and column:

auto t = SmallGrid::table("我的表名");

t.发布("SPY", "MidPx", 440.70);
t.出版(“AAPL”,“MidPx”,147.06);

双精度是最常用的类型,但也有其他类型, 包括字符串和布尔值, are also available. 在内部,使用 std::variant 哪个使用相同的内存存储不同的类型, like a union, 但是通过包含一个标记来确定当前存储的类型,从而提供了一个安全的包装器.

t.publish(“SPY”,“Exchange”,“Arca”);
t.publish(“AAPL”,“Fruit”,true);

使用变量的一个令人惊讶的问题是 std::visit() is 比你想象的要慢. It’s not that 速度很慢,但我仍然在使用它,但在一些微基准测试中, std::visit isn’t magically faster than dynamic dispatch and can actually be slower than using a virtual function.

While std::visit 不是很担心,使用 std::unordered_map could be. 按名称查找行和列是非常常见和使用的 unordered_map, a string 需要在每次解析名称时构造吗. 大多数字符串应该足够小,以便从小字符串优化中受益, 因此不需要堆分配, 但是最好从一开始就避免构造字符串. 幸运的是,abseil提供了更快的地图实现 absl::flat_hash_map that also supports heterogenous lookup so finding a const char * or std::string_view 不需要构造临时的 std::string.

Even with abseil, we still need to hash the strings and it would be nice to avoid that if possible. For this case, row and column names can be converted to a handle to avoid repeatedly rehashing the names using row/col() for single names and rows/cols() for many names. 我们还可以用 cell() 这只需要索引到一个向量来更新单元格值.

auto goog = t.row("GOOGL");
// or many rows...
auto [aapl, msft] = t.rows("AAPL", "MSFT");

auto midPx = t.col("MidPx");

t.发布(goog, midPx, 2711).91);

auto msftPx = t.cell(msft, midPx);

//或者:msftPx.publish(288.33);
msftPx = 288.33;

c++ 17的结构化绑定使得创建许多句柄非常方便 auto [aapl, msft] = t.rows("AAPL", "MSFT");,并且使用正确的参数包咒语执行起来非常简单.

template  array, sizeof...(Ts)> rows(Ts... ts) {
    return {row(ts)...};
}

You might notice that row() and therefore rows() returns AxisHandle. 行句柄和列句柄是不同的类型 AxisHandle / AxisHandle which makes accidentally using a row handle as a column or vice versa fail to compile with a nice error message (for C++ standards). 错误的用法用 SmallGrid::AxisRow and SmallGrid::AxisCol 在最后的消息在明亮的ANSI绿色感谢 changes in GCC 8.

src /数据/测试/ SmallGrid_test.抄送:229:29:从这里开始
src/data/SmallGrid.h:915:51: error: no matching function for call to ‘resolveIndex(SmallGrid::StringIndex&, SmallGrid::AxisHandle&)’
  915 |     auto colIdx = resolveIndex(colIndex, col);
      |                   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^~~~~~~~~~~~~~~
...
src/data/SmallGrid.h:897:15:注:candidate: ' size_t SmallGrid::resolveIndex(SmallGrid::StringIndex)&, SmallGrid::AxisHandle) [with Type = SmallGrid::AxisCol; size_t = long unsigned int]’
  897 | inline size_t resolveIndex(StringIndex& ind, AxisHandle handle) {
      |               ^~~~~~~~~~~~
src/data/SmallGrid.h:897:63: note:   no known conversion for argument 2 from ‘AxisHandle’ to ‘AxisHandle’
  897 | inline size_t resolveIndex(StringIndex& ind, AxisHandle handle) {
      |                                              ~~~~~~~~~~~~~~~~~^~~~~~

Python中的API看起来非常相似, 添加了一些特性,使在Python中使用SmallGrid更加方便.

# pyatl = ' Python自动交易库'
它包含从我们的c++交易库导出的Python绑定
从pyatl导入SmallGrid  

t = SmallGrid.table("my table name")

t.发布("SPY", "MidPx", 440.70)
t.出版(“AAPL”,“MidPx”,147.06)

# handles

goog = t.row("GOOGL")
aapl, msft = t.rows("AAPL", "MSFT")

midPx = t.col("MidPx")

t.发布(goog, midPx, 2711).91)

msftPx = t.cell(msft, midPx)

msftPx.publish(288.33)

Python表有一个函数 toDataFrame() 它创建并返回表数据的Pandas数据框架. While probably not useful in a production environment, this makes writing tests much more convenient.

print(t.toDataFrame())
#  =>          MidPx
#  => SPY     440.70
#  => AAPL    147.06
#  => GOOGL  2711.91
#  => MSFT    288.33

与熊猫互动非常容易 pybind11, CQ9’s python <-> C++ binding library of choice. It has special support for numpy arrays if I were worried about performance, but I just want to call pd.read_json(字符串,东方= '分裂') 使用pybind很容易做到这一点.

pyTable.def(
    "toDataFrame",
    [] (const SmallGrid:表& table) {
        auto tableJson = table.dataToJson();

        Auto pd = py::module_::import("pandas");
        auto readJson = pd.attr("read_json");
        return readJson(tableJson, "orient"_a = "split");
    }, "docstring...");

还有批量发布功能 publishRow() and publishCol() 使用关键字参数一次更新有关特定符号的多个统计信息. 假设我们想要更新Microsoft的价格和数量,我们可以在一行中完成.

t.publishRow(“微软”,MidPx = 288.33, Volume=1.3e7)

在这里使用一个变体使得pybind自动生成的文档更加清晰, Pybind将用类型注释参数 Union[str, RowHandle]. Handling function kwargs is made easy by the fact that we just need to iterate over each kwarg entry although comically structured bindings and lambda captures don’t mix (example and discussion) so the entry 这里不能绑定Pair.

using RowVar = variant>;

pyTable.def(
    "publishRow",
    [](SG::Table& 表,RowVar RowVar, py::kwargs kwargs) {
        for (auto& entry : kwargs) {
            visit([&](auto& row) {
                table.publish(row, py::cast(entry.first), 
                    py::cast(entry.second));
            }, rowVar);
        }
    },
    "docstring...");

not-Writing Data

到目前为止,所有的例子都是CQ9SmallGrid的发布API的, 然而,最好的监控解决方案是您不必考虑的解决方案. That’s why, 以及对单元格的推送式写入, SmallGrid还可以自动监视使用事件循环系统的代码的变量. It might seem weird to write code in an event loop style, but it makes all kinds of workloads faster. For IO-bound work, the event loop can use epoll() to sleep just until any IO operation has completed at which point new work can be performed.

因为CQ9的很多代码都是在epoll事件循环上操作的, it makes adopting SmallGrid monitors easy with existing code because it doesn’t require any changes to get working other than setting up the monitor itself and stashing the handle. 手柄是保持特定手表活动的东西. 如果有人忘记存储句柄,而是使用c++ 17,这可能会令人困惑 [[nodiscard]] attribute combined with -Werror,忘记句柄会触发编译错误.

//订阅eventLoop,每50毫秒一次,并将收集到的变量写入“myTable”.
auto m = SmallGrid::Monitor::get(eventLoop, "myTable", 50ms);

int64_t counter = 0;

//被监视变量的句柄. 当句柄还活着时,引用不能无效.

auto _h = m.Monitor(“row1”,“col1”,counter /* reference */);

//计数器每秒增加10倍... 表格单元格将自动更新.
eventLoop.intervalCB(100ms, [&counter]() {
    counter++;
});

//运行事件循环,触发回调.
eventLoop.run();

还支持回调, in fact, 监视引用在内部隐式地转换为回调. 我们有自己的回调类型(类似于建议的) std::function_ref 类型)的开销比 std::function 但这对SmallGrid的用户来说是透明的,他们可以提供lambda或回调.

vector myVector;

// bind to a lambda
m.Monitor ("row1", "col1", [&]{ return myVector.size(); });

//绑定到对象的成员函数.
m.Monitor ("row1", "col2", member_cb(&myVector, &vector::size));

SmallGrid daemon

将数据写入内存中的一些表是不错的, 然后我们需要从交易过程中获取表格数据. For most use cases, 写入文件或套接字是显而易见的选择,但是, SmallGrid可以在热路径代码中使用, so we want to avoid the overhead of any syscalls by moving the communication with our database, Kafka or Redis, into a daemon process.

但是首先,消息需要到达守护进程. 我们可以通过unix套接字与守护进程通信. Instead, we do something even cooler: we create a shared memory ring buffer (which I will call a shmqueue). This is what the kernel would do behind the scenes anyway when you open a unix socket between two processes but by creating the shmqueue ourselves, 我们可以用写入内存的开销来读写shmqueue.

有几种方法可以创建两个进程之间共享的内存区域. Ironically, 为了设置shmqueue,我们确实使用了Unix套接字, but the socket is not used once the shmqueue is setup (the socket closing is useful to indicate the other side has died). You could use the socket to communicate a shm file path, but a fancier solution involves using sendmsg() and recvmsg() and ancillary data 将文件描述符发送到另一个进程. 通过发送文件描述符, the processes don’t need to worry about file permissioning since the daemon process doesn’t need to open() the shm file itself.

配置完shmqueue之后, a SmallGrid client process just needs to write a cell update message (a simple struct) and can then get back to more important things. On the other side, the daemon process picks up any cell updates from each of the shmqueues and writes the updates to the Kafka topic or Redis stream.

一台机器上通常有许多不同的进程, from trading processes to risk monitoring tools that may want to write to SmallGrid so centralizing this into a daemon cuts down on the required number of connections to Kafka or Redis (both are deployed in clusters) from an O( * ) 连接数为 O( + ) 守护进程处理的连接.

Publishing Updates

一旦单元更新到达守护进程,它们就需要到达数据库. 我们有一个很好的Redis客户端,但是CQ9内部不太使用Apache Kafka. I ended up using modern-cpp-kafka (它是标准的包装 librdkafka),这种方法基本上奏效了,但也出现了一些意外. modern-cpp-kafka has a KafkaSyncProducer 哪个将同步写入消息. 这是开箱即用的,但没有任何地方接近我正在寻找的吞吐量. modern-cpp-kafka also has a KafkaAsyncProducer 异步生成器有一个听起来很简洁的参数 EventsPollingOption. If EventsPollingOption:手动 is provided, a function .pollEvents() 需要定期调用,以处理任何已完成(或失败)消息的回调 send() calls. Unfortunately, pollEvents() does not behave nicely with an event loop style because it does not have an execution timeout (there is a timeout parameter but the timeout determines how long to wait for new messages). 我正在处理非常大的消息批处理大小, 当200万条信息同时发送时, .pollEvents() 会先处决然后销毁200万吗 std::functionS,这个很容易占用一秒钟. 由于事件循环上的其他工作也需要获得执行时间,因此这是不可接受的.

为了解决这个问题,我没有使用 EventsPollingOption:手动 which means my callbacks would be called automatically (from a different thread) at which point the results would be written to my own queue, and the event loop can process the completed messages in chunks while letting any other tasks also get execution time.

Conclusion & Future Work

Will Gulian’s implementation provides a solid foundation for distributing tabular data updates. We are in the process of an initial rollout of tabular dashboards powered by SmallGrid – it’s very exciting! 下一步,我们计划实现增量聚合. 增量聚合批量表格数据更新, 并且仅将更新应用于依赖于更新单元格的下游聚合值. 这个特性将增加整个表格的更新, 并且可以避免重新计算不受更新影响的聚合值.

张恒初,算法工程师

Don't Miss a Beat

关注我们,了解CQ9在工程、数学和自动化方面的最新信息.