@9.23 GOSIM Workshop 上海 | 可能是史上最强 Rust 机器人框架 Dora
“今年(2023)九月份将在上海举办首届的 GOSIM Workshop (9.23-24)[1] 和 GOSIM Conference (9.26)[2] 盛会。GOSIM 会场将支持「同声传译」,小伙伴们可以放心购票参与。还不了解 GOSIM 大会的朋友,可以参考文章结尾的详细介绍。
本次 GOSIM Workshop 机器人与自动驾驶分论坛请来了 Rust 开源下一代机器人框架的项目核心维护者 Xavier Tao 和 Philipp 来给大家做精彩分享。在参加 Workshop 之前,你应该对 Dora-rs 有一定了解。在本文中,你也将学到 Rust 如何安全高效和 Python 交互。
点击观看:机器人与自动驾驶的开源之旅 | GOSIM 数字纪事 ,了解关于汽车与自动驾驶,以及 Dora 的更多故事。
Dora 介绍
Dora-rs[3] 是一个全新的现代化机器人框架,它基于 Rust 实现,具有极高的性能。dora-rs 也可以用于深度学习应用,旨在方便地集成最先进的深度学习模型,并希望能够在框架内进行训练。
2022年DORA-RS开源项目被发布,它全称为Dataflow Oriented Robotics Architecture。它借鉴了ROS,CyberRT和ERDOS这些项目的优点,试图解决机器人和自动驾驶领域长久以来的一大难题:如何能把繁荣的基于Python的AI算法生态集成到机器人和自动驾驶的软件开发中,同时能够高效地处理数据流。
它使用Rust语言做数据流的传输和调度管理,大大减少了数据的重复拷贝和传输。它提供了Rust语言和Python语言之间的无缝集成,减少了跨语言的性能代价。借助Python丰富的算法模块,开发者可以通过YAML蓝图脚本轻松设计出适用于各类机器人和自动驾驶场景的数据流。DORA为您提供与,实现数据流的零拷贝和高效IPC传输,极大提高性能。使得开发者可以专注于应用开发,而无需过多担心性能问题。
同时与Carla自动驾驶仿真系统完美结合,开发者可以使用DORA-RS提供的基线算法,开发先进的自动驾驶应用,并进行仿真测试。不仅仅是仿真,DORA-RS同样支持真实的自动驾驶与控制器系统,无需更改代码,即可在仿真和现实环境中验证您的算法。
它的性能是ROS2 Python API 的 17 倍!是 ROS2 Rust API 的 10 倍!与 ROS2 C/Cpp API 共享内存快 0.06 ms。
Dora 的特点是:
零成本开销:在共享内存上进行零拷贝传输消息!使用Arrow和自己的共享内存守护程序,在单台机器上实现光速通信。 可扩展:专为机器和机器人扩展而设计!使用YAML描述来使软件具有声明性,以便在多台机器上进行分发。 快速原型制作:可复用YAML数据流中的现有节点和运算符,这样用户就不需要费心复制粘贴样板代码。用户还可以使用Python进行实时调试!支持热重载。 可观测:提供命令行界面和OpenTelemetry获取日志、跟踪和指标! 跨平台支持:Dora在大多数平台和架构上都提供Python、Rust、C和C++版本! 社区驱动:Dora 团队希望打造成一个由社区驱动的项目,并帮助其他人了解机器人技术。
dora 框架被划分为不同的组件:
节点(Node):Dora节点是通过Dora库与其他节点进行通信的独立进程。节点可以是自定义的、用户指定的程序,也可以是Dora运行时节点,允许运行Dora操作符。节点实现了自己的 main
函数,因此对其执行具有完全控制权。算子(Operators:):算子是轻量级、协作的、基于库的组件,由dora运行时节点执行。它们必须根据所使用的语言实现特定的接口。算子可以使用dora运行时提供的各种高级功能,例如优先级调度或本地截止日期支持。 协调器(Coordinator):协调器负责从YAML文件中读取数据流,验证数据流,并将节点和运算符部署到指定或自动确定的机器上。它监控运算符的健康状况,并实现高级集群管理功能。例如,我们可以为云节点实现自动扩展或运算符的复制和重启。协调器可以通过命令行程序(CLI)进行控制。 通信层采用可插拔组件设计,目前支持 zenoh[4]。
Dora 与 多语言接口
Dora 目前提供 Rust/Cpp/Python Api,但编写一个多语言库并不容易。dora 的核心贡献者 Xavier 近日分享了他在构建 wonnx[5] 和 dora-rs 过程中遇到的 FFI 相关的问题和解决方案,将以Rust-Python FFI通过 pyo3
为例进行说明,demo源码见 blogpost_ffi[6]。该主题也会在 GOSIM Workshop 上面讲。
“wonnx 是纯 Rust 实现的 GPU 加速的 ONNX 推理运行时,适用于Web.
`pyo3`[7] 是最常用的Rust-Python绑定之一,可以为开发者创建FFI。我们所要做的就是用 #[pyfunction]
包装我们的函数,这样它就可以在Python中使用了。
use pyo3::prelude::*;
/// 两个数字之和必须被转为字符串
/// `#[pyfunction]` 指定这是 Python 中用的函数
#[pyfunction]
fn sum_as_string(a: usize, b: usize) -> PyResult<String> {
Ok((a + b).to_string())
}
/// `#[pymodule]` 指定用 Rust 实现的 Python 函数
/// 该函数名应该与 `Cargo.toml`的 `lib.name` 名字设定相同
#[pymodule]
fn blogpost_ffi(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(sum_as_string, m)?)?;
Ok(())
}
然后在 Python 中就可以这样调用:
maturin develop
python -c "import blogpost_ffi; print(blogpost_ffi.sum_as_string(1,1))"
# Return: "2"
这里有个问题就是,自动解释的类型不是最优化实现。
实现1: Default
假如,我们想要使用数组进行操作,我们希望在Rust和Python之间接收一个数组输入并返回一个数组输出。默认的实现如下所示:
#[pyfunction]
fn create_list(a: Vec<&PyAny>) -> PyResult<Vec<&PyAny>> {
Ok(a)
}
#[pymodule]
fn blogpost_ffi(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(sum_as_string, m)?)?;
m.add_function(wrap_pyfunction!(create_list, m)?)?;
Ok(())
}
“对于
create_list
创建value = [1] * 100_000_000
这样的非常大的列表进行调用,返回时间将为2.27秒。参考 test_script.py[8]
这个速度有点慢...原因是这个列表将会在循环中逐个解释元素。我们可以尝试同时使用所有元素来提高效率。
实现2: PyBytes
假设我们的数组是一个可以表示为 PyBytes
的C连续数组。代码可以通过将输入和输出转换为 PyBytes
来进行优化:
#[pyfunction]
fn create_list_bytes<'a>(py: Python<'a>, a: &'a PyBytes) -> PyResult<&'a PyBytes> {
let s = a.as_bytes();
let output = PyBytes::new_with(py, s.len(), |bytes| {
bytes.copy_from_slice(s);
Ok(())
})?;
Ok(output)
}
“对于相同的列表输入,
create_list_bytes
返回结果需要78毫秒。这是比之前快30倍 🐎
加速是因为可以复制内存范围而不是遍历每个元素,并且可以在不复制的情况下进行读取。
现在的问题是:
PyBytes
只在Python中可用,这意味着如果我们计划使用其他语言,我们将不得不为每种语言复制这个。PyBytes
可能还需要重新转换为其他有用的类型。需要创建一个副本。
解决这个问题的方法是使用 Apache Arrow[9]。
实现3: Arrow
Apache Arrow是一种通用的内存格式,可在许多语言中使用。使用 Arrow ,跟上面相同功能的代码是这样:
#[pyfunction]
fn create_list_arrow(py: Python, a: &PyAny) -> PyResult<Py<PyAny>> {
let arraydata = arrow::array::ArrayData::from_pyarrow(a).unwrap();
let buffer = arraydata.buffers()[0].as_slice();
let len = buffer.len();
// Zero Copy Buffer reference counted
let arc_s = Arc::new(buffer.to_vec());
let ptr = NonNull::new(arc_s.as_ptr() as *mut _).unwrap();
let raw_buffer = unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, arc_s) };
let output = arrow::array::ArrayData::try_new(
arrow::datatypes::DataType::UInt8,
len,
None,
0,
vec![raw_buffer],
vec![],
)
.unwrap();
output.to_pyarrow(py)
}
“相同的列表在33毫秒内返回。这比🐎🐎好2倍。
这是由于在返回结果时进行了零拷贝。零拷贝是安全的,因为我们正在对数组进行引用计数。只有当所有引用都被移除后,数组才会被释放。
arrow
的好处是:
为了实现零拷贝,在处理更大的数据时能够更好地扩展。 可以在其他语言中重复使用。我们只需要将函数的最后一行替换为导出到其他语言的代码。 有许多类型的描述,包括 List
,Mapping
和Struct
。可以直接在 numpy
、pandas
和pytorch
中进行零拷贝转换。
调试
处理高效的接口并不是连接多种语言的唯一挑战。我们还必须处理跨语言调试。
我们目前的实现使用 .unwrap()
。然而,如果出现错误,这将导致整个Python进程崩溃。
thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: PyErr { type: <class 'TypeError'>, value: TypeError('Expected instance of pyarrow.lib.Array, got builtins.int'), traceback: None }', src/lib.rs:45:62
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Traceback (most recent call last):
File "/home/peter/Documents/work/blogpost_ffi/test_script.py", line 79, in <module>
array = blogpost_ffi.create_list_arrow(1)
pyo3_runtime.PanicException: called `Result::unwrap()` on an `Err` value: PyErr { type: <class 'TypeError'>, value: TypeError('Expected instance of pyarrow.lib.Array, got builtins.int'), traceback: None }
解决方案:eyre[10] 。
Eyre是一个用于Rust应用程序的简单惯用错误处理库。我们可以通过在我们的 pyo3
项目中添加 pyo3/eyre
特性标志来使用eyre,将所有的 .unwrap()
替换为 .context("our context")?
。这将把不可恢复的错误转换为可恢复的Python错误,并提供有关我们错误的详细信息。
“与上述相同的错误,但使用
eyre
,这将提供一个更好看的错误消息:
Could not convert arrow data
Caused by:
TypeError: Expected instance of pyarrow.lib.Array, got builtins.int
Location:
src/lib.rs:75:50
相关 Rust 实现代码:
#[pyfunction]
fn create_list_arrow_eyre(py: Python, a: &PyAny) -> Result<Py<PyAny>> {
let arraydata =
arrow::array::ArrayData::from_pyarrow(a).context("Could not convert arrow data")?;
let buffer = arraydata.buffers()[0].as_slice();
let len = buffer.len();
// Zero Copy Buffer reference counted
let arc_s = Arc::new(buffer.to_vec());
let ptr = NonNull::new(arc_s.as_ptr() as *mut _).context("Could not create pointer")?;
let raw_buffer = unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, arc_s) };
let output = arrow::array::ArrayData::try_new(
arrow::datatypes::DataType::UInt8,
len,
None,
0,
vec![raw_buffer],
vec![],
)
.context("could not create arrow arraydata")?;
output
.to_pyarrow(py)
.context("Could not convert to pyarrow")
}
我要提一下,当从Rust代码调用Python代码时,你可能会丢失Python的回溯错误。
我建议使用以下自定义的回溯方法来获得一个描述性的错误:
#[pyfunction]
fn call_func_eyre(py: Python, func: Py<PyAny>) -> Result<()> {
let _call_python = func.call0(py).context("function called failed")?;
Ok(())
}
fn traceback(err: pyo3::PyErr) -> eyre::Report {
let traceback = Python::with_gil(|py| err.traceback(py).and_then(|t| t.format().ok()));
if let Some(traceback) = traceback {
eyre::eyre!("{traceback}\n{err}")
} else {
eyre::eyre!("{err}")
}
}
#[pyfunction]
fn call_func_eyre_traceback(py: Python, func: Py<PyAny>) -> Result<()> {
let _call_python = func
.call0(py)
.map_err(traceback) // this will gives python traceback.
.context("function called failed")?;
Ok(())
}
“没有自定义回溯的示例错误:
---Eyre no traceback---
eyre no traceback says: function called failed
Caused by:
AssertionError: I have no idea what is wrong
Location:
src/lib.rs:89:39
------
“使用自定义的回溯信息来改进错误提示:
---Eyre traceback---
eyre traceback says: function called failed
Caused by:
Traceback (most recent call last):
File "/home/peter/Documents/work/blogpost_ffi/test_script.py", line 96, in abc
assert False, "I have no idea what is wrong"
AssertionError: I have no idea what is wrong
Location:
src/lib.rs:96:9
------
通过追踪,我们可以快速确定错误源。
内存管理
假设我们需要在循环中创建数组:
/// 内存无限增长
#[pyfunction]
fn unbounded_memory_growth(py: Python) -> Result<()> {
for _ in 0..10 {
let a: Vec<u8> = vec![0; 40_000_000];
let _ = PyBytes::new(py, &a);`
std::thread::sleep(Duration::from_secs(1));
}
Ok(())
“调用此函数将消耗440MB的内存。
发生的情况是, pyo3
内存模型会将所有的Python变量保存在内存中,直到GIL被释放。因此,如果我们在循环中创建变量,所有临时变量将会被保留,直到GIL被释放。
这是由于默认情况下 pyfunction
锁定了GIL。
通过理解基于GIL的内存模型,我们可以使用作用域GIL来实现预期的行为:
#[pyfunction]
fn bounded_memory_growth(py: Python) -> Result<()> {
py.allow_threads(|| {
for _ in 0..10 {
Python::with_gil(|py| {
let a: Vec<u8> = vec![0; 40_000_000];
let _bytes = PyBytes::new(py, &a);
std::thread::sleep(Duration::from_secs(1));
});
}
});
// or
for _ in 0..10 {
let pool = unsafe { py.new_pool() };
let py = pool.python();
let a: Vec<u8> = vec![0; 40_000_000];
let _bytes = PyBytes::new(py, &a);
std::thread::sleep(Duration::from_secs(1));
}
Ok(())
}
“调用此函数将消耗80MB的内存。
更多信息可以在这里找到[11],Pyo3 0.21中可能的修复![12]。
竞态条件
假设我们需要在不同的线程中处理数据:
/// Function GIL Lock
#[pyfunction]
fn gil_lock() {
let start_time = Instant::now();
std::thread::spawn(move || {
Python::with_gil(|py| println!("This threaded print was printed after {:#?}", &start_time.elapsed()));
});
std::thread::sleep(Duration::from_secs(10));
}
“线程是在10.0秒后打印的。
在使用Python与 pyo3
时,我们必须确保准确知道GIL何时被锁定或解锁,以避免竞态条件(Race condition)。
在上面的例子中,问题在于默认情况下 pyo3
会在主函数线程中锁定GIL,从而阻塞等待GIL的派生线程。
如果我们在主函数线程中使用GIL或者释放主函数线程中的GIL,都没有问题。
/// No gil lock
#[pyfunction]
fn gil_unlock() {
let start_time = Instant::now();
std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(10));
});
Python::with_gil(|py| println!("1. This was printed after {:#?}", &start_time.elapsed()));
// or
let start_time = Instant::now();
std::thread::spawn(move || {
Python::with_gil(|py| println!("2. This was printed after {:#?}", &start_time.elapsed()));
});
Python::with_gil(|py| {
py.allow_threads(|| {
std::thread::sleep(Duration::from_secs(10));
})
});
}
““1” 在32微秒后被打印出来,而“2”在80微秒后被打印出来,因此没有竞争条件。
观测
正如我们所看到的,能够测量在接口过程中所花费的时间对于识别瓶颈非常有价值。但是像我们之前那样手动测量所花费的时间可能会很繁琐。
我们可以使用一个可观测库来为我们完成这个任务。Opentelemetry[13] 可以帮助我们构建一个分布式可观测系统,能够跨多种语言进行桥接。Opentelemetry可以用于追踪、度量和日志记录。
/// No gil lock
#[pyfunction]
fn global_tracing(py: Python, func: Py<PyAny>) {
// global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
global::set_text_map_propagator(TraceContextPropagator::new());
// Connect to Jaeger Opentelemetry endpoint
// Start a new endpoint with:
// docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 jaegertracing/all-in-one:latest
let _tracer = opentelemetry_jaeger::new_agent_pipeline()
.with_endpoint("172.17.0.1:6831")
.with_service_name("rust_ffi")
.install_simple()
.unwrap();
let tracer = global::tracer("test");
// Parent Trace, first trace
let _ = tracer.in_span("parent_python_work", |cx| -> Result<()> {
std::thread::sleep(Duration::from_secs(1));
let mut map = HashMap::new();
global::get_text_map_propagator(|propagator| propagator.inject_context(&cx, &mut map));
let output = func
.call1(py, (map,))
.map_err(traceback)
.context("function called failed")?;
let out_map: HashMap<String, String> = output.extract(py).unwrap();
let out_context = global::get_text_map_propagator(|prop| prop.extract(&out_map));
std::thread::sleep(Duration::from_secs(1));
let _span = tracer.start_with_context("after_python_work", &out_context); // third trace
Ok(())
});
}
然后,在Python代码中:
def abc(cx):
propagator = TraceContextTextMapPropagator()
context = propagator.extract(carrier=cx)
with tracing.tracer.start_as_current_span(
name="Python_span", context=context
) as child_span:
child_span.add_event("in Python!")
output = {}
tracing.propagator.inject(output)
time.sleep(2)
return output
我们将获得以下的追踪记录:
使用这个工具,我们可以测量在语言之间进行接口时所花费的时间,识别锁问题,并通过日志和指标的结合,降低多语言库的复杂性。
小结
所有上述优化已经在 dora-rs 中实现,它让您可以使用Rust、Python、C和C++构建快速简单的数据流。Dora 最近刚刚开设了一个Discord,您可以在那里随时提出任何问题:https://discord.gg/ucY3AMeu[14]
汽车与机器人 Rust Workshop 日程介绍
9.23 Dora 和 Theseus OS
《ROS2与DORA之间的类型消息桥梁》,由 PHILIPP 教授分享,他是 Rust 社区知名博客《用 Rust 写操作系统》的作者。Dora[15] 是他参与开发的一个全新机器人框架,也是 Rust 实现的。Dora-ROS2 桥接器可以创建能够发送和接收 ROS2 类型信息的 Dora 节点。该桥接器不依赖于 ROS2 库或构建系统。相反,它直接通过 DDS 进行通信,并解析 ROS2 msg
文件中的类型信息。本讲座将重点介绍一些实现细节,包括 - 在编译时为 ROS2 类型自动生成 Rust 结构体 - 根据 ROS2 类型信息对消息进行动态序列化和反序列化。《DORA 中的透明零拷贝 IPC》,同样也是由 PHILIPP 教授分享。 《Rust-Python FFI&多语言系统的调试》,由 Dora 工程师 Xavier 带来的分享。介绍如何克服 Python FFI 挑战,处理 GIL、python 版本链接、数据传递和并行;以及处理多语言系统中的数据、跟踪、度量、日志和错误。 《Dora-drives:自动驾驶变得简单》,这是一个循序渐进的教程,初学者可以使用一个简单的入门套件,从零开始编写自己的自动驾驶汽车程序。Dora-drives 让自动驾驶学习更快更简单。是不是很有趣? 《Thesus:安全可靠的 Rust 本机操作系统》,Thesus 是一个完全用 Rust 从零开始编写的新型操作系统,其目标是安全可靠。由 FutureWEI 的 Kevin 带来的分享,他将介绍 Theseus 由众多微小组件组成的独特操作系统结构,并探讨我们的关键内部语言设计原则如何让 Theseus 将资源管理等典型的操作系统职责转移到编译器中。
GOSIM 介绍
GOSIM(Gobal OpenSource Innovation Meetup) 全球开源创新汇[16] 是由 WasmEdge Runtime 创始人 Michael Yuan 和 CSDN 创始人兼董事长蒋涛共同发起的开源社区平台。
在这个数字时代,障碍不断被打破,视野持续拓展,GOSIM 为所有开源爱好者照亮了前行的道路。这不仅仅是一个聚会,更是一场变革。通过促进全球合作,多样化技术生态,并分享无与伦比的知识,GOSIM 不仅正在塑造开源的现在,更是锻造其未来。对于那些坚信开源力量,认为它有潜力重新定义技术界限的人,GOSIM 正在召唤您。加入这场运动,成为开源变革的一部分。
GOSIM 由以下三大支柱组成:
GOSIM 大会(Conference):每年举办一次的这场会议是开源领域各个领域思想的汇聚之地。无论您是策略师、架构师、研究者,还是仅仅是一名开源爱好者,GOSIM 会议都为您提供了一个深入探索开源技术趋势、策略、治理和最佳实践的机会。 GOSIM 工作坊(Workshop):这是理论与实践相结合的地方。作为一个年度盛会,GOSIM 工作坊全面致力于实践行动 —— 开源项目设计、代码开发、黑客马拉松、竞赛以及深入讨论。这是专为那些希望积极塑造开源未来的项目领导者、开发者和维护者量身定制的活动。 GOSIM 开源驿站(Fellowship):除了每年的聚会,GOSIM 的核心是其全年无休的奖学金项目。这是一个持续的计划,支持开源项目的开发,得到了赞助者和资助的大力支持。
小结
在这两天的 COSIM Workshop 中,你能通过亲自参与以及亲手开发来了解 Rust 语言在这些科技领域中如何发挥着它的作用。作为 Rust 爱好者,你还犹豫什么呢?需要购票的小伙伴可以去官网[17]购票,票价低至 99 元(学生票)。
除了 Workshop 之外,也别忘记关注 GOSIM Conference (9.26)[18] 盛会,也会有 Rust 相关的精彩议题。
参考资料
GOSIM Workshop (9.23-24): http://workshop2023.gosim.org/
[2]GOSIM Conference (9.26): https://conference2023.gosim.org/
[3]Dora-rs: https://dora.carsmos.ai/
[4]zenoh: https://github.com/eclipse-zenoh/zenoh
[5]wonnx: https://github.com/webonnx/wonnx
[6]blogpost_ffi: https://github.com/haixuanTao/blogpost_ffi/tree/main
[7]pyo3
: https://github.com/PyO3/pyo3
1] * 100_000_000`这样的非常大的列表进行调用,返回时间将为2.27秒。参考 [test_script.py: https://github.com/haixuanTao/blogpost_ffi/blob/main/test_script.py
[9]Apache Arrow: https://arrow.apache.org/
[10]eyre: https://github.com/eyre-rs/eyre
[11]更多信息可以在这里找到: https://pyo3.rs/main/memory.html#gil-bound-memory
[12]Pyo3 0.21中可能的修复!: https://github.com/PyO3/pyo3/issues/3382
[13]Opentelemetry: https://opentelemetry.io/
[14]https://discord.gg/ucY3AMeu: https://discord.gg/ucY3AMeu
[15]Dora: https://github.com/dora-rs/dora
[16]GOSIM(Gobal OpenSource Innovation Meetup) 全球开源创新汇: https://www.gosim.org/
[17]官网: https://workshop2023.gosim.org/
[18]GOSIM Conference (9.26): https://conference2023.gosim.org/