Compare commits

..

No commits in common. "e2e5d6d57f26b79bab83e87d1d3596d077925603" and "5d0c70278e9ddcd3ec2adcad0281458e25315afe" have entirely different histories.

5 changed files with 189 additions and 96 deletions

View File

@ -24,6 +24,7 @@
#include <exception> #include <exception>
#include <future> #include <future>
#include <mutex> #include <mutex>
#include <list>
#include <type_traits> #include <type_traits>
#include <optional> #include <optional>
@ -83,11 +84,6 @@ struct is_awaitable<
template <typename T> template <typename T>
constexpr bool is_awaitable_v = is_awaitable<T>::value; constexpr bool is_awaitable_v = is_awaitable<T>::value;
/**
* @struct final_awaiter
* @brief An awaiter for `Task::promise_type::final_suspend()`. Transfer
* execution back to the coroutine who is co_awaiting this Task.
*/
struct final_awaiter struct final_awaiter
{ {
bool await_ready() noexcept bool await_ready() noexcept
@ -106,52 +102,6 @@ struct final_awaiter
} }
}; };
/**
* @struct task_awaiter
* @brief Convert Task to an awaiter when it is co_awaited.
* Following things will happen:
* 1. Suspend current coroutine
* 2. Set current coroutine as continuation of this Task
* 3. Transfer execution to the co_awaited Task
*/
template <typename Promise>
struct task_awaiter
{
using handle_type = std::coroutine_handle<Promise>;
public:
explicit task_awaiter(handle_type coro) : coro_(coro)
{
}
bool await_ready() noexcept
{
return !coro_ || coro_.done();
}
auto await_suspend(std::coroutine_handle<> handle) noexcept
{
coro_.promise().setContinuation(handle);
return coro_;
}
auto await_resume()
{
if constexpr (std::is_void_v<decltype(coro_.promise().result())>)
{
coro_.promise().result(); // throw exception if any
return;
}
else
{
return std::move(coro_.promise().result());
}
}
private:
handle_type coro_;
};
template <typename T = void> template <typename T = void>
struct [[nodiscard]] Task struct [[nodiscard]] Task
{ {
@ -164,7 +114,7 @@ struct [[nodiscard]] Task
Task(const Task &) = delete; Task(const Task &) = delete;
Task(Task &&other) noexcept Task(Task &&other)
{ {
coro_ = other.coro_; coro_ = other.coro_;
other.coro_ = nullptr; other.coro_ = nullptr;
@ -178,7 +128,7 @@ struct [[nodiscard]] Task
Task &operator=(const Task &) = delete; Task &operator=(const Task &) = delete;
Task &operator=(Task &&other) noexcept Task &operator=(Task &&other)
{ {
if (std::addressof(other) == this) if (std::addressof(other) == this)
return *this; return *this;
@ -248,9 +198,69 @@ struct [[nodiscard]] Task
std::coroutine_handle<> continuation_; std::coroutine_handle<> continuation_;
}; };
auto operator co_await() const noexcept auto operator co_await() const &noexcept
{ {
return task_awaiter(coro_); struct awaiter
{
public:
explicit awaiter(handle_type coro) : coro_(coro)
{
}
bool await_ready() noexcept
{
return !coro_ || coro_.done();
}
auto await_suspend(std::coroutine_handle<> handle) noexcept
{
coro_.promise().setContinuation(handle);
return coro_;
}
T await_resume()
{
auto &&v = coro_.promise().result();
return std::move(v);
}
private:
handle_type coro_;
};
return awaiter(coro_);
}
auto operator co_await() const &&noexcept
{
struct awaiter
{
public:
explicit awaiter(handle_type coro) : coro_(coro)
{
}
bool await_ready() noexcept
{
return !coro_ || coro_.done();
}
auto await_suspend(std::coroutine_handle<> handle) noexcept
{
coro_.promise().setContinuation(handle);
return coro_;
}
T await_resume()
{
return std::move(coro_.promise().result());
}
private:
handle_type coro_;
};
return awaiter(coro_);
} }
handle_type coro_; handle_type coro_;
@ -268,7 +278,7 @@ struct [[nodiscard]] Task<void>
Task(const Task &) = delete; Task(const Task &) = delete;
Task(Task &&other) noexcept Task(Task &&other)
{ {
coro_ = other.coro_; coro_ = other.coro_;
other.coro_ = nullptr; other.coro_ = nullptr;
@ -282,7 +292,7 @@ struct [[nodiscard]] Task<void>
Task &operator=(const Task &) = delete; Task &operator=(const Task &) = delete;
Task &operator=(Task &&other) noexcept Task &operator=(Task &&other)
{ {
if (std::addressof(other) == this) if (std::addressof(other) == this)
return *this; return *this;
@ -335,9 +345,68 @@ struct [[nodiscard]] Task<void>
std::coroutine_handle<> continuation_; std::coroutine_handle<> continuation_;
}; };
auto operator co_await() const noexcept auto operator co_await() const &noexcept
{ {
return task_awaiter(coro_); struct awaiter
{
public:
explicit awaiter(handle_type coro) : coro_(coro)
{
}
bool await_ready() noexcept
{
return !coro_ || coro_.done();
}
auto await_suspend(std::coroutine_handle<> handle) noexcept
{
coro_.promise().setContinuation(handle);
return coro_;
}
auto await_resume()
{
coro_.promise().result();
}
private:
handle_type coro_;
};
return awaiter(coro_);
}
auto operator co_await() const &&noexcept
{
struct awaiter
{
public:
explicit awaiter(handle_type coro) : coro_(coro)
{
}
bool await_ready() noexcept
{
return false;
}
auto await_suspend(std::coroutine_handle<> handle) noexcept
{
coro_.promise().setContinuation(handle);
return coro_;
}
void await_resume()
{
coro_.promise().result();
}
private:
handle_type coro_;
};
return awaiter(coro_);
} }
handle_type coro_; handle_type coro_;
@ -360,7 +429,7 @@ struct AsyncTask
AsyncTask(const AsyncTask &) = delete; AsyncTask(const AsyncTask &) = delete;
AsyncTask(AsyncTask &&other) noexcept AsyncTask(AsyncTask &&other)
{ {
coro_ = other.coro_; coro_ = other.coro_;
other.coro_ = nullptr; other.coro_ = nullptr;
@ -368,7 +437,7 @@ struct AsyncTask
AsyncTask &operator=(const AsyncTask &) = delete; AsyncTask &operator=(const AsyncTask &) = delete;
AsyncTask &operator=(AsyncTask &&other) noexcept AsyncTask &operator=(AsyncTask &&other)
{ {
if (std::addressof(other) == this) if (std::addressof(other) == this)
return *this; return *this;
@ -380,6 +449,8 @@ struct AsyncTask
struct promise_type struct promise_type
{ {
std::coroutine_handle<> continuation_;
AsyncTask get_return_object() noexcept AsyncTask get_return_object() noexcept
{ {
return {std::coroutine_handle<promise_type>::from_promise(*this)}; return {std::coroutine_handle<promise_type>::from_promise(*this)};
@ -400,12 +471,51 @@ struct AsyncTask
{ {
} }
std::suspend_never final_suspend() const noexcept void setContinuation(std::coroutine_handle<> handle)
{ {
return {}; continuation_ = handle;
}
auto final_suspend() const noexcept
{
// Can't simply use suspend_never because we need symmetric transfer
struct awaiter final
{
bool await_ready() const noexcept
{
return true;
}
auto await_suspend(
std::coroutine_handle<promise_type> coro) const noexcept
{
return coro.promise().continuation_;
}
void await_resume() const noexcept
{
}
};
return awaiter{};
} }
}; };
bool await_ready() const noexcept
{
return coro_.done();
}
void await_resume() const noexcept
{
}
auto await_suspend(std::coroutine_handle<> coroutine) noexcept
{
coro_.promise().setContinuation(coroutine);
return coro_;
}
handle_type coro_; handle_type coro_;
}; };

View File

@ -31,7 +31,7 @@ struct StructAwaiter : public CallbackAwaiter<std::shared_ptr<SomeStruct>>
} // namespace drogon::internal } // namespace drogon::internal
// Workaround limitation of macros // Workarround limitation of macros
template <typename T> template <typename T>
using is_int = std::is_same<T, int>; using is_int = std::is_same<T, int>;
template <typename T> template <typename T>
@ -69,7 +69,7 @@ DROGON_TEST(CroutineBasics)
}()); }());
CHECK(n == 1); CHECK(n == 1);
// Testing that exceptions can propagate through coroutines // Testing that exceptions can propergate through coroutines
auto throw_in_task = [TEST_CTX]() -> Task<> { auto throw_in_task = [TEST_CTX]() -> Task<> {
auto f = []() -> Task<> { throw std::runtime_error("test error"); }; auto f = []() -> Task<> { throw std::runtime_error("test error"); };
@ -77,7 +77,7 @@ DROGON_TEST(CroutineBasics)
}; };
sync_wait(throw_in_task()); sync_wait(throw_in_task());
// Test sync_wait propagates exception // Test sync_wait propergrates exception
auto throws = []() -> Task<> { auto throws = []() -> Task<> {
throw std::runtime_error("bla"); throw std::runtime_error("bla");
co_return; co_return;
@ -100,7 +100,7 @@ DROGON_TEST(CroutineBasics)
}; };
sync_wait(await_non_copyable()); sync_wait(await_non_copyable());
// This only works because async_run tries to run the coroutine as soon as // This only works because async_run tries to run the corouine as soon as
// possible and the coroutine does not wait // possible and the coroutine does not wait
int testVar = 0; int testVar = 0;
async_run([&testVar]() -> Task<void> { async_run([&testVar]() -> Task<void> {

View File

@ -164,40 +164,26 @@ class DROGON_EXPORT RedisClient
std::string_view command, std::string_view command,
Args &&...args) Args &&...args)
{ {
return execCommandSync<std::decay_t<decltype(processFunc)>>( std::shared_ptr<std::promise<T>> pro(new std::promise<T>);
std::move(processFunc), command, std::forward<Args>(args)...); std::future<T> f = pro->get_future();
}
/**
* @brief Execute a redis command synchronously
* Return type can be deduced automatically in this version.
*/
template <typename F, typename... Args>
std::invoke_result_t<F, const RedisResult &> execCommandSync(
F &&processFunc,
std::string_view command,
Args &&...args)
{
using Ret = std::invoke_result_t<F, const RedisResult &>;
std::promise<Ret> prom;
execCommandAsync( execCommandAsync(
[&processFunc, &prom](const RedisResult &result) { [process = std::move(processFunc), pro](const RedisResult &result) {
try try
{ {
prom.set_value(processFunc(result)); pro->set_value(process(result));
} }
catch (...) catch (...)
{ {
prom.set_exception(std::current_exception()); pro->set_exception(std::current_exception());
} }
}, },
[&prom](const RedisException &err) { [pro](const RedisException &err) {
prom.set_exception(std::make_exception_ptr(err)); pro->set_exception(std::make_exception_ptr(err));
}, },
command, command,
std::forward<Args>(args)...); std::forward<Args>(args)...);
return prom.get_future().get(); return f.get();
} }
/** /**

View File

@ -168,14 +168,11 @@ DROGON_TEST(RedisTest)
SUCCESS(); SUCCESS();
} }
// 12. Test omit template parameter
try try
{ {
auto i = redisClient->execCommandSync( redisClient->execCommandSync<int>([](const RedisResult &) { return 1; },
[](const RedisResult &r) { return r.asInteger(); }, "del %s",
"del %s", "sync_key");
"sync_key");
MANDATE(i == 1);
} }
catch (const RedisException &err) catch (const RedisException &err)
{ {

@ -1 +1 @@
Subproject commit 528406f0567e6ac48356eeb2b4b7b9ba06e8593e Subproject commit 2b6c9d62e6414fe90b456db829723814a534545e