Add the setTimeout() method to the DbClient class (#823)

This commit is contained in:
An Tao 2021-04-29 10:13:34 +08:00 committed by GitHub
parent a33bf2bf34
commit 685aaaa3da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 821 additions and 124 deletions

View File

@ -199,6 +199,7 @@ set(DROGON_SOURCES
lib/src/SecureSSLRedirector.cc
lib/src/SessionManager.cc
lib/src/StaticFileRouter.cc
lib/src/TaskTimeoutFlag.cc
lib/src/Utilities.cc
lib/src/WebSocketClientImpl.cc
lib/src/WebSocketConnectionImpl.cc

View File

@ -54,7 +54,10 @@
//"client_encoding": "",
//number_of_connections: 1 by default, if the 'is_fast' is true, the number is the number of
//connections per IO thread, otherwise it is the total number of all connections.
"number_of_connections": 1
"number_of_connections": 1,
//timeout: -1.0 by default, in seconds, the timeout for executing a SQL query.
//zero or negative value means no timeout.
"timeout": -1.0
}
],
"redis_clients": [
@ -287,4 +290,4 @@
}
]
}
}
}

View File

@ -54,7 +54,10 @@
//"client_encoding": "",
//number_of_connections: 1 by default, if the 'is_fast' is true, the number is the number of
//connections per IO thread, otherwise it is the total number of all connections.
"number_of_connections": 1
"number_of_connections": 1,
//timeout: -1.0 by default, in seconds, the timeout for executing a SQL query.
//zero or negative value means no timeout.
"timeout": -1.0
}
],
"redis_clients": [
@ -278,4 +281,4 @@
],
//custom_config: custom configuration for users. This object can be get by the app().getCustomConfig() method.
"custom_config": {}
}
}

View File

@ -703,7 +703,7 @@ class DROGON_EXPORT HttpAppFramework : public trantor::NonCopyable
@endcode
*/
inline HttpAppFramework &enableSession(
const std::chrono::duration<long double> &timeout)
const std::chrono::duration<double> &timeout)
{
return enableSession((size_t)timeout.count());
}
@ -959,7 +959,7 @@ class DROGON_EXPORT HttpAppFramework : public trantor::NonCopyable
@endcode
*/
inline HttpAppFramework &setIdleConnectionTimeout(
const std::chrono::duration<long double> &timeout)
const std::chrono::duration<double> &timeout)
{
return setIdleConnectionTimeout((size_t)timeout.count());
}
@ -1222,6 +1222,9 @@ class DROGON_EXPORT HttpAppFramework : public trantor::NonCopyable
* @param filename The file name of sqlite3 database file.
* @param name The client name.
* @param isFast Indicates if the client is a fast database client.
* @param characterSet The character set of the database server.
* @param timeout The timeout in seconds for executing SQL queries. zero or
* negative value means no timeout.
*
* @note
* This operation can be performed by an option in the configuration file.
@ -1237,7 +1240,8 @@ class DROGON_EXPORT HttpAppFramework : public trantor::NonCopyable
const std::string &filename = "",
const std::string &name = "default",
const bool isFast = false,
const std::string &characterSet = "") = 0;
const std::string &characterSet = "",
double timeout = -1.0) = 0;
/// Create a redis client
/**

View File

@ -207,7 +207,7 @@ class WebSocketConnection
*/
virtual void setPingMessage(
const std::string &message,
const std::chrono::duration<long double> &interval) = 0;
const std::chrono::duration<double> &interval) = 0;
/**
* @brief Disable sending ping messages to the peer.

View File

@ -576,7 +576,7 @@ namespace internal
struct TimerAwaiter : CallbackAwaiter<void>
{
TimerAwaiter(trantor::EventLoop *loop,
const std::chrono::duration<long double> &delay)
const std::chrono::duration<double> &delay)
: loop_(loop), delay_(delay.count())
{
}
@ -597,7 +597,7 @@ struct TimerAwaiter : CallbackAwaiter<void>
inline internal::TimerAwaiter sleepCoro(
trantor::EventLoop *loop,
const std::chrono::duration<long double> &delay) noexcept
const std::chrono::duration<double> &delay) noexcept
{
assert(loop);
return internal::TimerAwaiter(loop, delay);

View File

@ -513,6 +513,7 @@ static void loadDbClients(const Json::Value &dbClients)
{
characterSet = client.get("client_encoding", "").asString();
}
auto timeout = client.get("timeout", -1.0).asDouble();
drogon::app().createDbClient(type,
host,
(unsigned short)port,
@ -523,7 +524,8 @@ static void loadDbClients(const Json::Value &dbClients)
filename,
name,
isFast,
characterSet);
characterSet,
timeout);
}
}

View File

@ -1,7 +1,7 @@
/**
*
* DbClientManager.h
* An Tao
* @file DbClientManager.h
* @author An Tao
*
* Copyright 2018, An Tao. All rights reserved.
* https://github.com/an-tao/drogon
@ -52,7 +52,8 @@ class DbClientManager : public trantor::NonCopyable
const std::string &filename,
const std::string &name,
const bool isFast,
const std::string &characterSet);
const std::string &characterSet,
double timeout);
bool areAllDbClientsAvailable() const noexcept;
private:
@ -64,6 +65,7 @@ class DbClientManager : public trantor::NonCopyable
ClientType dbType_;
bool isFast_;
size_t connectionNumber_;
double timeout_;
};
std::vector<DbInfo> dbInfos_;
std::map<std::string, IOThreadStorage<orm::DbClientPtr>> dbFastClientsMap_;

View File

@ -37,7 +37,8 @@ void DbClientManager::createDbClient(const std::string & /*dbType*/,
const std::string & /*filename*/,
const std::string & /*name*/,
const bool /*isFast*/,
const std::string & /*characterSet*/)
const std::string & /*characterSet*/,
double /*timeout*/)
{
LOG_FATAL << "No database is supported by drogon, please install the "
"database development library first.";

View File

@ -958,7 +958,8 @@ HttpAppFramework &HttpAppFrameworkImpl::createDbClient(
const std::string &filename,
const std::string &name,
const bool isFast,
const std::string &characterSet)
const std::string &characterSet,
double timeout)
{
assert(!running_);
dbClientManagerPtr_->createDbClient(dbType,
@ -971,7 +972,8 @@ HttpAppFramework &HttpAppFrameworkImpl::createDbClient(
filename,
name,
isFast,
characterSet);
characterSet,
timeout);
return *this;
}

