mirror of
https://github.com/drogonframework/drogon.git
synced 2025-07-04 00:00:46 -04:00
Compare commits
2 Commits
debdc27c4d
...
b52ab5f4f1
Author | SHA1 | Date | |
---|---|---|---|
|
b52ab5f4f1 | ||
|
2beaf0a901 |
@ -718,6 +718,7 @@ install(FILES ${NOSQL_HEADERS} DESTINATION ${INSTALL_INCLUDE_DIR}/drogon/nosql)
|
|||||||
|
|
||||||
set(DROGON_UTIL_HEADERS
|
set(DROGON_UTIL_HEADERS
|
||||||
lib/inc/drogon/utils/coroutine.h
|
lib/inc/drogon/utils/coroutine.h
|
||||||
|
lib/inc/drogon/utils/Http11ClientPool.h
|
||||||
lib/inc/drogon/utils/FunctionTraits.h
|
lib/inc/drogon/utils/FunctionTraits.h
|
||||||
lib/inc/drogon/utils/HttpConstraint.h
|
lib/inc/drogon/utils/HttpConstraint.h
|
||||||
lib/inc/drogon/utils/OStringStream.h
|
lib/inc/drogon/utils/OStringStream.h
|
||||||
|
@ -1,43 +1,112 @@
|
|||||||
#include <drogon/drogon.h>
|
#include <drogon/drogon.h>
|
||||||
|
|
||||||
#include <future>
|
#include <chrono>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
#include <memory>
|
||||||
|
#include <thread>
|
||||||
|
#include <drogon/HttpTypes.h>
|
||||||
|
#include <trantor/utils/Logger.h>
|
||||||
|
|
||||||
#ifdef __linux__
|
#ifdef __linux__
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <netinet/tcp.h>
|
#include <netinet/tcp.h>
|
||||||
#endif
|
#endif
|
||||||
|
#include <drogon/utils/Http11ClientPool.h>
|
||||||
using namespace drogon;
|
using namespace drogon;
|
||||||
|
|
||||||
int nth_resp = 0;
|
int nth_resp = 0;
|
||||||
|
|
||||||
int main()
|
int main()
|
||||||
{
|
{
|
||||||
|
auto func = [](int fd) {
|
||||||
|
std::cout << "setSockOptCallback:" << fd << std::endl;
|
||||||
|
#ifdef __linux__
|
||||||
|
int optval = 10;
|
||||||
|
::setsockopt(fd,
|
||||||
|
SOL_TCP,
|
||||||
|
TCP_KEEPCNT,
|
||||||
|
&optval,
|
||||||
|
static_cast<socklen_t>(sizeof optval));
|
||||||
|
::setsockopt(fd,
|
||||||
|
SOL_TCP,
|
||||||
|
TCP_KEEPIDLE,
|
||||||
|
&optval,
|
||||||
|
static_cast<socklen_t>(sizeof optval));
|
||||||
|
::setsockopt(fd,
|
||||||
|
SOL_TCP,
|
||||||
|
TCP_KEEPINTVL,
|
||||||
|
&optval,
|
||||||
|
static_cast<socklen_t>(sizeof optval));
|
||||||
|
#endif
|
||||||
|
};
|
||||||
trantor::Logger::setLogLevel(trantor::Logger::kTrace);
|
trantor::Logger::setLogLevel(trantor::Logger::kTrace);
|
||||||
|
#ifdef __cpp_impl_coroutine
|
||||||
|
Http11ClientPoolConfig cfg{
|
||||||
|
.hostString = "http://www.baidu.com",
|
||||||
|
.useOldTLS = false,
|
||||||
|
.validateCert = false,
|
||||||
|
.size = 10,
|
||||||
|
.setCallback =
|
||||||
|
[func](auto &client) {
|
||||||
|
LOG_INFO << "setCallback";
|
||||||
|
client->setSockOptCallback(func);
|
||||||
|
},
|
||||||
|
.numOfThreads = 4,
|
||||||
|
.keepaliveRequests = 1000,
|
||||||
|
.idleTimeout = std::chrono::seconds(10),
|
||||||
|
.maxLifeTime = std::chrono::seconds(300),
|
||||||
|
.checkInterval = std::chrono::seconds(10),
|
||||||
|
};
|
||||||
|
auto pool = std::make_unique<Http11ClientPool>(cfg);
|
||||||
|
auto req = HttpRequest::newHttpRequest();
|
||||||
|
req->setMethod(drogon::Get);
|
||||||
|
req->setPath("/s");
|
||||||
|
req->setParameter("wd", "wx");
|
||||||
|
req->setParameter("oq", "wx");
|
||||||
|
|
||||||
|
for (int i = 0; i < 1; i++)
|
||||||
|
{
|
||||||
|
[](auto req, auto &pool) -> drogon::AsyncTask {
|
||||||
|
{
|
||||||
|
auto [result, resp] = co_await pool->sendRequestCoro(req, 10);
|
||||||
|
if (result == ReqResult::Ok)
|
||||||
|
LOG_INFO << "1:" << resp->getStatusCode();
|
||||||
|
}
|
||||||
|
{
|
||||||
|
auto [result, resp] = co_await pool->sendRequestCoro(req, 10);
|
||||||
|
if (result == ReqResult::Ok)
|
||||||
|
LOG_INFO << "2:" << resp->getStatusCode();
|
||||||
|
}
|
||||||
|
{
|
||||||
|
auto [result, resp] = co_await pool->sendRequestCoro(req, 10);
|
||||||
|
if (result == ReqResult::Ok)
|
||||||
|
LOG_INFO << "3:" << resp->getStatusCode();
|
||||||
|
}
|
||||||
|
co_return;
|
||||||
|
}(req, pool);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++)
|
||||||
|
{
|
||||||
|
pool->sendRequest(
|
||||||
|
req,
|
||||||
|
[](ReqResult result, const HttpResponsePtr &response) {
|
||||||
|
if (result != ReqResult::Ok)
|
||||||
|
{
|
||||||
|
LOG_ERROR
|
||||||
|
<< "error while sending request to server! result: "
|
||||||
|
<< result;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOG_INFO << "callback:" << response->getStatusCode();
|
||||||
|
},
|
||||||
|
10);
|
||||||
|
}
|
||||||
|
std::this_thread::sleep_for(std::chrono::seconds(30));
|
||||||
|
#else
|
||||||
{
|
{
|
||||||
auto client = HttpClient::newHttpClient("http://www.baidu.com");
|
auto client = HttpClient::newHttpClient("http://www.baidu.com");
|
||||||
client->setSockOptCallback([](int fd) {
|
client->setSockOptCallback(func);
|
||||||
std::cout << "setSockOptCallback:" << fd << std::endl;
|
|
||||||
#ifdef __linux__
|
|
||||||
int optval = 10;
|
|
||||||
::setsockopt(fd,
|
|
||||||
SOL_TCP,
|
|
||||||
TCP_KEEPCNT,
|
|
||||||
&optval,
|
|
||||||
static_cast<socklen_t>(sizeof optval));
|
|
||||||
::setsockopt(fd,
|
|
||||||
SOL_TCP,
|
|
||||||
TCP_KEEPIDLE,
|
|
||||||
&optval,
|
|
||||||
static_cast<socklen_t>(sizeof optval));
|
|
||||||
::setsockopt(fd,
|
|
||||||
SOL_TCP,
|
|
||||||
TCP_KEEPINTVL,
|
|
||||||
&optval,
|
|
||||||
static_cast<socklen_t>(sizeof optval));
|
|
||||||
#endif
|
|
||||||
});
|
|
||||||
|
|
||||||
auto req = HttpRequest::newHttpRequest();
|
auto req = HttpRequest::newHttpRequest();
|
||||||
req->setMethod(drogon::Get);
|
req->setMethod(drogon::Get);
|
||||||
@ -77,4 +146,5 @@ int main()
|
|||||||
}
|
}
|
||||||
|
|
||||||
app().run();
|
app().run();
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
312
lib/inc/drogon/utils/Http11ClientPool.h
Normal file
312
lib/inc/drogon/utils/Http11ClientPool.h
Normal file
@ -0,0 +1,312 @@
|
|||||||
|
/**
|
||||||
|
*
|
||||||
|
* @file Http11ClientPool.h
|
||||||
|
* @author fantasy-peak
|
||||||
|
*
|
||||||
|
* Copyright 2024, fantasy-peak. All rights reserved.
|
||||||
|
* https://github.com/an-tao/drogon
|
||||||
|
* Use of this source code is governed by a MIT license
|
||||||
|
* that can be found in the License file.
|
||||||
|
*
|
||||||
|
* Drogon
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <cassert>
|
||||||
|
#include <chrono>
|
||||||
|
#include <cstddef>
|
||||||
|
#include <functional>
|
||||||
|
#include <future>
|
||||||
|
#include <memory>
|
||||||
|
#include <mutex>
|
||||||
|
#include <optional>
|
||||||
|
#include <queue>
|
||||||
|
#include <string>
|
||||||
|
#include <tuple>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
|
#include <trantor/utils/Logger.h>
|
||||||
|
#include <trantor/net/EventLoopThreadPool.h>
|
||||||
|
#ifdef __cpp_impl_coroutine
|
||||||
|
#include <drogon/utils/coroutine.h>
|
||||||
|
#endif
|
||||||
|
#include <drogon/drogon.h>
|
||||||
|
|
||||||
|
namespace drogon
|
||||||
|
{
|
||||||
|
|
||||||
|
struct Http11ClientPoolConfig
|
||||||
|
{
|
||||||
|
std::string hostString;
|
||||||
|
bool useOldTLS{false};
|
||||||
|
bool validateCert{false};
|
||||||
|
std::size_t size{100};
|
||||||
|
std::function<void(HttpClientPtr &)> setCallback;
|
||||||
|
std::size_t numOfThreads{std::thread::hardware_concurrency()};
|
||||||
|
std::optional<std::size_t> keepaliveRequests;
|
||||||
|
std::optional<std::chrono::seconds> idleTimeout;
|
||||||
|
std::optional<std::chrono::seconds> maxLifeTime;
|
||||||
|
std::optional<std::chrono::seconds> checkInterval;
|
||||||
|
};
|
||||||
|
|
||||||
|
class Http11ClientPool final
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
Http11ClientPool(
|
||||||
|
Http11ClientPoolConfig cfg,
|
||||||
|
std::shared_ptr<trantor::EventLoopThreadPool> loopPool = nullptr)
|
||||||
|
: cfg_(std::move(cfg)), loopPool_(std::move(loopPool))
|
||||||
|
{
|
||||||
|
if (loopPool_ == nullptr)
|
||||||
|
{
|
||||||
|
loopPool_ = std::make_shared<trantor::EventLoopThreadPool>(
|
||||||
|
cfg_.numOfThreads);
|
||||||
|
loopPool_->start();
|
||||||
|
isSelfThreadPool_ = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
isSelfThreadPool_ = false;
|
||||||
|
}
|
||||||
|
loopPtr_ = loopPool_->getNextLoop();
|
||||||
|
|
||||||
|
for (std::size_t i = 0; i < cfg_.size; i++)
|
||||||
|
{
|
||||||
|
auto loopPtr = loopPool_->getNextLoop();
|
||||||
|
auto func = [this, loopPtr]() mutable {
|
||||||
|
auto client = HttpClient::newHttpClient(cfg_.hostString,
|
||||||
|
loopPtr,
|
||||||
|
cfg_.useOldTLS,
|
||||||
|
cfg_.validateCert);
|
||||||
|
if (cfg_.setCallback)
|
||||||
|
cfg_.setCallback(client);
|
||||||
|
return client;
|
||||||
|
};
|
||||||
|
httpClients_.emplace(std::make_shared<Connection>(func, cfg_));
|
||||||
|
}
|
||||||
|
LOG_DEBUG << "httpClients_ size:" << httpClients_.size();
|
||||||
|
|
||||||
|
if (cfg_.idleTimeout.has_value() && cfg_.checkInterval.has_value())
|
||||||
|
{
|
||||||
|
timerId_ = loopPtr_->runEvery(cfg_.checkInterval.value(), [this] {
|
||||||
|
std::unique_lock<std::mutex> lock(mutex_);
|
||||||
|
if (httpClients_.empty())
|
||||||
|
return;
|
||||||
|
std::queue<std::shared_ptr<Connection>> clients;
|
||||||
|
while (!httpClients_.empty())
|
||||||
|
{
|
||||||
|
auto connPtr = std::move(httpClients_.front());
|
||||||
|
httpClients_.pop();
|
||||||
|
if (connPtr->reachIdleTimeout())
|
||||||
|
{
|
||||||
|
// close tcp connection
|
||||||
|
connPtr->resetClientPtr();
|
||||||
|
}
|
||||||
|
clients.emplace(std::move(connPtr));
|
||||||
|
}
|
||||||
|
httpClients_ = std::move(clients);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
~Http11ClientPool()
|
||||||
|
{
|
||||||
|
if (timerId_)
|
||||||
|
{
|
||||||
|
std::promise<void> done;
|
||||||
|
loopPtr_->runInLoop([&] {
|
||||||
|
loopPtr_->invalidateTimer(timerId_.value());
|
||||||
|
done.set_value();
|
||||||
|
});
|
||||||
|
done.get_future().wait();
|
||||||
|
}
|
||||||
|
if (isSelfThreadPool_)
|
||||||
|
{
|
||||||
|
for (auto &ptr : loopPool_->getLoops())
|
||||||
|
ptr->runInLoop([=] { ptr->quit(); });
|
||||||
|
loopPool_->wait();
|
||||||
|
loopPool_.reset();
|
||||||
|
}
|
||||||
|
std::unique_lock<std::mutex> lock(mutex_);
|
||||||
|
std::queue<std::shared_ptr<Connection>> tmp;
|
||||||
|
httpClients_.swap(tmp);
|
||||||
|
}
|
||||||
|
|
||||||
|
Http11ClientPool(const Http11ClientPool &) = delete;
|
||||||
|
Http11ClientPool &operator=(const Http11ClientPool &) = delete;
|
||||||
|
Http11ClientPool(Http11ClientPool &&) = delete;
|
||||||
|
Http11ClientPool &operator=(Http11ClientPool &&) = delete;
|
||||||
|
|
||||||
|
void sendRequest(const HttpRequestPtr &req,
|
||||||
|
std::function<void(ReqResult, const HttpResponsePtr &)> cb,
|
||||||
|
double timeout = 0)
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock(mutex_);
|
||||||
|
if (httpClients_.empty())
|
||||||
|
{
|
||||||
|
httpRequest_.emplace(req, std::move(cb));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
auto connPtr = std::move(httpClients_.front());
|
||||||
|
httpClients_.pop();
|
||||||
|
lock.unlock();
|
||||||
|
send(std::move(connPtr), req, std::move(cb), timeout);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef __cpp_impl_coroutine
|
||||||
|
|
||||||
|
auto sendRequestCoro(HttpRequestPtr req, double timeout = 0)
|
||||||
|
{
|
||||||
|
struct Awaiter
|
||||||
|
: public CallbackAwaiter<std::tuple<ReqResult, HttpResponsePtr>>
|
||||||
|
{
|
||||||
|
Awaiter(Http11ClientPool *pool, HttpRequestPtr req, double timeout)
|
||||||
|
: pool_(pool), req_(std::move(req)), timeout_(timeout)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void await_suspend(std::coroutine_handle<> handle)
|
||||||
|
{
|
||||||
|
pool_->sendRequest(
|
||||||
|
req_,
|
||||||
|
[this, handle](ReqResult result,
|
||||||
|
const HttpResponsePtr &ptr) {
|
||||||
|
setValue(std::make_tuple(result, ptr));
|
||||||
|
handle.resume();
|
||||||
|
},
|
||||||
|
timeout_);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
Http11ClientPool *pool_;
|
||||||
|
HttpRequestPtr req_;
|
||||||
|
double timeout_;
|
||||||
|
};
|
||||||
|
|
||||||
|
return Awaiter{this, std::move(req), timeout};
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
|
private:
|
||||||
|
struct Connection
|
||||||
|
{
|
||||||
|
Connection(std::function<HttpClientPtr()> cb,
|
||||||
|
const Http11ClientPoolConfig &cfg)
|
||||||
|
: createHttpClientFunc_(std::move(cb)), cfg_(cfg)
|
||||||
|
{
|
||||||
|
init();
|
||||||
|
}
|
||||||
|
|
||||||
|
void init()
|
||||||
|
{
|
||||||
|
clientPtr_ = createHttpClientFunc_();
|
||||||
|
auto now = std::chrono::system_clock::now();
|
||||||
|
timePoint_ = now;
|
||||||
|
startTimePoint_ = now;
|
||||||
|
counter_ = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void send(const HttpRequestPtr &req,
|
||||||
|
std::function<void(ReqResult, const HttpResponsePtr &)> cb,
|
||||||
|
double timeout)
|
||||||
|
{
|
||||||
|
if (isInvalid())
|
||||||
|
{
|
||||||
|
init();
|
||||||
|
}
|
||||||
|
assert(clientPtr_ != nullptr);
|
||||||
|
clientPtr_->sendRequest(req, std::move(cb), timeout);
|
||||||
|
++counter_;
|
||||||
|
auto now = std::chrono::system_clock::now();
|
||||||
|
timePoint_ = now;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool isInvalid()
|
||||||
|
{
|
||||||
|
auto now = std::chrono::system_clock::now();
|
||||||
|
auto idleDut = now - timePoint_;
|
||||||
|
auto dut = now - startTimePoint_;
|
||||||
|
if ((clientPtr_ == nullptr) ||
|
||||||
|
(cfg_.keepaliveRequests.has_value() &&
|
||||||
|
counter_ >= cfg_.keepaliveRequests.value()) ||
|
||||||
|
(cfg_.idleTimeout.has_value() &&
|
||||||
|
idleDut >= cfg_.idleTimeout.value()) ||
|
||||||
|
(cfg_.maxLifeTime.has_value() &&
|
||||||
|
dut >= cfg_.maxLifeTime.value()))
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool reachIdleTimeout()
|
||||||
|
{
|
||||||
|
auto now = std::chrono::system_clock::now();
|
||||||
|
auto idleDut = now - timePoint_;
|
||||||
|
if (idleDut >= cfg_.idleTimeout.value())
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
void resetClientPtr()
|
||||||
|
{
|
||||||
|
clientPtr_ = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
Http11ClientPoolConfig cfg_;
|
||||||
|
std::function<HttpClientPtr()> createHttpClientFunc_;
|
||||||
|
HttpClientPtr clientPtr_;
|
||||||
|
std::chrono::time_point<std::chrono::system_clock> timePoint_;
|
||||||
|
std::chrono::time_point<std::chrono::system_clock> startTimePoint_;
|
||||||
|
std::size_t counter_{0};
|
||||||
|
};
|
||||||
|
|
||||||
|
void send(std::shared_ptr<Connection> connPtr,
|
||||||
|
const HttpRequestPtr &req,
|
||||||
|
std::function<void(ReqResult, const HttpResponsePtr &)> cb,
|
||||||
|
double timeout)
|
||||||
|
{
|
||||||
|
connPtr->send(
|
||||||
|
req,
|
||||||
|
[connPtr, this, cb = std::move(cb), timeout](
|
||||||
|
ReqResult result, const HttpResponsePtr &ptr) mutable {
|
||||||
|
cb(result, ptr);
|
||||||
|
std::unique_lock<std::mutex> lock(mutex_);
|
||||||
|
if (httpRequest_.empty())
|
||||||
|
{
|
||||||
|
httpClients_.emplace(std::move(connPtr));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
auto op = std::move(httpRequest_.front());
|
||||||
|
httpRequest_.pop();
|
||||||
|
lock.unlock();
|
||||||
|
auto &[req, cb] = op;
|
||||||
|
send(std::move(connPtr), req, std::move(cb), timeout);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
},
|
||||||
|
timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
Http11ClientPoolConfig cfg_;
|
||||||
|
std::shared_ptr<trantor::EventLoopThreadPool> loopPool_;
|
||||||
|
bool isSelfThreadPool_;
|
||||||
|
trantor::EventLoop *loopPtr_;
|
||||||
|
std::mutex mutex_;
|
||||||
|
std::queue<std::shared_ptr<Connection>> httpClients_;
|
||||||
|
std::queue<
|
||||||
|
std::tuple<HttpRequestPtr,
|
||||||
|
std::function<void(ReqResult, const HttpResponsePtr &)>>>
|
||||||
|
httpRequest_;
|
||||||
|
std::optional<trantor::TimerId> timerId_;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace drogon
|
Loading…
x
Reference in New Issue
Block a user