万字好文:从无栈协程到C++异步框架!
点个关注👆跟腾讯工程师学技术
导语 | 本文我们将尝试对整个 C++的协程做深入浅出的剖析,方便大家的理解。再结合上层的封装,最终给出一个 C++异步框架实际业务使用的一种形态,方便大家更好的在实际项目中应用无栈协程。
在开始展开协程前,我们先来看一下一些非 C++语言中的协程实现。
(一)其他语言中的协程实现
很多语言里面,协程是作为 "一类公民" 直接加入到语言特性中的, 比如:
Dart1.9示例代码
Future<int> getPage(t) async {
var c = new http.Client();
try {
var r = await c.get('http://xxx');
print(r);
return r.length();
} finally {
await c.close();
}
}
Python示例代码
async def abinary(n):
if n <= 0:
return 1
l = await abinary(n-1)
r = await abinary(n-1)
return l + 1 + r
C#示例代码
aysnc Task<string> WaitAsync()
{
await Task.Delay(10000);
return "Finished";
}
小结
众多语言都实现了自己的协程机制, 通过上面的例子, 我们也能看到, 相关的机制使函数的执行特殊化了, 变成了可以多次中断和重入的结构. 那么如果 C++要支持这种机制, 会是一个什么情况呢? 接下来我们将先从最基本的原理逐步展开相关的探讨。
协作式多任务操作系统
抢占式多任务操作系统
聊到中断, 其中比较重要的就是执行环境的保存和恢复了, 而上下文的保存能力可以是操作系统直接提供的, 也可以是程序机制自身所提供的了, 综上所述, 我们大致可以将 c++中的协程的实现方案的迭代看成如下情况:
最早利用 setjump 来实现的协作式任务调度器
系统级实现, 如 linux 提供的 ucontext 相关 API, Windows 提供的 Fiber 相关的 Api
由系统级实现所衍生出的高性能方案, 一般是借签系统级的实现, 移除一些非必须的操作所达成的, 代表的方案有大家熟知的libco和 boost::context, 也就是我们通常所说的有栈协程实现
无栈实现, 最开始是纯粹使用 duff device hack 出来的方案, 后续被 MS 规整, 部分特性依赖 compiler 实现, 逐步演化成现在的 c++20 coroutine 机制了。
了解了协程在 C++中的部分历史, 我们来简单了解一下协程的执行机制, 这里我们直接以 C++20 为例, 先来看一下概览图:
关于协程的执行, 我们主要关注以下这些地方:
中断点和重入点的定义
协程执行到哪了。
协程当前使用的 context 进行保存, 并将程序的执行权归还给外界. 此时我们也可以返回必要的值给外界, 方便外界更好的对协程的后续执行进行控制。
恢复到上次挂起执行的地方继续执行
恢复保存的 context
传递必要的值到协程
怎么保存和恢复当前的执行位置。
怎么保存和恢复当前协程引用到的内存(变量等) 本篇主要侧重无栈协程, 无栈协程相关的机制后续会具体展开. 对有栈协程相关机制感兴趣的可以翻阅 libco 或 boost.context 相关的内容进行了解。
(四) 小议无栈协程的出现
业务复杂度膨胀带来的爆栈问题
使用过大的栈, 又会导致协程本身的切换开销上升或者占用内存过多.
Duff Device Hack实现
C++20 的 Coroutine
(五) 小结
(一)项目的背景介绍
避免大量中间类的定义和使用。 基于逻辑过程本身用串行的方式实现相关代码即可(可参考后续切场景的例子) 更容易写出数据驱动向的实现。 还有比较关键的一点, 可以有效避免过多的异步 Callback 导致的逻辑混乱和难于跟踪调试的问题。
(二)为何从 C++17 说起
(三)实现概述
(四)执行流程概述
整体的执行流程通过上面的分析我们也能比较简单的整理出来:
宏展开形成一个跨越协程函数首尾的大的 swith case 状态机。
协程执行时构建新的 CoPromise 对象, 正确的处理输入参数, 输入参数会被存储在 CoPromise 对象的 std::tuple<>上, 并且每次重入时作为函数的入口参数以引用的方式转入函数内部
每次 Resume()时根据当前 CoPromise 记录的 state, 跳转到正确的 case label 继续往下执行.
执行到下一个挂起点返回控制权到调度器
(五) 另外一个示例代码
mScheduler.CreateTask([](int& c, LocalStruct& locals) -> logic::CoTaskForScheduler {
rco_begin();
{
locals.local_i = 1024;
auto* task = rco_self_task();
printf("step1 %d\n", locals.local_i);
}
rco_yield_next_frame();
{
c = 0;
while(c < 5) {
printf("in while loop c = %d\n", c);
rco_yield_sleep(1000);
c++;
}
rco_yield_next_frame();
}
rco_end();
}, 3, LocalStruct{});
(六)绕开栈变量限制的方法
rco_begin();
{
rco_set_value("id", 35567);
}
rco_yield_next_frame();
{
{
int64_t& val = rco_ref_value("id", int64_t);
val = 5;
}
locals.local_i = rco_to_value("id", int);
}
rco_end();
(七)一个内部项目中后台切场景的代码示例
rco_begin();
{
locals.clientReq = req;
locals.session = CServerUtil::GetSessionObj(sessionId);
// ...
SSTbusppInstanceKey emptyInstKey;
emptyInstKey.Init();
if (locals.session->GetTargetGameSvrID() != emptyInstKey) {
// ...
rco_await(locals.gameSceneService->CheckChangeScene(locals.playerId, locals.checkChangeSceneReq));
// ...
// 保存大世界信息
// ...
rco_await(locals.gameSceneService->ExitMainland(locals.playerId, locals.exitMainlandReq));
// ...
}
auto gameMgrClient = GServer->GetRpcClient(TbusppInstanceKey{TBUSPP_SERVER_GAMEMGRSVR, ""});
locals.gameMgrService = rstudio::rpc_proxy::GameMgrService_Proxy::Create(gameMgrClient, GServer->GetRpcScheduler());
// ...
LOG_DEBUG(locals.playerId, "[CHANGE SCENE] ready to Queryline group");
}
rco_await(locals.gameMgrService->QueryMainland2(locals.playerId, locals.querySpaceReq));
{
// ...
rco_await(locals.gameSceneService->ChangeMainland(locals.playerId, locals.localInstanceKey, locals.changeMainlandReq));
// ...
}
// ...
LOG_DEBUG(locals.playerId, "[CHANGE SCENE] send change mainland_conf");
rco_emit_finish_event(rstudio::logic::CoRpcFinishEvent(rstudio::reflection::Value(locals.clientRes)));
rco_return;
rco_end();
C++20 Coroutine 机制简介
Function Body: 通常普通函数添加 co_await 等协程关键字处理返回值就可以作为一个协程函数。 coroutine_handle<>: 对协程的生命周期进行控制。
promise_type: 异常处理, 结果接收, 同时也可以对协程部分行为进行配置, 如协程初始化时的状态, 结束时的状态等。 Awaitable 对象: 业务侧的中断重入点定义和数据传输定制点, 结合 co_await 关键字, 我们就能借助 compiler 实现正确的中断, 重入语义了。
(一) 一个简单的示例 - 并不简单
#include <iostream>
#include <coroutine>
using namespace std;
struct resumable_thing
{
struct promise_type
{
resumable_thing get_return_object()
{
return resumable_thing(coroutine_handle<promise_type>::from_promise(*this));
}
auto initial_suspend() { return suspend_never{}; }
auto final_suspend() noexcept { return suspend_never{}; }
void return_void() {}
void unhandled_exception() {}
};
coroutine_handle<promise_type> _coroutine = nullptr;
resumable_thing() = default;
resumable_thing(resumable_thing const&) = delete;
resumable_thing& operator=(resumable_thing const&) = delete;
resumable_thing(resumable_thing&& other)
: _coroutine(other._coroutine) {
other._coroutine = nullptr;
}
resumable_thing& operator = (resumable_thing&& other) {
if (&other != this) {
_coroutine = other._coroutine;
other._coroutine = nullptr;
}
}
explicit resumable_thing(coroutine_handle<promise_type> coroutine) : _coroutine(coroutine)
{
}
~resumable_thing()
{
if (_coroutine) { _coroutine.destroy(); }
}
void resume() { _coroutine.resume(); }
};
resumable_thing counter() {
cout << "counter: called\n";
for (unsigned i = 1; ; i++)
{
co_await std::suspend_always{};
cout << "counter:: resumed (#" << i << ")\n";
}
}
int main()
{
cout << "main: calling counter\n";
resumable_thing the_counter = counter();
cout << "main: resuming counter\n";
the_counter.resume();
the_counter.resume();
the_counter.resume();
the_counter.resume();
the_counter.resume();
cout << "main: done\n";
return 0;
}
main: calling counter
counter: called
main: resuming counter
counter: resumed (#1)
counter: resumed (#2)
counter: resumed (#3)
counter: resumed (#4)
counter: resumed (#5)
main: done
(二)Coroutine20 的实现猜想
这其实也是 C++20 Coroutine 使用的一大难点, 除了前文提到的, 特性通过 Awaitable 定制点开放给你的地方, 整体的运作机制, 我们是很难直接得出的. 另外, 在一些多线程协程混用的复杂情况下, 整体运作机制对于我们实现正确的框架, 正确的分析解决碰到的问题至关重要. 那么我们现在的问题就变成了, 怎么去补全出包含编译器处理的整体代码?
(三)借助 "cppinsights"
/*************************************************************************************
* NOTE: The coroutine transformation you've enabled is a hand coded transformation! *
* Most of it is _not_ present in the AST. What you see is an approximation. *
*************************************************************************************/
#include <iostream>
#include <coroutine>
using namespace std;
struct resumable_thing
{
struct promise_type
{
inline resumable_thing get_return_object()
{
return resumable_thing(resumable_thing(std::coroutine_handle<promise_type>::from_promise(*this)));
}
inline std::suspend_never initial_suspend()
{
return std::suspend_never{};
}
inline std::suspend_never final_suspend() noexcept
{
return std::suspend_never{};
}
inline void return_void()
{
}
inline void unhandled_exception()
{
}
// inline constexpr promise_type() noexcept = default;
};
std::coroutine_handle<promise_type> _coroutine;
inline constexpr resumable_thing() /* noexcept */ = default;
// inline resumable_thing(const resumable_thing &) = delete;
// inline resumable_thing & operator=(const resumable_thing &) = delete;
inline resumable_thing(resumable_thing && other)
: _coroutine{std::coroutine_handle<promise_type>(other._coroutine)}
{
other._coroutine.operator=(nullptr);
}
inline resumable_thing & operator=(resumable_thing && other)
{
if(&other != this) {
this->_coroutine.operator=(other._coroutine);
other._coroutine.operator=(nullptr);
}
}
inline explicit resumable_thing(std::coroutine_handle<promise_type> coroutine)
: _coroutine{std::coroutine_handle<promise_type>(coroutine)}
{
}
inline ~resumable_thing() noexcept
{
if(static_cast<bool>(this->_coroutine.operator bool())) {
this->_coroutine.destroy();
}
}
inline void resume()
{
this->_coroutine.resume();
}
};
struct __counterFrame
{
void (*resume_fn)(__counterFrame *);
void (*destroy_fn)(__counterFrame *);
std::__coroutine_traits_impl<resumable_thing>::promise_type __promise;
int __suspend_index;
bool __initial_await_suspend_called;
unsigned int i;
std::suspend_never __suspend_44_17;
std::suspend_always __suspend_48_14;
std::suspend_never __suspend_44_17_1;
};
resumable_thing counter()
{
/* Allocate the frame including the promise */
__counterFrame * __f = reinterpret_cast<__counterFrame *>(operator new(__builtin_coro_size()));
__f->__suspend_index = 0;
__f->__initial_await_suspend_called = false;
/* Construct the promise. */
new (&__f->__promise)std::__coroutine_traits_impl<resumable_thing>::promise_type{};
resumable_thing __coro_gro = __f->__promise.get_return_object() /* NRVO variable */;
/* Forward declare the resume and destroy function. */
void __counterResume(__counterFrame * __f);
void __counterDestroy(__counterFrame * __f);
/* Assign the resume and destroy function pointers. */
__f->resume_fn = &__counterResume;
__f->destroy_fn = &__counterDestroy;
/* Call the made up function with the coroutine body for initial suspend.
This function will be called subsequently by coroutine_handle<>::resume()
which calls __builtin_coro_resume(__handle_) */
__counterResume(__f);
return __coro_gro;
}
/* This function invoked by coroutine_handle<>::resume() */
void __counterResume(__counterFrame * __f)
{
try
{
/* Create a switch to get to the correct resume point */
switch(__f->__suspend_index) {
case 0: break;
case 1: goto __resume_counter_1;
case 2: goto __resume_counter_2;
}
/* co_await insights.cpp:44 */
__f->__suspend_44_17 = __f->__promise.initial_suspend();
if(!__f->__suspend_44_17.await_ready()) {
__f->__suspend_44_17.await_suspend(std::coroutine_handle<resumable_thing::promise_type>::from_address(static_cast<void *>(__f)).operator coroutine_handle());
__f->__suspend_index = 1;
__f->__initial_await_suspend_called = true;
return;
}
__resume_counter_1:
__f->__suspend_44_17.await_resume();
std::operator<<(std::cout, "counter: called\n");
for( __f->i = 1; ; __f->i++) {
/* co_await insights.cpp:48 */
__f->__suspend_48_14 = std::suspend_always{};
if(!__f->__suspend_48_14.await_ready()) {
__f->__suspend_48_14.await_suspend(std::coroutine_handle<resumable_thing::promise_type>::from_address(static_cast<void *>(__f)).operator coroutine_handle());
__f->__suspend_index = 2;
return;
}
__resume_counter_2:
__f->__suspend_48_14.await_resume();
std::operator<<(std::operator<<(std::cout, "counter:: resumed (#").operator<<(__f->i), ")\n");
}
goto __final_suspend;
} catch(...) {
if(!__f->__initial_await_suspend_called) {
throw ;
}
__f->__promise.unhandled_exception();
}
__final_suspend:
/* co_await insights.cpp:44 */
__f->__suspend_44_17_1 = __f->__promise.final_suspend();
if(!__f->__suspend_44_17_1.await_ready()) {
__f->__suspend_44_17_1.await_suspend(std::coroutine_handle<resumable_thing::promise_type>::from_address(static_cast<void *>(__f)).operator coroutine_handle());
}
;
}
/* This function invoked by coroutine_handle<>::destroy() */
void __counterDestroy(__counterFrame * __f)
{
/* destroy all variables with dtors */
__f->~__counterFrame();
/* Deallocating the coroutine frame */
operator delete(__builtin_coro_free(static_cast<void *>(__f)));
}
int main()
{
std::operator<<(std::cout, "main: calling counter\n");
resumable_thing the_counter = counter();
std::operator<<(std::cout, "main: resuming counter\n");
the_counter.resume();
the_counter.resume();
the_counter.resume();
the_counter.resume();
the_counter.resume();
std::operator<<(std::cout, "main: done\n");
return 0;
}
(四) Coroutine20 基本结构 - Compiler 视角
virtual table 部分, 正确的告知你协程使用的 resume 函数以及 destroy 函数 自动处理的栈变量, 如下图中所示的 i
各种使用到的 awaitable object, 这是因为 awaitable object 本身也是有状态的, 需要正确记录 当前执行到的位置, 这个是通过整形的__suspend_index来记录的.
(五)Compiler 视角重新分析示例代码
couter() - Function Body
我们知道, couter()会被编译器改写, 最终其实是变成了三个函数:
单纯负责生命周期以及生成正确的__counterFrame对象的counter(), 只是一个协程入口函数.
负责真正执行逻辑的 __counterResume()函数, 它的输入参数就是__counterFrame对象.
负责删除**counterFrame 对象的 **counterDestroy()函数.
通过一拆三, 编译器很好的解决了协程的入口, 协程的中断重入, 和协程以及相关对象的销毁的问题.
coroutine_handle<>
template <> struct coroutine_handle<void>{
constexpr coroutine_handle() noexcept;
constexpr coroutine_handle(nullptr_t) noexcept;
coroutine_handle& operator=(nullptr_t) noexcept;
constexpr void* address() const noexcept;
constexpr static coroutine_handle from_address(void* addr);
constexpr explicit operator bool() const noexcept;
bool done() const;
void operator()();
void resume();
void destroy();
private:
void* ptr;// exposition only
};
template <typename Promise>
struct coroutine_handle
: coroutine_handle<void>
{
Promise& promise() const noexcept;
static coroutine_handle from_promise(Promise&) noexcept;
};
promise_type
/* This function invoked by coroutine_handle<>::resume() */
void __counterResume(__counterFrame * __f)
{
try
{
/* Create a switch to get to the correct resume point */
switch(__f->__suspend_index) {
case 0: break;
case 1: goto __resume_counter_1;
case 2: goto __resume_counter_2;
}
/* initial suspend handle here~~ */
__f->__suspend_44_17 = __f->__promise.initial_suspend();
__resume_counter_1:
/* do somthing for yield~~ */
__resume_counter_2:
/* do somthing for resume~~ */
goto __final_suspend;
} catch(...) {
if(!__f->__initial_await_suspend_called) {
throw ;
}
__f->__promise.unhandled_exception();
}
__final_suspend:
/* final suspend here~~ */
__f->__suspend_44_17_1 = __f->__promise.final_suspend();
}
Awaitable 对象
std:suspend_always std::suspend_never另外我们也能通过多种方式定义 awaitable 对象 通过重载promise_type的await_transform() - 这是 asio 所使用的方式, 侵入性比较强 通过为对象实现operator co_await() 通过实现 awaitable 对象需要的三个子函数await_ready(), await_suspend(), await_resume() - 推荐的方式 那么当我们调用co_await awaitable的时候, 发生的事情是什么呢, 我们同样通过预处理的代码来进行了解:
__resume_counter_1:
__f->__suspend_44_17.await_resume();
std::operator<<(std::cout, "counter: called\n");
for( __f->i = 1; ; __f->i++) {
/* co_await insights.cpp:48 */
__f->__suspend_48_14 = std::suspend_always{};
if(!__f->__suspend_48_14.await_ready()) {
__f->__suspend_48_14.await_suspend(coroutine_handle);
__f->__suspend_index = 2;
return;
}
__resume_counter_2:
__f->__suspend_48_14.await_resume();
std::cout << "counter:: resumed (#" << __f->i << ")\n";
}
await_ready() - 判断是否需要挂起, 如不需要挂起, 则直接执行后续逻辑, 这里也就是继续到__resume_counter_2这个 label 执行重入点的逻辑 await_suspend() - 中断点触发的时候执行的逻辑, 业务中我们一般在此处发起异步操作 await_resume() - 重入点触发的时候执行的逻辑. 整体的机制是不是清晰了很多?
(六) 小结 - C++20 协程的特点总结
一套理解上稍显复杂, 需要结合 cppinsights 等工具才能了解整体的运行机制 适当封装, 还是能够很好的满足业务需求 对比 17 版本的实现, 20 版基本上没有什么使用上的限制 自动栈变量的处理, 可以让业务侧以更低的心智负担来进行开发 通过 Awaitable 对象, 我们能够扩展co_await支持的业务, 这种实现侵入性低, 实际使用负担小 对于异步操作较多, 多节点较多, 特别是多个异步操作级联的使用场景, 很值得实装. 最后我们讲解使用的是 clang, 但对于 gcc, msvc, 这些同样适用, 标准的提案来源是一致的, 都是 msvc 发起的那份, compiler 实现上存在一些细微的差异, 但基本不影响使用.
(一)Sheduler 实现的动机
(二)Scheduler 核心机制
Awaitable 机制: 前面也介绍了利用 c++20 的 co_await 关键字和 awaitable 对象, 我们可以很好的定义挂起点, 以及交换协程和外部系统的数据。 Return Callback 机制: 部分协程执行完后需要向外界反馈执行结果(如协程模式执行的 Rpc Service).
(三)Scheduler 核心对象
ISchedTask & SchedTaskCpp20
using CoReturnFunction = std::function<void(const CoReturnObject*)>;class ISchedTask{ friend class Scheduler; public: ISchedTask() = delete; ISchedTask(const SchedTaskCpp17&) = delete; ISchedTask(uint64_t taskId, Scheduler* manager); virtual ~ISchedTask(); uint64_t GetId() const; virtual int Run() = 0; virtual bool IsDone() const = 0; virtual CO_TASK_STATE GetCoState() const = 0; void BindSleepHandle(uint64_t handle); AwaitMode GetAwaitMode() const; int GetAwaitTimeout() const; template<typename AwaitEventType> auto BindResumeObject(AwaitEventType&& awaitEvent)->std::enable_if_t<std::is_base_of<ResumeObject, AwaitEventType>::value>; template<typename AwaitEventType> auto GetResumeObjectAsType()->std::enable_if_t<std::is_base_of<ResumeObject, AwaitEventType>::value, AwaitEventType*>; bool HasResumeObject() const noexcept; void ClearResumeObject(); bool IsLastInvokeSuc() const noexcept; bool IsLastInvokeTimeOut() const noexcept; bool IsLastInvokeFailed() const noexcept; void AddChildTask(uint64_t tid); void AddWaitNofityTask(uint64_t tid); const auto& GetChildTaskArray() const; const auto& GetWaitNotifyArray() const; void Terminate(); Scheduler* GetManager() const; static ISchedTask* CurrentTask(); void DoYield(AwaitMode mode, int awaitTimeMs = 0); void SetReturnFunction(CoReturnFunction&& func); void DoReturn(const CoReturnObject& obj); void DoReturn(); protected: uint64_t mTaskId; Scheduler* mManager; std::vector<uint64_t> mChildArray; std::vector<uint64_t> mWaitNotifyArray; //value used to return from coroutine AwaitMode mAwaitMode = AwaitMode::AwaitDoNothing; int mAwaitTimeout = 0; //value used to send to coroutine(now as a AwaitEvent) reflection::UserObject mResumeObject; uint64_t mSleepHandle = 0; bool mIsTerminate = false; CoReturnFunction mCoReturnFunc;};class SchedTaskCpp20: public ISchedTask{ public: SchedTaskCpp20(uint64_t taskId, CoTaskFunction&& taskFunc, Scheduler* manager); ~SchedTaskCpp20(); int Run() override; bool IsDone() const override; CO_TASK_STATE GetCoState() const override; void BindSelfToCoTask(); const CoResumingTaskCpp20& GetResumingTask() const; protected: CoResumingTaskCpp20 mCoResumingTask; CoTaskFunction mTaskFuncion;};
C++20 的 SchedTaskCpp20 主要完成对协程对象的封装, CoTaskFunction 用于存储相关的函数对象, 而 CoResumingTaskCpp20 则如同前面示例中的 resumable_thing 对象,内部有需要的 promise_type 实现, 我们对协程的访问也是通过它来完成的。
此处需要注意的是我们保存了协程对象外, 还额外保存了相关的函数对象, 这是因为如果协程本身是一个 lambda, compiler 并不会帮我们正确维护 lambda 的生命周期以及 lambda 所捕获的函数, 尚未清楚是实现缺陷还是功能就是如此, 所以此处需要一个额外存在的 std::function<>对象, 来保证对应 lambda 的生命周期是正确的。
对比 17 的实现, 我们的 SchedTask 对象中主要保留了:reflection::UserObject mResumeObject: 主要用于异步等待的执行, 当一个异步等待成功执行的时候, 向协程传递值。
原来利用事件去处理最终返回值的机制也替换成了 Return 回调的方式,相对来说更简单直接, 利用 lambda 本身也能很方便的保存需要最终回传的临时值了。
Scheduler
Scheduler 的代码比较多, 主要就是 SchedTask 的管理器, 另外也完成对前面提到的三种机制的支持, 文章重点分析一下三种机制的实现代码.
Yield 处理
void Scheduler::Update()
{
RSTUDIO_PROFILER_METHOD_INFO(sUpdate, "Scheduler::Update()", rstudio::ProfilerGroupType::kLogicJob);
RSTUDIO_PROFILER_AUTO_SCOPE(sUpdate);
//Handle need kill task first
while(!mNeedKillArray.empty())
{
auto tid = mNeedKillArray.front();
mNeedKillArray.pop();
auto* tmpTask = GetTaskById(tid);
if (tmpTask != nullptr)
{
DestroyTask(tmpTask);
}
}
//Keep a temp queue for not excute next frame task right now
decltype(mFrameStartTasks) tmpFrameTasks;
mFrameStartTasks.swap(tmpFrameTasks);
while (!tmpFrameTasks.empty())
{
auto task_id = tmpFrameTasks.front();
tmpFrameTasks.pop();
auto* task = GetTaskById(task_id);
LOG_CHECK_ERROR(task);
if (task)
{
AddToImmRun(task);
}
}
}
void Scheduler::AddToImmRun(ISchedTask* schedTask)
{
LOG_PROCESS_ERROR(schedTask);
schedTask->Run();
if (schedTask->IsDone())
{
DestroyTask(schedTask);
return;
}
{
auto awaitMode = schedTask->GetAwaitMode();
auto awaitTimeoutMs = schedTask->GetAwaitTimeout();
switch (schedTask->GetAwaitMode())
{
case rstudio::logic::AwaitMode::AwaitNever:
AddToImmRun(schedTask);
break;
case rstudio::logic::AwaitMode::AwaitNextframe:
AddToNextFrameRun(schedTask);
break;
case rstudio::logic::AwaitMode::AwaitForNotifyNoTimeout:
case rstudio::logic::AwaitMode::AwaitForNotifyWithTimeout:
{
HandleTaskAwaitForNotify(schedTask, awaitMode, awaitTimeoutMs);
}
break;
case rstudio::logic::AwaitMode::AwaitDoNothing:
break;
default:
RSTUDIO_ERROR(CanNotRunToHereError());
break;
}
}
Exit0:
return;
}
rstudio::logic::AwaitMode::AwaitNever: 立即将协程加入回 mReadyTask 队列, 对应协程会被马上唤醒执行 rstudio::logic::AwaitMode::AwaitNextframe: 将协程加入到下一帧执行的队列, 协程将会在下一帧被唤醒执行 rstudio::logic::AwaitMode::AwaitForNotifyNoTimeout: 等待外界通知后再唤醒执行(无超时模式), 注意该模式下如果一直没收到通知, 相关协程会一直在队列中存在. rstudio::logic::AwaitMode::AwaitForNotifyWithTimeout:同 3, 差别是存在一个超时时间, 超时时间到了也会唤醒协程, 业务方可以通过 ResumeObject 判断协程是被超时唤醒的. **rstudio::logic::AwaitMode::AwaitDoNothing:**特殊的 AwaitHandle 实现会使用该模式, 比如删除 Task 的实现, 都要删除 Task 了, 我们肯定不需要再将 Task 加入任何可唤醒队列了.
Resume处理
Resume 机制主要是通过唤醒在 Await 队列中的协程的时候向关联的 Task 对象传递 ResumeObject 实现的:
//Not a real event notify here, just do need things
template <typename E>
auto ResumeTaskByAwaitObject(E&& awaitObj)
-> std::enable_if_t<std::is_base_of<ResumeObject, E>::value>
{
auto tid = awaitObj.taskId;
if (IsTaskInAwaitSet(tid))
{
//Only in await set task can be resume
auto* task = GetTaskById(tid);
if (RSTUDIO_LIKELY(task != nullptr))
{
task->BindResumeObject(std::forward<E>(awaitObj));
AddToImmRun(task);
}
OnTaskAwaitNotifyFinish(tid);
}
}
#define rco_get_resume_object(ResumeObjectType) rco_self_task()->GetResumeObjectAsType<ResumeObjectType>()
本身就是一个简单的传值取值的过程. 注意传递 ResumeObject 后, 我们也会马上将协程加入到 mReadTasks 队列中以方便在接下来的 Update 中唤醒它.
一个 Awaitable 实现的范例
我们以 Rpc 的协程化 Caller 实现为例, 看看一个 awaitable 对象应该如何构造:
class RSTUDIO_APP_SERVICE_API RpcRequest
{
public:
RpcRequest() = delete;
////RpcRequest(const RpcRequest&) = delete;
~RpcRequest() = default;
RpcRequest(const logic::GameServiceCallerPtr& proxy,
const std::string_view funcName,
reflection::Args&& arg, int timeoutMs) :
mProxy(proxy)
, mFuncName(funcName)
, mArgs(std::forward<reflection::Args>(arg))
, mTimeoutMs(timeoutMs)
{}
bool await_ready()
{
return false;
}
void await_suspend(coroutine_handle<>) const noexcept
{
auto* task = rco_self_task();
auto context = std::make_shared<ServiceContext>();
context->TaskId = task->GetId();
context->Timeout = mTimeoutMs;
auto args = mArgs;
mProxy->DoDynamicCall(mFuncName, std::move(args), context);
task->DoYield(AwaitMode::AwaitForNotifyNoTimeout);
}
::rstudio::logic::RpcResumeObject* await_resume() const noexcept
{
return rco_get_resume_object(logic::RpcResumeObject);
}
private:
logic::GameServiceCallerPtr mProxy;
std::string mFuncName;
reflection::Args mArgs;
int mTimeoutMs;
};
重点是前面说到的 await_ready(), await_suspend(), await_resume()函数的实现。
ReturnCallback 机制
有一些场合, 可能需要协程执行完成后向业务系统发起通知并传递返回值, 比如 Rpc Service 的协程支持实现等, 这个特性其实比较类似 go 的 defer, 只是这里的实现更简单, 只支持单一函数的指定而不是队列. 我们直接以 RpcService 的协程支持为例来看一下这一块的具体使用.
首先是业务侧, 在创建完协程后, 需要给协程绑定后续协程执行完成后做进一步操作需要的数据:
task->SetReturnFunction(
[this, server, entity, cmdHead, routerAddr,
reqHead, context](const CoReturnObject* obj) {
const auto* returnObj = dynamic_cast<const CoRpcReturnObject*>(obj);
if (RSTUDIO_LIKELY(returnObj))
{
DoRpcResponse(server, entity.get(), routerAddr, &cmdHead,
reqHead, const_cast<ServiceContext&>(context),
returnObj->rpcResultType,
returnObj->totalRet, returnObj->retValue);
}
});
CoTaskInfo HeartBeatService::DoHeartBeat(
logic::Scheduler& scheduler, int testVal)
{
return scheduler.CreateTask20(
[testVal]() -> logic::CoResumingTaskCpp20 {
co_await logic::cotasks::Sleep(1000);
printf("service yield call finish!\n");
co_return CoRpcReturnObject(reflection::Value(testVal + 1));
}
);
}
void CoResumingTaskCpp20::promise_type::return_value(const CoReturnObject& obj)
{
auto* task = rco_self_task();
task->DoReturn(obj);
}
(四) 示例代码
//C++ 20 coroutine
auto clientProxy = mRpcClient->CreateServiceProxy("mmo.HeartBeat");
mScheduler.CreateTask20([clientProxy]()
-> rstudio::logic::CoResumingTaskCpp20 {
auto* task = rco_self_task();
printf("step1: task is %llu\n", task->GetId());
co_await rstudio::logic::cotasks::NextFrame{};
printf("step2 after yield!\n");
int c = 0;
while (c < 5) {
printf("in while loop c=%d\n", c);
co_await rstudio::logic::cotasks::Sleep(1000);
c++;
}
for (c = 0; c < 5; c++) {
printf("in for loop c=%d\n", c);
co_await rstudio::logic::cotasks::NextFrame{};
}
printf("step3 %d\n", c);
auto newTaskId = co_await rstudio::logic::cotasks::CreateTask(false,
[]()-> logic::CoResumingTaskCpp20 {
printf("from child coroutine!\n");
co_await rstudio::logic::cotasks::Sleep(2000);
printf("after child coroutine sleep\n");
});
printf("new task create in coroutine: %llu\n", newTaskId);
printf("Begin wait for task!\n");
co_await rstudio::logic::cotasks::WaitTaskFinish{ newTaskId, 10000 };
printf("After wait for task!\n");
rstudio::logic::cotasks::RpcRequest
rpcReq{clientProxy, "DoHeartBeat", rstudio::reflection::Args{ 3 }, 5000};
auto* rpcret = co_await rpcReq;
if (rpcret->rpcResultType == rstudio::network::RpcResponseResultType::RequestSuc) {
assert(rpcret->totalRet == 1);
auto retval = rpcret->retValue.to<int>();
assert(retval == 4);
printf("rpc coroutine run suc, val = %d!\n", retval);
}
else {
printf("rpc coroutine run failed! result = %d \n", (int)rpcret->rpcResultType);
}
co_await rstudio::logic::cotasks::Sleep(5000);
printf("step4, after 5s sleep\n");
co_return rstudio::logic::CoNil;
} );
step1: task is 1
step2 after yield!
in while loop c=0
in while loop c=1
in while loop c=2
in while loop c=3
in while loop c=4
in for loop c=0
in for loop c=1
in for loop c=2
in for loop c=3
in for loop c=4
step3 5
new task create in coroutine: 2
Begin wait for task!
from child coroutine!
after child coroutine sleep
After wait for task!
service yield call finish!
rpc coroutine run suc, val = 4!
step4, after 5s sleep
代码更精简了 Stack 变量可以被 Compiler 自动处理, 正常使用了。 co_await 可以直接返回值, 并有强制的类型约束了。 一个协程函数就是一个返回值为 logic::CoResumingTaskCpp20 类型的 lambda, 可以充分利用 lambda 本身的特性还实现正确的逻辑了。
(一) 示例代码
//C++ 20 coroutineauto clientProxy = mRpcClient->CreateServiceProxy("mmo.HeartBeat");mScheduler.CreateTask20([clientProxy]() -> rstudio::logic::CoResumingTaskCpp20 { auto* task = rco_self_task(); printf("step1: task is %llu\n", task->GetId()); co_await rstudio::logic::cotasks::NextFrame{}; printf("step2 after yield!\n"); int c = 0; while (c < 5) { printf("in while loop c=%d\n", c); co_await rstudio::logic::cotasks::Sleep(1000); c++; } for (c = 0; c < 5; c++) { printf("in for loop c=%d\n", c); co_await rstudio::logic::cotasks::NextFrame{}; } printf("step3 %d\n", c); auto newTaskId = co_await rstudio::logic::cotasks::CreateTask(false, []()-> logic::CoResumingTaskCpp20 { printf("from child coroutine!\n"); co_await rstudio::logic::cotasks::Sleep(2000); printf("after child coroutine sleep\n"); }); printf("new task create in coroutine: %llu\n", newTaskId); printf("Begin wait for task!\n"); co_await rstudio::logic::cotasks::WaitTaskFinish{ newTaskId, 10000 }; printf("After wait for task!\n"); rstudio::logic::cotasks::RpcRequest rpcReq{clientProxy, "DoHeartBeat", rstudio::reflection::Args{ 3 }, 5000}; auto* rpcret = co_await rpcReq; if (rpcret->rpcResultType == rstudio::network::RpcResponseResultType::RequestSuc) { assert(rpcret->totalRet == 1); auto retval = rpcret->retValue.to<int>(); assert(retval == 4); printf("rpc coroutine run suc, val = %d!\n", retval); } else { printf("rpc coroutine run failed! result = %d \n", (int)rpcret->rpcResultType); } co_await rstudio::logic::cotasks::Sleep(5000); printf("step4, after 5s sleep\n"); co_return rstudio::logic::CoNil;} );
(二)小议 C++20 Coroutine 对比 C++17 Coroutine 带来的改进
原生关键字 co_await, co_return 的支持, 业务侧使用代码更加精简, 也进一步统一了大家对无栈协程的标准理解. Stack 变量可以被 compiler 自动处理, 这点对比 C++17 需要自行组织状态变量来说是非常节约心智负责的. co_await可以直接返回对应类型的值, 这样协程本身就有了强制的类型约束, 整体业务的表达也会因为不需要从类型擦除的对象获取需要的类型, 变得更顺畅.
(一) 一个 Python 实现的技能示例
实现效果
技能主流程代码
# handle one skill instance create
def skill_instance_run_func(instance, user, skill_data, target, target_pos, finish_func):
# set return callback here
yield TaskSetExitCallback(finish_func)
# ... some code ignore here
from common.gametime import GameTime
init_time = GameTime.now_time
for skill_step in step_list:
step_start_time = GameTime.now_time
# ... some code ignore here
### 1. period task handle
if skill_step.cast_type == CastSkillStep.CAST_TYPE_PERIOD:
#... some code ignore here
### 2. missle skill
elif skill_step.cast_type == CastSkillStep.CAST_TYPE_MISSLE_TO_TARGET:
if len(skill_step.cast_action_group_list) > 0:
action_group = skill_step.cast_action_group_list[0]
for i in range(skill_step.cast_count):
# yield for sleep
yield TaskSleep(skill_step.cast_period)
ret_val = do_skill_spend(skill_data, user, instance)
if not ret_val:
return
# sub coroutine(missle_handle_func)
task_id = yield TaskNew(missle_handle_func(
skill_data, instance, user, skill_step, action_group, target_id, target_pos))
instance.add_child_task_id(task_id)
### 3. guide skill
elif skill_step.cast_type == CastSkillStep.CAST_TYPE_GUIDE_TO_TARGET:
#... some code ignore here
now_time = GameTime.now_time
step_pass_time = now_time - step_start_time
need_sleep_time = skill_step.step_total_time - step_pass_time
if need_sleep_time > 0:
yield TaskSleep(need_sleep_time)
instance.on_one_step_finish(skill_step)
if skill_data.delay_end_time > 0:
yield TaskSleep(skill_data.delay_end_time)
# wait for child finish~~
for task_id in instance.child_task_list:
yield TaskWait(task_id)
instance.task_id = 0
CastSkillStep.CAST_TYPE_PERIOD:周期性触发的技能, 主要使用 yield TaskSleep() CastSkillStep.CAST_TYPE_MISSLE_TO_TARGET:导弹类技能, 使用子协程功能 CastSkillStep.CAST_TYPE_GUIDE_TO_TARGET:引导类技能, 使用子协程功能
子任务 - 导弹类技能相关代码
### 1. handle for missle skill(etc: fire ball)def missle_handle_func(skill_data, instance, user, skill_step, action_group, target_id, target_pos): effect = instance.create_effect(skill_step.missle_info.missle_fx_path) effect.set_scale(skill_step.missle_info.missle_scale) cur_target_pos, is_target_valid = skill_step.missle_info.get_target_position( user, target_id, target_pos) start_pos = skill_step.missle_info.get_start_position(user, target_id, target_pos) is_reach_target = False from common.gametime import GameTime init_time = GameTime.now_time while True: # ... some code ignore here fly_distance = skill_step.missle_info.fly_speed*GameTime.elapse_time if fly_distance < total_distance: start_pos += fly_direction*math3d.vector(fly_distance, fly_distance, fly_distance) effect.set_position(start_pos) else: is_reach_target = True break # do yield util next frame yield effect.destroy() if is_reach_target: target_list = skill_data.get_target_list(user.caster, target_id, target_pos) for target in target_list: action_group.do(user.caster, target)
子任务 - 引导类技能代码
### 2. handle for guide skill(etc: lighting chain)
def guide_handle_func(skill_data, instance, user, skill_step, start_pos, target_id, target_pos):
effect = instance.create_effect(skill_step.guide_info.guide_fx_path)
effect.set_scale(skill_step.guide_info.guide_scale)
effect.set_position(start_pos)
effect.set_guide_end_pos(target_pos - start_pos)
# yield for sleep
yield TaskSleep(skill_step.guide_info.guide_time)
effect.destroy()
(二)对应的 C++实现
//C++ 20 skill test coroutine
mScheduler.CreateTask20([instance]() -> rstudio::logic::CoResumingTaskCpp20 {
rstudio::logic::ISchedTask* task = rco_self_task();
task->SetReturnFunction([](const rstudio::logic::CoReturnObject*) {
//ToDo: return handle code add here
});
for (auto& skill_step : step_list) {
auto step_start_time = GGame->GetTimeManager().GetTimeHardwareMS();
switch (skill_step.cast_type) {
case CastSkillStep::CAST_TYPE_PERIOD: {
//... some code ignore here
}
break;
case CastSkillStep::CAST_TYPE_MISSLE_TO_TARGET: {
if (skill_step.cast_action_group_list.size() > 0) {
auto& action_group = skill_step.cast_action_group_list[0];
for (int i = 0; i < skill_step.cast_count; i++) {
co_await rstudio::logic::cotasks::Sleep(skill_step.cast_period);
bool ret_val = do_skill_spend(skill_data, user, instance);
if (!ret_val) {
co_return rstudio::logic::CoNil;
}
auto task_id = co_await rstudio::logic::cotasks::CreateTask(true,
[&skill_step]()->rstudio::logic::CoResumingTaskCpp20 {
auto cur_target_pos = skill_step.missle_info.get_target_position(
user, target_id, target_pos);
auto start_pos = skill_step.missle_info.get_start_position(
user, target_id, target_pos);
bool is_reach_target = false;
auto init_time = GGame->GetTimeManager().GetTimeHardwareMS();
auto last_time = init_time;
do {
auto now_time = GGame->GetTimeManager().GetTimeHardwareMS();
auto elapse_time = now_time - last_time;
last_time = now_time;
if (now_time - init_time >= skill_step.missle_info.long_fly_time) {
break;
}
auto cur_target_pos = skill_step.missle_info.get_target_position(
user, target_id, target_pos);
rstudio::math::Vector3 fly_direction = cur_target_pos - start_pos;
auto total_distance = fly_direction.Normalise();
auto fly_distance = skill_step.missle_info.fly_speed * elapse_time;
if (fly_distance < total_distance) {
start_pos += fly_direction * fly_distance;
}
else {
is_reach_target = true;
break;
}
co_await rstudio::logic::cotasks::NextFrame{};
} while (true);
if (is_reach_target) {
//ToDo: add damage calculate here~~
}
});
instance.add_child_task_id(task_id);
}
}
}
break;
case CastSkillStep::CAST_TYPE_GUIDE_TO_TARGET: {
//... some code ignore here
}
break;
default:
break;
}
auto now_time = GGame->GetTimeManager().GetTimeHardwareMS();
auto step_pass_time = now_time - step_start_time;
auto need_sleep_time = skill_step.step_total_time - step_pass_time;
if (need_sleep_time > 0) {
co_await rstudio::logic::cotasks::Sleep(need_sleep_time);
}
instance.on_one_step_finish(skill_step);
}
if (skill_data.delay_end_time > 0) {
co_await rstudio::logic::cotasks::Sleep(skill_data.delay_end_time);
}
for (auto tid :instance.child_task_list) {
co_await rstudio::logic::cotasks::WaitTaskFinish(tid, 10000);
}
});
(三) 小结
结合调度器, C++ Coroutine 的实现与脚本一样具备简洁性, 这得益于 Compiler 对 Stack 变量的自动处理, 以及规整的co_await等关键字支持, 从某种程度上, 我们可以认为这种处理提供了一个简单的类 GC 的能力, 我们可以更低心智负担的开发相关代码. 协程的使用同时也会带来其他一些好处, 像避免多级 Callback 带来的代码分散逻辑混乱等问题, 这个在 C++17 协程使用的范例中已经提到过, 此处不再重复.
(一) 对 asio coroutine20 实现部分的思考
低使用成本的经典 callback 兼容方案
asio::awaitable<void> watchdog(asio::io_context& ctx) {
asio::steady_timer timer(ctx);
timer.expires_after(1s);
co_await timer.async_wait(asio::use_awaitable);
co_return;
}
利用操作符定义复合任务
auto [e] = co_await server.async_connect(target, use_nothrow_awaitable);
if (!e)
{
co_await (
(
transfer(client, server, client_to_server_deadline) ||
watchdog(client_to_server_deadline)
)
&&
(
transfer(server, client, server_to_client_deadline) ||
watchdog(server_to_client_deadline)
)
);
}
通过这种机制, 我们一定程度拥有了对任务的复合关系进行表达的能力, 比如对一个原本不支持超时的异步任务, 我们可以非常简单的||上一个超时异步任务, 来解决它的超时支持问题. 这种设计也是很值得参考的.
(二) 关于 executions
(三) 关于后续的迭代
推荐阅读
点击下方空白 ▼ 查看明日开发者黄历