View File

@ -452,7 +452,8 @@ class HttpAppFrameworkImpl final : public HttpAppFramework
const std::string &filename,
const std::string &name,
bool isFast,
const std::string &characterSet) override;
const std::string &characterSet,
double timeout) override;
HttpAppFramework &createRedisClient(const std::string &ip,
unsigned short port,
const std::string &name,

View File

@ -0,0 +1,39 @@
/**
*
* @file TaskTimeoutFlag.cc
* @author An Tao
*
* Copyright 2018, An Tao. All rights reserved.
* https://github.com/an-tao/drogon
* Use of this source code is governed by a MIT license
* that can be found in the License file.
*
* Drogon
*
*/
#include "TaskTimeoutFlag.h"
using namespace drogon;
TaskTimeoutFlag::TaskTimeoutFlag(trantor::EventLoop *loop,
const std::chrono::duration<double> &timeout,
std::function<void()> timeoutCallback)
: loop_(loop), timeout_(timeout), timeoutFunc_(timeoutCallback)
{
}
void TaskTimeoutFlag::runTimer()
{
std::weak_ptr<TaskTimeoutFlag> weakPtr = shared_from_this();
loop_->runAfter(timeout_, [weakPtr]() {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
if (thisPtr->done())
return;
thisPtr->timeoutFunc_();
});
}
bool TaskTimeoutFlag::done()
{
return isDone_.exchange(true);
}

39
lib/src/TaskTimeoutFlag.h Normal file
View File

@ -0,0 +1,39 @@
/**
*
* @file TaskTimeoutFlag.h
* @author An Tao
*
* Copyright 2018, An Tao. All rights reserved.
* https://github.com/an-tao/drogon
* Use of this source code is governed by a MIT license
* that can be found in the License file.
*
* Drogon
*
*/
#include <trantor/utils/NonCopyable.h>
#include <trantor/net/EventLoop.h>
#include <chrono>
#include <functional>
#include <atomic>
#include <memory>
namespace drogon
{
class TaskTimeoutFlag : public trantor::NonCopyable,
public std::enable_shared_from_this<TaskTimeoutFlag>
{
public:
TaskTimeoutFlag(trantor::EventLoop *loop,
const std::chrono::duration<double> &timeout,
std::function<void()> timeoutCallback);
bool done();
void runTimer();
private:
std::atomic<bool> isDone_{false};
trantor::EventLoop *loop_;
std::chrono::duration<double> timeout_;
std::function<void()> timeoutFunc_;
};
} // namespace drogon

View File

@ -173,7 +173,7 @@ void WebSocketConnectionImpl::WebSocketConnectionImpl::forceClose()
void WebSocketConnectionImpl::setPingMessage(
const std::string &message,
const std::chrono::duration<long double> &interval)
const std::chrono::duration<double> &interval)
{
auto loop = tcpConnectionPtr_->getLoop();
if (loop->isInLoopThread())
@ -397,7 +397,7 @@ void WebSocketConnectionImpl::disablePingInLoop()
void WebSocketConnectionImpl::setPingMessageInLoop(
std::string &&message,
const std::chrono::duration<long double> &interval)
const std::chrono::duration<double> &interval)
{
std::weak_ptr<WebSocketConnectionImpl> weakPtr = shared_from_this();
disablePingInLoop();

View File

@ -72,9 +72,8 @@ class WebSocketConnectionImpl final
const std::string &reason = "") override; // close write
void forceClose() override; // close
void setPingMessage(
const std::string &message,
const std::chrono::duration<long double> &interval) override;
void setPingMessage(const std::string &message,
const std::chrono::duration<double> &interval) override;
void disablePing() override;
@ -120,9 +119,8 @@ class WebSocketConnectionImpl final
[](const WebSocketConnectionImplPtr &) {};
void sendWsData(const char *msg, uint64_t len, unsigned char opcode);
void disablePingInLoop();
void setPingMessageInLoop(
std::string &&message,
const std::chrono::duration<long double> &interval);
void setPingMessageInLoop(std::string &&message,
const std::chrono::duration<double> &interval);
};
} // namespace drogon

View File

@ -238,7 +238,8 @@ class DROGON_EXPORT DbClient : public trantor::NonCopyable
* transaction object to set the callback.
*/
virtual std::shared_ptr<Transaction> newTransaction(
const std::function<void(bool)> &commitCallback = nullptr) = 0;
const std::function<void(bool)> &commitCallback =
std::function<void(bool)>()) noexcept(false) = 0;
/// Create a transaction object in asynchronous mode.
virtual void newTransactionAsync(
@ -269,6 +270,18 @@ class DROGON_EXPORT DbClient : public trantor::NonCopyable
return connectionInfo_;
}
/**
* @brief Set the Timeout value of execution of a SQL.
*
* @param timeout in seconds, if the SQL result is not returned from the
* server within the timeout, a TimeoutError exception with "SQL execution
* timeout" string is generated and returned to the caller.
* @note set the timeout value to zero or negative for no limit on time. The
* default value is -1.0, this means there is no time limit if this method
* is not called.
*/
virtual void setTimeout(double timeout) = 0;
private:
friend internal::SqlBinder;
virtual void execSql(
@ -304,8 +317,8 @@ inline void internal::TrasactionAwaiter::await_suspend(
client_->newTransactionAsync(
[this, handle](const std::shared_ptr<Transaction> &transaction) {
if (transaction == nullptr)
setException(std::make_exception_ptr(
Failure("Failed to create transaction")));
setException(std::make_exception_ptr(TimeoutError(
"Timeout, no connection available for transaction")));
else
setValue(transaction);
handle.resume();

View File

@ -209,6 +209,18 @@ class InternalError : public DrogonDbException, public std::logic_error
DROGON_EXPORT explicit InternalError(const std::string &);
};
/// Timeout error in when executing the SQL statement.
class TimeoutError : public DrogonDbException, public std::logic_error
{
virtual const std::exception &base() const noexcept override
{
return *this;
}
public:
DROGON_EXPORT explicit TimeoutError(const std::string &);
};
/// Error in usage of drogon orm library, similar to std::logic_error
class UsageError : public DrogonDbException, public std::logic_error
{

View File

@ -14,6 +14,7 @@
#include "DbClientImpl.h"
#include "DbConnection.h"
#include "../../lib/src/TaskTimeoutFlag.h"
#include <drogon/config.h>
#include <drogon/utils/string_view.h>
#if USE_POSTGRESQL
@ -149,6 +150,18 @@ void DbClientImpl::execSql(
assert(paraNum == length.size());
assert(paraNum == format.size());
assert(rcb);
if (timeout_ > 0.0)
{
execSqlWithTimeout(sql,
sqlLength,
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
std::move(rcb),
std::move(exceptCallback));
return;
}
DbConnectionPtr conn;
bool busy = false;
{
@ -223,7 +236,47 @@ void DbClientImpl::newTransactionAsync(
}
else
{
transCallbacks_.push(callback);
auto callbackPtr = std::make_shared<
std::function<void(const std::shared_ptr<Transaction> &)>>(
callback);
if (timeout_ > 0.0)
{
auto newCallbackPtr =
std::make_shared<std::weak_ptr<std::function<void(
const std::shared_ptr<Transaction> &)>>>();
auto timeoutFlagPtr = std::make_shared<TaskTimeoutFlag>(
loops_.getNextLoop(),
std::chrono::duration<double>(timeout_),
[newCallbackPtr, callbackPtr, this]() {
auto cbPtr = (*newCallbackPtr).lock();
if (cbPtr)
{
std::lock_guard<std::mutex> lock(connectionsMutex_);
for (auto iter = transCallbacks_.begin();
iter != transCallbacks_.end();
++iter)
{
if (cbPtr == *iter)
{
transCallbacks_.erase(iter);
break;
}
}
}
(*callbackPtr)(nullptr);
});
callbackPtr = std::make_shared<
std::function<void(const std::shared_ptr<Transaction> &)>>(
[callbackPtr, timeoutFlagPtr](
const std::shared_ptr<Transaction> &trans) {
if (timeoutFlagPtr->done())
return;
(*callbackPtr)(trans);
});
(*newCallbackPtr) = callbackPtr;
timeoutFlagPtr->runTimer();
}
transCallbacks_.push_back(callbackPtr);
}
}
if (conn)
@ -279,11 +332,15 @@ void DbClientImpl::makeTrans(
});
}));
trans->doBegin();
if (timeout_ > 0.0)
{
trans->setTimeout(timeout_);
}
conn->loop()->queueInLoop(
[callback = std::move(callback), trans]() { callback(trans); });
}
std::shared_ptr<Transaction> DbClientImpl::newTransaction(
const std::function<void(bool)> &commitCallback)
const std::function<void(bool)> &commitCallback) noexcept(false)
{
std::promise<std::shared_ptr<Transaction>> pro;
auto f = pro.get_future();
@ -291,6 +348,10 @@ std::shared_ptr<Transaction> DbClientImpl::newTransaction(
pro.set_value(trans);
});
auto trans = f.get();
if (!trans)
{
throw TimeoutError("Timeout, no connection available for transaction");
}
trans->setCommitCallback(commitCallback);
return trans;
}
@ -303,8 +364,8 @@ void DbClientImpl::handleNewTask(const DbConnectionPtr &connPtr)
std::lock_guard<std::mutex> guard(connectionsMutex_);
if (!transCallbacks_.empty())
{
transCallback = std::move(transCallbacks_.front());
transCallbacks_.pop();
transCallback = std::move(*(transCallbacks_.front()));
transCallbacks_.pop_front();
}
else if (!sqlCmdBuffer_.empty())
{
@ -426,4 +487,114 @@ bool DbClientImpl::hasAvailableConnections() const noexcept
{
std::lock_guard<std::mutex> lock(connectionsMutex_);
return (!readyConnections_.empty()) || (!busyConnections_.empty());
}
void DbClientImpl::execSqlWithTimeout(
const char *sql,
size_t sqlLength,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&ecb)
{
DbConnectionPtr conn;
assert(timeout_ > 0.0);
auto cmd = std::make_shared<std::weak_ptr<SqlCmd>>();
bool busy = false;
auto ecpPtr =
std::make_shared<std::function<void(const std::exception_ptr &)>>(
std::move(ecb));
auto timeoutFlagPtr = std::make_shared<drogon::TaskTimeoutFlag>(
loops_.getNextLoop(),
std::chrono::duration<double>(timeout_),
[cmd, ecpPtr, thisPtr = shared_from_this()]() {
auto cbPtr = (*cmd).lock();
if (cbPtr)
{
std::lock_guard<std::mutex> lock(thisPtr->connectionsMutex_);
for (auto iter = thisPtr->sqlCmdBuffer_.begin();
iter != thisPtr->sqlCmdBuffer_.end();
++iter)
{
if (*iter == cbPtr)
{
thisPtr->sqlCmdBuffer_.erase(iter);
break;
}
}
}
(*ecpPtr)(
std::make_exception_ptr(TimeoutError("SQL execution timeout")));
});
auto resultCallback = [rcb = std::move(rcb),
timeoutFlagPtr](const Result &result) {
if (timeoutFlagPtr->done())
return;
rcb(result);
};
auto exceptionCallback = [ecpPtr,
timeoutFlagPtr](const std::exception_ptr &err) {
if (timeoutFlagPtr->done())
return;
(*ecpPtr)(err);
};
{
std::lock_guard<std::mutex> guard(connectionsMutex_);
if (readyConnections_.size() == 0)
{
if (sqlCmdBuffer_.size() > 200000)
{
// too many queries in buffer;
busy = true;
}
else
{
// LOG_TRACE << "Push query to buffer";
auto command =
std::make_shared<SqlCmd>(string_view{sql, sqlLength},
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
std::move(resultCallback),
std::move(exceptionCallback));
sqlCmdBuffer_.emplace_back(command);
*cmd = command;
}
}
else
{
auto iter = readyConnections_.begin();
busyConnections_.insert(*iter);
conn = *iter;
readyConnections_.erase(iter);
}
}
if (conn)
{
execSql(conn,
string_view{sql, sqlLength},
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
std::move(resultCallback),
std::move(exceptionCallback));
timeoutFlagPtr->runTimer();
return;
}
if (busy)
{
exceptionCallback(
std::make_exception_ptr(Failure("Too many queries in buffer")));
return;
}
timeoutFlagPtr->runTimer();
}

View File

@ -1,7 +1,7 @@
/**
*
* DbClientImpl.h
* An Tao
* @file DbClientImpl.h
* @author An Tao
*
* Copyright 2018, An Tao. All rights reserved.
* https://github.com/an-tao/drogon
@ -35,28 +35,33 @@ class DbClientImpl : public DbClient,
DbClientImpl(const std::string &connInfo,
const size_t connNum,
ClientType type);
virtual ~DbClientImpl() noexcept;
virtual void execSql(const char *sql,
size_t sqlLength,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)>
&&exceptCallback) override;
virtual std::shared_ptr<Transaction> newTransaction(
const std::function<void(bool)> &commitCallback = nullptr) override;
virtual void newTransactionAsync(
~DbClientImpl() noexcept override;
void execSql(const char *sql,
size_t sqlLength,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)>
&&exceptCallback) override;
std::shared_ptr<Transaction> newTransaction(
const std::function<void(bool)> &commitCallback =
std::function<void(bool)>()) noexcept(false) override;
void newTransactionAsync(
const std::function<void(const std::shared_ptr<Transaction> &)>
&callback) override;
virtual bool hasAvailableConnections() const noexcept override;
bool hasAvailableConnections() const noexcept override;
void setTimeout(double timeout) override
{
timeout_ = timeout;
}
private:
size_t numberOfConnections_;
trantor::EventLoopThreadPool loops_;
std::shared_ptr<SharedMutex> sharedMutexPtr_;
double timeout_{-1.0};
void execSql(
const DbConnectionPtr &conn,
string_view &&sql,
@ -78,12 +83,22 @@ class DbClientImpl : public DbClient,
std::unordered_set<DbConnectionPtr> readyConnections_;
std::unordered_set<DbConnectionPtr> busyConnections_;
std::queue<std::function<void(const std::shared_ptr<Transaction> &)>>
std::list<std::shared_ptr<
std::function<void(const std::shared_ptr<Transaction> &)>>>
transCallbacks_;
std::deque<std::shared_ptr<SqlCmd>> sqlCmdBuffer_;
void handleNewTask(const DbConnectionPtr &connPtr);
void execSqlWithTimeout(
const char *sql,
size_t sqlLength,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback);
};
} // namespace orm

View File

@ -15,6 +15,7 @@
#include "DbClientLockFree.h"
#include "DbConnection.h"
#include "TransactionImpl.h"
#include "../../lib/src/TaskTimeoutFlag.h"
#include <drogon/config.h>
#if USE_POSTGRESQL
#include "postgresql_impl/PgConnection.h"
@ -88,6 +89,18 @@ void DbClientLockFree::execSql(
assert(paraNum == format.size());
assert(rcb);
loop_->assertInLoopThread();
if (timeout_ > 0.0)
{
execSqlWithTimeout(sql,
sqlLength,
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
std::move(rcb),
std::move(exceptCallback));
return;
}
if (!connections_.empty() && sqlCmdBuffer_.empty() &&
transCallbacks_.empty())
{
@ -210,7 +223,7 @@ void DbClientLockFree::execSql(
}
std::shared_ptr<Transaction> DbClientLockFree::newTransaction(
const std::function<void(bool)> &)
const std::function<void(bool)> &) noexcept(false)
{
// Don't support transaction;
LOG_ERROR
@ -234,7 +247,45 @@ void DbClientLockFree::newTransactionAsync(
return;
}
}
transCallbacks_.push(callback);
auto callbackPtr = std::make_shared<
std::function<void(const std::shared_ptr<Transaction> &)>>(callback);
if (timeout_ > 0.0)
{
auto newCallbackPtr = std::make_shared<std::weak_ptr<
std::function<void(const std::shared_ptr<Transaction> &)>>>();
auto timeoutFlagPtr = std::make_shared<TaskTimeoutFlag>(
loop_,
std::chrono::duration<double>(timeout_),
[callbackPtr, this, newCallbackPtr]() {
auto cbPtr = (*newCallbackPtr).lock();
if (cbPtr)
{
for (auto iter = transCallbacks_.begin();
iter != transCallbacks_.end();
++iter)
{
if (cbPtr == *iter)
{
transCallbacks_.erase(iter);
break;
}
}
}
(*callbackPtr)(nullptr);
});
callbackPtr = std::make_shared<
std::function<void(const std::shared_ptr<Transaction> &)>>(
[callbackPtr,
timeoutFlagPtr](const std::shared_ptr<Transaction> &trans) {
if (timeoutFlagPtr->done())
return;
(*callbackPtr)(trans);
});
*newCallbackPtr = callbackPtr;
timeoutFlagPtr->runTimer();
}
transCallbacks_.push_back(callbackPtr);
}
void DbClientLockFree::makeTrans(
@ -255,8 +306,8 @@ void DbClientLockFree::makeTrans(
if (!thisPtr->transCallbacks_.empty())
{
auto callback = std::move(thisPtr->transCallbacks_.front());
thisPtr->transCallbacks_.pop();
thisPtr->makeTrans(conn, std::move(callback));
thisPtr->transCallbacks_.pop_front();
thisPtr->makeTrans(conn, std::move(*callback));
return;
}
@ -287,6 +338,10 @@ void DbClientLockFree::makeTrans(
}));
transSet_.insert(conn);
trans->doBegin();
if (timeout_ > 0.0)
{
trans->setTimeout(timeout_);
}
conn->loop()->queueInLoop(
[callback = std::move(callback), trans] { callback(trans); });
}
@ -299,8 +354,8 @@ void DbClientLockFree::handleNewTask(const DbConnectionPtr &conn)
if (!transCallbacks_.empty())
{
auto callback = std::move(transCallbacks_.front());
transCallbacks_.pop();
makeTrans(conn, std::move(callback));
transCallbacks_.pop_front();
makeTrans(conn, std::move(*callback));
return;
}
@ -427,4 +482,178 @@ DbConnectionPtr DbClientLockFree::newConnection()
bool DbClientLockFree::hasAvailableConnections() const noexcept
{
return !connections_.empty();
}
void DbClientLockFree::execSqlWithTimeout(
const char *sql,
size_t sqlLength,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&ecb)
{
auto commandPtr = std::make_shared<std::weak_ptr<SqlCmd>>();
auto ecpPtr =
std::make_shared<std::function<void(const std::exception_ptr &)>>(
std::move(ecb));
auto timeoutFlagPtr = std::make_shared<drogon::TaskTimeoutFlag>(
loop_,
std::chrono::duration<double>(timeout_),
[commandPtr, ecpPtr, thisPtr = shared_from_this()]() {
auto cbPtr = (*commandPtr).lock();
if (cbPtr)
{
for (auto iter = thisPtr->sqlCmdBuffer_.begin();
iter != thisPtr->sqlCmdBuffer_.end();
++iter)
{
if (*iter == cbPtr)
{
thisPtr->sqlCmdBuffer_.erase(iter);
break;
}
}
}
(*ecpPtr)(
std::make_exception_ptr(TimeoutError("SQL execution timeout")));
});
auto resultCallback = [rcb = std::move(rcb),
timeoutFlagPtr](const Result &result) {
if (timeoutFlagPtr->done())
return;
rcb(result);
};
auto exceptionCallback = [ecpPtr,
timeoutFlagPtr](const std::exception_ptr &err) {
if (timeoutFlagPtr->done())
return;
(*ecpPtr)(err);
};
if (!connections_.empty() && sqlCmdBuffer_.empty() &&
transCallbacks_.empty())
{
#if (!LIBPQ_SUPPORTS_BATCH_MODE)
for (auto &conn : connections_)
{
if (!conn->isWorking() &&
(transSet_.empty() || transSet_.find(conn) == transSet_.end()))
{
conn->execSql(
string_view{sql, sqlLength},
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
[resultCallback = std::move(resultCallback),
this](const Result &r) {
if (sqlCmdBuffer_.empty())
{
resultCallback(r);
}
else
{
loop_->queueInLoop(
[resultCallback = std::move(resultCallback),
r]() { resultCallback(r); });
}
},
std::move(exceptionCallback));
timeoutFlagPtr->runTimer();
return;
}
}
#else
if (type_ != ClientType::PostgreSQL)
{
for (auto &conn : connections_)
{
if (!conn->isWorking() &&
(transSet_.empty() ||
transSet_.find(conn) == transSet_.end()))
{
conn->execSql(
string_view{sql, sqlLength},
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
[resultCallback = std::move(resultCallback),
this](const Result &r) {
if (sqlCmdBuffer_.empty())
{
resultCallback(r);
}
else
{
loop_->queueInLoop(
[resultCallback = std::move(resultCallback),
r]() { resultCallback(r); });
}
},
std::move(exceptionCallback));
timeoutFlagPtr->runTimer();
return;
}
}
}
else
{
/// pg batch mode
for (size_t i = 0; i < connections_.size(); ++i)
{
auto &conn = connections_[connectionPos_++];
if (connectionPos_ >= connections_.size())
connectionPos_ = 0;
if (transSet_.empty() ||
transSet_.find(conn) == transSet_.end())
{
conn->execSql(string_view{sql, sqlLength},
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
std::move(resultCallback),
std::move(exceptionCallback));
timeoutFlagPtr->runTimer();
return;
}
}
}
#endif
}
if (sqlCmdBuffer_.size() > 20000)
{
// too many queries in buffer;
exceptionCallback(
std::make_exception_ptr(Failure("Too many queries in buffer")));
return;
}
// LOG_TRACE << "Push query to buffer";
auto cmdPtr = std::make_shared<SqlCmd>(
string_view{sql, sqlLength},
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
[resultCallback = std::move(resultCallback), this](const Result &r) {
if (sqlCmdBuffer_.empty())
{
resultCallback(r);
}
else
{
loop_->queueInLoop([resultCallback = std::move(resultCallback),
r]() { resultCallback(r); });
}
},
std::move(exceptionCallback));
sqlCmdBuffer_.emplace_back(cmdPtr);
*commandPtr = cmdPtr;
timeoutFlagPtr->runTimer();
}

View File

@ -1,7 +1,7 @@
/**
*
* DbClientLockFree.h
* An Tao
* @file DbClientLockFree.h
* @author An Tao
*
* Copyright 2018, An Tao. All rights reserved.
* https://github.com/an-tao/drogon
@ -23,6 +23,7 @@
#include <string>
#include <thread>
#include <unordered_set>
#include <list>
namespace drogon
{
@ -36,22 +37,27 @@ class DbClientLockFree : public DbClient,
trantor::EventLoop *loop,
ClientType type,
size_t connectionNumberPerLoop);
virtual ~DbClientLockFree() noexcept;
virtual void execSql(const char *sql,
size_t sqlLength,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)>
&&exceptCallback) override;
virtual std::shared_ptr<Transaction> newTransaction(
const std::function<void(bool)> &commitCallback = nullptr) override;
virtual void newTransactionAsync(
~DbClientLockFree() noexcept override;
void execSql(const char *sql,
size_t sqlLength,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)>
&&exceptCallback) override;
std::shared_ptr<Transaction> newTransaction(
const std::function<void(bool)> &commitCallback =
std::function<void(bool)>()) noexcept(false) override;
void newTransactionAsync(
const std::function<void(const std::shared_ptr<Transaction> &)>
&callback) override;
virtual bool hasAvailableConnections() const noexcept override;
bool hasAvailableConnections() const noexcept override;
void setTimeout(double timeout) override
{
timeout_ = timeout;
}
private:
std::string connectionInfo_;
@ -63,13 +69,24 @@ class DbClientLockFree : public DbClient,
std::unordered_set<DbConnectionPtr> transSet_;
std::deque<std::shared_ptr<SqlCmd>> sqlCmdBuffer_;
std::queue<std::function<void(const std::shared_ptr<Transaction> &)>>
std::list<std::shared_ptr<
std::function<void(const std::shared_ptr<Transaction> &)>>>
transCallbacks_;
double timeout_{-1.0};
void makeTrans(
const DbConnectionPtr &conn,
std::function<void(const std::shared_ptr<Transaction> &)> &&callback);
void execSqlWithTimeout(
const char *sql,
size_t sqlLength,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&ecb);
void handleNewTask(const DbConnectionPtr &conn);
#if LIBPQ_SUPPORTS_BATCH_MODE
size_t connectionPos_{0}; // Used for pg batch mode.

View File

@ -72,6 +72,10 @@ void DbClientManager::createDbClients(
ioloops[idx],
dbInfo.dbType_,
dbInfo.connectionNumber_));
if (dbInfo.timeout_ > 0.0)
{
c->setTimeout(dbInfo.timeout_);
}
});
}
}
@ -83,6 +87,10 @@ void DbClientManager::createDbClients(
dbClientsMap_[dbInfo.name_] =
drogon::orm::DbClient::newPgClient(
dbInfo.connectionInfo_, dbInfo.connectionNumber_);
if (dbInfo.timeout_ > 0.0)
{
dbClientsMap_[dbInfo.name_]->setTimeout(dbInfo.timeout_);
}
#endif
}
else if (dbInfo.dbType_ == drogon::orm::ClientType::Mysql)
@ -91,6 +99,10 @@ void DbClientManager::createDbClients(
dbClientsMap_[dbInfo.name_] =
drogon::orm::DbClient::newMysqlClient(
dbInfo.connectionInfo_, dbInfo.connectionNumber_);
if (dbInfo.timeout_ > 0.0)
{
dbClientsMap_[dbInfo.name_]->setTimeout(dbInfo.timeout_);
}
#endif
}
else if (dbInfo.dbType_ == drogon::orm::ClientType::Sqlite3)
@ -99,6 +111,10 @@ void DbClientManager::createDbClients(
dbClientsMap_[dbInfo.name_] =
drogon::orm::DbClient::newSqlite3Client(
dbInfo.connectionInfo_, dbInfo.connectionNumber_);
if (dbInfo.timeout_ > 0.0)
{
dbClientsMap_[dbInfo.name_]->setTimeout(dbInfo.timeout_);
}
#endif
}
}
@ -119,7 +135,8 @@ void DbClientManager::createDbClient(const std::string &dbType,
#endif
const std::string &name,
const bool isFast,
const std::string &characterSet)
const std::string &characterSet,
double timeout)
{
auto connStr =
utils::formattedString("host=%s port=%u dbname=%s user=%s",
@ -144,6 +161,7 @@ void DbClientManager::createDbClient(const std::string &dbType,
info.connectionNumber_ = connectionNum;
info.isFast_ = isFast;
info.name_ = name;
info.timeout_ = timeout;
if (type == "postgresql")
{

View File

@ -10,8 +10,8 @@
/**
*
* Exception.cc
* An Tao
* @file Exception.cc
* @author An Tao
*
* Copyright 2018, An Tao. All rights reserved.
* https://github.com/an-tao/drogon
@ -86,7 +86,11 @@ DeadlockDetected::DeadlockDetected(const std::string &whatarg)
}
InternalError::InternalError(const std::string &whatarg)
: logic_error("libpqxx internal error: " + whatarg)
: logic_error("drogon database internal error: " + whatarg)
{
}
TimeoutError::TimeoutError(const std::string &whatarg) : logic_error(whatarg)
{
}

View File

@ -13,6 +13,7 @@
*/
#include "TransactionImpl.h"
#include "../../lib/src/TaskTimeoutFlag.h"
#include <drogon/utils/string_view.h>
#include <trantor/utils/Logger.h>
@ -95,6 +96,17 @@ void TransactionImpl::execSqlInLoop(
loop_->assertInLoopThread();
if (!isCommitedOrRolledback_)
{
if (timeout_ > 0.0)
{
execSqlInLoopWithTimeout(std::move(sql),
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
std::move(rcb),
std::move(exceptCallback));
return;
}
auto thisPtr = shared_from_this();
if (!isWorking_)
{
@ -116,16 +128,16 @@ void TransactionImpl::execSqlInLoop(
else
{
// push sql cmd to buffer;
SqlCmd cmd;
cmd.sql_ = std::move(sql);
cmd.parametersNumber_ = paraNum;
cmd.parameters_ = std::move(parameters);
cmd.lengths_ = std::move(length);
cmd.formats_ = std::move(format);
cmd.callback_ = std::move(rcb);
cmd.exceptionCallback_ = std::move(exceptCallback);
cmd.thisPtr_ = thisPtr;
thisPtr->sqlCmdBuffer_.push_back(std::move(cmd));
auto cmdPtr = std::make_shared<SqlCmd>();
cmdPtr->sql_ = std::move(sql);
cmdPtr->parametersNumber_ = paraNum;
cmdPtr->parameters_ = std::move(parameters);
cmdPtr->lengths_ = std::move(length);
cmdPtr->formats_ = std::move(format);
cmdPtr->callback_ = std::move(rcb);
cmdPtr->exceptionCallback_ = std::move(exceptCallback);
cmdPtr->thisPtr_ = thisPtr;
thisPtr->sqlCmdBuffer_.push_back(std::move(cmdPtr));
}
}
else
@ -152,22 +164,22 @@ void TransactionImpl::rollback()
if (thisPtr->isWorking_)
{
// push sql cmd to buffer;
SqlCmd cmd;
cmd.sql_ = "rollback";
cmd.parametersNumber_ = 0;
cmd.callback_ = [thisPtr](const Result &) {
auto cmdPtr = std::make_shared<SqlCmd>();
cmdPtr->sql_ = "rollback";
cmdPtr->parametersNumber_ = 0;
cmdPtr->callback_ = [thisPtr](const Result &) {
LOG_DEBUG << "Transaction roll back!";
thisPtr->isCommitedOrRolledback_ = true;
};
cmd.exceptionCallback_ = [thisPtr](const std::exception_ptr &) {
cmdPtr->exceptionCallback_ = [thisPtr](const std::exception_ptr &) {
// clearupCb();
thisPtr->isCommitedOrRolledback_ = true;
LOG_ERROR << "Transaction rool back error";
};
cmd.isRollbackCmd_ = true;
cmdPtr->isRollbackCmd_ = true;
// Rollback cmd should be executed firstly, so we push it in front
// of the list
thisPtr->sqlCmdBuffer_.push_front(std::move(cmd));
thisPtr->sqlCmdBuffer_.push_front(std::move(cmdPtr));
return;
}
thisPtr->isWorking_ = true;
@ -205,14 +217,14 @@ void TransactionImpl::execNewTask()
sqlCmdBuffer_.pop_front();
auto conn = connectionPtr_;
conn->execSql(
std::move(cmd.sql_),
cmd.parametersNumber_,
std::move(cmd.parameters_),
std::move(cmd.lengths_),
std::move(cmd.formats_),
[callback = std::move(cmd.callback_), cmd, thisPtr](
std::move(cmd->sql_),
cmd->parametersNumber_,
std::move(cmd->parameters_),
std::move(cmd->lengths_),
std::move(cmd->formats_),
[callback = std::move(cmd->callback_), cmd, thisPtr](
const Result &r) {
if (cmd.isRollbackCmd_)
if (cmd->isRollbackCmd_)
{
thisPtr->isCommitedOrRolledback_ = true;
}
@ -220,14 +232,14 @@ void TransactionImpl::execNewTask()
callback(r);
},
[cmd, thisPtr](const std::exception_ptr &ePtr) {
if (!cmd.isRollbackCmd_)
if (!cmd->isRollbackCmd_)
thisPtr->rollback();
else
{
thisPtr->isCommitedOrRolledback_ = true;
}
if (cmd.exceptionCallback_)
cmd.exceptionCallback_(ePtr);
if (cmd->exceptionCallback_)
cmd->exceptionCallback_(ePtr);
});
return;
}
@ -247,9 +259,9 @@ void TransactionImpl::execNewTask()
{
for (auto const &cmd : sqlCmdBuffer_)
{
if (cmd.exceptionCallback_)
if (cmd->exceptionCallback_)
{
cmd.exceptionCallback_(std::current_exception());
cmd->exceptionCallback_(std::current_exception());
}
}
}
@ -290,3 +302,100 @@ void TransactionImpl::doBegin()
});
});
}
void TransactionImpl::execSqlInLoopWithTimeout(
string_view &&sql,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&ecb)
{
auto thisPtr = shared_from_this();
std::weak_ptr<TransactionImpl> weakPtr = thisPtr;
auto commandPtr = std::make_shared<std::weak_ptr<SqlCmd>>();
auto ecpPtr =
std::make_shared<std::function<void(const std::exception_ptr &)>>(
std::move(ecb));
auto timeoutFlagPtr = std::make_shared<drogon::TaskTimeoutFlag>(
loop_,
std::chrono::duration<double>(timeout_),
[commandPtr, weakPtr, ecpPtr]() {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
auto cmdPtr = (*commandPtr).lock();
if (cmdPtr)
{
for (auto iter = thisPtr->sqlCmdBuffer_.begin();
iter != thisPtr->sqlCmdBuffer_.end();
++iter)
{
if (cmdPtr == *iter)
{
thisPtr->sqlCmdBuffer_.erase(iter);
break;
}
}
}
thisPtr->rollback();
if (*ecpPtr)
{
(*ecpPtr)(std::make_exception_ptr(
TimeoutError("SQL execution timeout")));
}
});
auto resultCallback = [rcb = std::move(rcb),
timeoutFlagPtr](const drogon::orm::Result &result) {
if (timeoutFlagPtr->done())
return;
rcb(result);
};
if (!isWorking_)
{
isWorking_ = true;
thisPtr_ = thisPtr;
connectionPtr_->execSql(std::move(sql),
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
std::move(resultCallback),
[ecpPtr, timeoutFlagPtr, thisPtr](
const std::exception_ptr &ePtr) {
thisPtr->rollback();
if (timeoutFlagPtr->done())
return;
if (*ecpPtr)
{
(*ecpPtr)(ePtr);
}
});
}
else
{
// push sql cmd to buffer;
auto cmdPtr = std::make_shared<SqlCmd>();
cmdPtr->sql_ = std::move(sql);
cmdPtr->parametersNumber_ = paraNum;
cmdPtr->parameters_ = std::move(parameters);
cmdPtr->lengths_ = std::move(length);
cmdPtr->formats_ = std::move(format);
cmdPtr->callback_ = std::move(resultCallback);
cmdPtr->exceptionCallback_ =
[ecpPtr, timeoutFlagPtr](const std::exception_ptr &ePtr) {
if (timeoutFlagPtr->done())
return;
if (*ecpPtr)
{
(*ecpPtr)(ePtr);
}
};
cmdPtr->thisPtr_ = thisPtr;
thisPtr->sqlCmdBuffer_.push_back(cmdPtr);
*commandPtr = cmdPtr;
}
timeoutFlagPtr->runTimer();
}

View File

@ -33,27 +33,31 @@ class TransactionImpl : public Transaction,
const std::function<void()> &usedUpCallback);
~TransactionImpl();
void rollback() override;
virtual void setCommitCallback(
void setCommitCallback(
const std::function<void(bool)> &commitCallback) override
{
commitCallback_ = commitCallback;
}
virtual bool hasAvailableConnections() const noexcept override
bool hasAvailableConnections() const noexcept override
{
return connectionPtr_->status() == ConnectStatus::Ok;
}
void setTimeout(double timeout) override
{
timeout_ = timeout;
}
private:
DbConnectionPtr connectionPtr_;
virtual void execSql(const char *sql,
size_t sqlLength,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)>
&&exceptCallback) override
void execSql(const char *sql,
size_t sqlLength,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)>
&&exceptCallback) override
{
if (loop_->isInLoopThread())
{
@ -95,13 +99,21 @@ class TransactionImpl : public Transaction,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback);
virtual std::shared_ptr<Transaction> newTransaction(
const std::function<void(bool)> &) override
void execSqlInLoopWithTimeout(
string_view &&sql,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback);
std::shared_ptr<Transaction> newTransaction(
const std::function<void(bool)> &) noexcept(false) override
{
return shared_from_this();
}
virtual void newTransactionAsync(
void newTransactionAsync(
const std::function<void(const std::shared_ptr<Transaction> &)>
&callback) override
{
@ -123,7 +135,8 @@ class TransactionImpl : public Transaction,
bool isRollbackCmd_{false};
std::shared_ptr<TransactionImpl> thisPtr_;
};
std::list<SqlCmd> sqlCmdBuffer_;
using SqlCmdPtr = std::shared_ptr<SqlCmd>;
std::list<SqlCmdPtr> sqlCmdBuffer_;
// std::mutex _bufferMutex;
friend class DbClientImpl;
friend class DbClientLockFree;
@ -131,6 +144,7 @@ class TransactionImpl : public Transaction,
trantor::EventLoop *loop_;
std::function<void(bool)> commitCallback_;
std::shared_ptr<TransactionImpl> thisPtr_;
double timeout_{-1.0};
};
} // namespace orm
} // namespace drogon

@ -1 +1 @@
Subproject commit 5103ec795ed352713679922882ee7f5a4f93a938
Subproject commit 24310aac97e97400ffc25185a5a2425491eaab72