Optimize PostgreSQL client

This commit is contained in:
antao 2019-01-08 19:37:19 +08:00
parent fe079a4f7e
commit 937a2fd136
19 changed files with 175 additions and 160 deletions

View File

@ -152,13 +152,13 @@ class DbClient : public trantor::NonCopyable
private:
friend internal::SqlBinder;
virtual void execSql(const std::string &sql,
virtual void execSql(std::string &&sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exptCallback) = 0;
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback) = 0;
protected:
ClientType _type;

View File

@ -84,9 +84,11 @@ class Result
/// For Mysql, Sqlite3 database
unsigned long long insertId() const noexcept;
const std::string &sql() const noexcept;
private:
ResultImplPtr _resultPtr;
std::string _query;
std::string _errString;
friend class Field;
friend class Row;

View File

@ -73,13 +73,13 @@ DbClientImpl::~DbClientImpl() noexcept
}
void DbClientImpl::execSql(const DbConnectionPtr &conn,
const std::string &sql,
std::string &&sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &cb,
const std::function<void(const std::exception_ptr &)> &exceptCallback)
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback)
{
if (!conn)
{
@ -94,8 +94,8 @@ void DbClientImpl::execSql(const DbConnectionPtr &conn,
return;
}
std::weak_ptr<DbConnection> weakConn = conn;
conn->execSql(sql, paraNum, parameters, length, format,
cb, exceptCallback,
conn->execSql(std::move(sql), paraNum, std::move(parameters), std::move(length), std::move(format),
std::move(rcb), std::move(exceptCallback),
[=]() -> void {
{
auto connPtr = weakConn.lock();
@ -105,18 +105,18 @@ void DbClientImpl::execSql(const DbConnectionPtr &conn,
}
});
}
void DbClientImpl::execSql(const std::string &sql,
void DbClientImpl::execSql(std::string &&sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const QueryCallback &cb,
const ExceptPtrCallback &exceptCb)
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback)
{
assert(paraNum == parameters.size());
assert(paraNum == length.size());
assert(paraNum == format.size());
assert(cb);
assert(rcb);
DbConnectionPtr conn;
{
std::lock_guard<std::mutex> guard(_connectionsMutex);
@ -131,7 +131,7 @@ void DbClientImpl::execSql(const std::string &sql,
}
catch (...)
{
exceptCb(std::current_exception());
exceptCallback(std::current_exception());
}
return;
}
@ -146,7 +146,7 @@ void DbClientImpl::execSql(const std::string &sql,
}
if (conn)
{
execSql(conn, sql, paraNum, parameters, length, format, cb, exceptCb);
execSql(conn, std::move(sql), paraNum, std::move(parameters), std::move(length), std::move(format), std::move(rcb), std::move(exceptCallback));
return;
}
bool busy = false;
@ -166,19 +166,19 @@ void DbClientImpl::execSql(const std::string &sql,
}
catch (...)
{
exceptCb(std::current_exception());
exceptCallback(std::current_exception());
}
return;
}
//LOG_TRACE << "Push query to buffer";
SqlCmd cmd;
cmd._sql = sql;
cmd._paraNum = paraNum;
cmd._parameters = parameters;
cmd._length = length;
cmd._format = format;
cmd._cb = cb;
cmd._exceptCb = exceptCb;
std::shared_ptr<SqlCmd> cmd = std::make_shared<SqlCmd>();
cmd->_sql = std::move(sql);
cmd->_paraNum = paraNum;
cmd->_parameters = std::move(parameters);
cmd->_length = std::move(length);
cmd->_format = std::move(format);
cmd->_cb = std::move(rcb);
cmd->_exceptCb = std::move(exceptCallback);
{
std::lock_guard<std::mutex> guard(_bufferMutex);
_sqlCmdBuffer.push_back(std::move(cmd));
@ -239,18 +239,20 @@ void DbClientImpl::handleNewTask(const DbConnectionPtr &connPtr)
if (!_sqlCmdBuffer.empty())
{
_busyConnections.insert(connPtr); //For new connections, this sentence is necessary
auto cmd = _sqlCmdBuffer.front();
_sqlCmdBuffer.pop_front();
_loopPtr->queueInLoop([=]() {
execSql(connPtr, cmd._sql, cmd._paraNum, cmd._parameters, cmd._length, cmd._format, cmd._cb, cmd._exceptCb);
auto &cmd = _sqlCmdBuffer.front();
_loopPtr->queueInLoop([connPtr, cmd, this]() {
execSql(connPtr, std::move(cmd->_sql), cmd->_paraNum, std::move(cmd->_parameters), std::move(cmd->_length), std::move(cmd->_format), std::move(cmd->_cb), std::move(cmd->_exceptCb));
});
_sqlCmdBuffer.pop_front();
return;
}
}
//Idle connection
_busyConnections.erase(connPtr);
_readyConnections.insert(connPtr);
_loopPtr->queueInLoop([connPtr, this]() {
std::lock_guard<std::mutex> guard(_connectionsMutex);
_busyConnections.erase(connPtr);
_readyConnections.insert(connPtr);
});
}
}

View File

@ -34,25 +34,26 @@ class DbClientImpl : public DbClient, public std::enable_shared_from_this<DbClie
public:
DbClientImpl(const std::string &connInfo, const size_t connNum, ClientType type);
virtual ~DbClientImpl() noexcept;
virtual void execSql(const std::string &sql,
virtual void execSql(std::string &&sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback) override;
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback) override;
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)> &commitCallback = std::function<void(bool)>()) override;
private:
void ioLoop();
std::shared_ptr<trantor::EventLoop> _loopPtr;
void execSql(const DbConnectionPtr &conn, const std::string &sql,
void execSql(const DbConnectionPtr &conn,
std::string &&sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback);
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback);
DbConnectionPtr newConnection();
@ -78,7 +79,7 @@ class DbClientImpl : public DbClient, public std::enable_shared_from_this<DbClie
QueryCallback _cb;
ExceptPtrCallback _exceptCb;
};
std::deque<SqlCmd> _sqlCmdBuffer;
std::deque<std::shared_ptr<SqlCmd>> _sqlCmdBuffer;
std::mutex _bufferMutex;
void handleNewTask(const DbConnectionPtr &conn);

View File

@ -51,14 +51,14 @@ class DbConnection : public trantor::NonCopyable
{
_closeCb = cb;
}
virtual void execSql(const std::string &sql,
virtual void execSql(std::string &&sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback,
const std::function<void()> &idleCb) = 0;
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb) = 0;
virtual ~DbConnection()
{
LOG_TRACE << "Destruct DbConn" << this;

View File

@ -114,7 +114,6 @@ Result::size_type Result::size() const noexcept
void Result::swap(Result &other) noexcept
{
_resultPtr.swap(other._resultPtr);
_query.swap(other._query);
_errString.swap(other._errString);
}
Result::row_size_type Result::columns() const noexcept
@ -148,4 +147,8 @@ Result::field_size_type Result::getLength(Result::size_type row, Result::row_siz
unsigned long long Result::insertId() const noexcept
{
return _resultPtr->insertId();
}
const std::string &Result::sql() const noexcept
{
return _resultPtr->sql();
}

View File

@ -24,6 +24,7 @@ namespace orm
class ResultImpl : public trantor::NonCopyable, public Result
{
public:
ResultImpl(const std::string &query) : _query(query) {}
virtual size_type size() const noexcept = 0;
virtual row_size_type columns() const noexcept = 0;
virtual const char *columnName(row_size_type Number) const = 0;
@ -32,8 +33,12 @@ class ResultImpl : public trantor::NonCopyable, public Result
virtual const char *getValue(size_type row, row_size_type column) const = 0;
virtual bool isNull(size_type row, row_size_type column) const = 0;
virtual field_size_type getLength(size_type row, row_size_type column) const = 0;
virtual const std::string &sql() const { return _query; }
virtual unsigned long long insertId() const noexcept { return 0; }
virtual ~ResultImpl() {}
private:
std::string _query;
};
} // namespace orm

View File

@ -28,7 +28,7 @@ void SqlBinder::exec()
//nonblocking mode,default mode
//Retain shared_ptrs of parameters until we get the result;
std::shared_ptr<decltype(_objs)> objs = std::make_shared<decltype(_objs)>(std::move(_objs));
_client.execSql(_sql, _paraNum, _parameters, _length, _format,
_client.execSql(std::move(_sql), _paraNum, std::move(_parameters), std::move(_length), std::move(_format),
[holder = std::move(_callbackHolder), objs](const Result &r) {
objs->clear();
if (holder)
@ -67,7 +67,7 @@ void SqlBinder::exec()
std::shared_ptr<std::promise<Result>> pro(new std::promise<Result>);
auto f = pro->get_future();
_client.execSql(_sql, _paraNum, _parameters, _length, _format,
_client.execSql(std::move(_sql), _paraNum, std::move(_parameters), std::move(_length), std::move(_format),
[pro](const Result &r) {
pro->set_value(r);
},

View File

@ -70,27 +70,27 @@ TransactionImpl::~TransactionImpl()
});
}
}
void TransactionImpl::execSql(const std::string &sql,
void TransactionImpl::execSql(std::string &&sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback)
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback)
{
auto thisPtr = shared_from_this();
_loop->queueInLoop([thisPtr, sql, paraNum, parameters, length, format, rcb, exceptCallback]() {
_loop->queueInLoop([thisPtr, sql = std::move(sql), paraNum, parameters = std::move(parameters), length = std::move(length), format = std::move(format), rcb = std::move(rcb), exceptCallback = std::move(exceptCallback)]() mutable {
if (!thisPtr->_isCommitedOrRolledback)
{
if (!thisPtr->_isWorking)
{
thisPtr->_isWorking = true;
thisPtr->_connectionPtr->execSql(sql,
thisPtr->_connectionPtr->execSql(std::move(sql),
paraNum,
parameters,
length,
format,
rcb,
std::move(parameters),
std::move(length),
std::move(format),
std::move(rcb),
[exceptCallback, thisPtr](const std::exception_ptr &ePtr) {
thisPtr->rollback();
if (exceptCallback)
@ -104,13 +104,13 @@ void TransactionImpl::execSql(const std::string &sql,
{
//push sql cmd to buffer;
SqlCmd cmd;
cmd._sql = sql;
cmd._sql = std::move(sql);
cmd._paraNum = paraNum;
cmd._parameters = parameters;
cmd._length = length;
cmd._format = format;
cmd._cb = rcb;
cmd._exceptCb = exceptCallback;
cmd._parameters = std::move(parameters);
cmd._length = std::move(length);
cmd._format = std::move(format);
cmd._cb = std::move(rcb);
cmd._exceptCb = std::move(exceptCallback);
thisPtr->_sqlCmdBuffer.push_back(std::move(cmd));
}
}
@ -196,13 +196,13 @@ void TransactionImpl::execNewTask()
auto conn = _connectionPtr;
_loop->queueInLoop([=]() {
conn->execSql(cmd._sql,
_loop->queueInLoop([=]() mutable {
conn->execSql(std::move(cmd._sql),
cmd._paraNum,
cmd._parameters,
cmd._length,
cmd._format,
cmd._cb,
std::move(cmd._parameters),
std::move(cmd._length),
std::move(cmd._format),
std::move(cmd._cb),
[cmd, thisPtr](const std::exception_ptr &ePtr) {
thisPtr->rollback();
if (cmd._exceptCb)

View File

@ -32,13 +32,13 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this<
private:
DbConnectionPtr _connectionPtr;
virtual void execSql(const std::string &sql,
virtual void execSql(std::string &&sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback) override;
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback) override;
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)>&) override
{
return shared_from_this();

View File

@ -268,14 +268,14 @@ void MysqlConnection::handleEvent()
}
}
void MysqlConnection::execSql(const std::string &sql,
void MysqlConnection::execSql(std::string &&sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback,
const std::function<void()> &idleCb)
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb)
{
LOG_TRACE << sql;
assert(paraNum == parameters.size());
@ -286,10 +286,10 @@ void MysqlConnection::execSql(const std::string &sql,
assert(!_isWorking);
assert(!sql.empty());
_cb = rcb;
_idleCb = idleCb;
_cb = std::move(rcb);
_idleCb = std::move(idleCb);
_isWorking = true;
_exceptCb = exceptCallback;
_exceptCb = std::move(exceptCallback);
_sql.clear();
if (paraNum > 0)
{

View File

@ -37,14 +37,14 @@ class MysqlConnection : public DbConnection, public std::enable_shared_from_this
public:
MysqlConnection(trantor::EventLoop *loop, const std::string &connInfo);
~MysqlConnection() {}
virtual void execSql(const std::string &sql,
virtual void execSql(std::string &&sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback,
const std::function<void()> &idleCb) override;
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb) override;
private:
std::unique_ptr<trantor::Channel> _channelPtr;

View File

@ -34,8 +34,8 @@ class MysqlResultImpl : public ResultImpl
const std::string &query,
size_type affectedRows,
unsigned long long insertId) noexcept
: _result(r),
_query(query),
: ResultImpl(query),
_result(r),
_rowsNum(_result ? mysql_num_rows(_result.get()) : 0),
_fieldArray(r ? mysql_fetch_fields(r.get()) : nullptr),
_fieldNum(r ? mysql_num_fields(r.get()) : 0),
@ -78,7 +78,6 @@ class MysqlResultImpl : public ResultImpl
private:
const std::shared_ptr<MYSQL_RES> _result;
const std::string _query;
const Result::size_type _rowsNum;
const MYSQL_FIELD *_fieldArray;
const Result::row_size_type _fieldNum;

View File

@ -97,12 +97,14 @@ void PgConnection::pgPoll()
LOG_ERROR << "!!!Pg connection failed: " << PQerrorMessage(_connPtr.get());
break;
case PGRES_POLLING_WRITING:
_channel.enableWriting();
_channel.disableReading();
if (!_channel.isWriting())
_channel.enableWriting();
break;
case PGRES_POLLING_READING:
_channel.enableReading();
_channel.disableWriting();
if (!_channel.isReading())
_channel.enableReading();
if (_channel.isWriting())
_channel.disableWriting();
break;
case PGRES_POLLING_OK:
@ -112,8 +114,10 @@ void PgConnection::pgPoll()
assert(_okCb);
_okCb(shared_from_this());
}
_channel.enableReading();
_channel.disableWriting();
if (!_channel.isReading())
_channel.enableReading();
if (_channel.isWriting())
_channel.disableWriting();
break;
case PGRES_POLLING_ACTIVE:
//unused!
@ -122,14 +126,14 @@ void PgConnection::pgPoll()
break;
}
}
void PgConnection::execSql(const std::string &sql,
void PgConnection::execSql(std::string &&sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback,
const std::function<void()> &idleCb)
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb)
{
LOG_TRACE << sql;
assert(paraNum == parameters.size());
@ -139,16 +143,16 @@ void PgConnection::execSql(const std::string &sql,
assert(idleCb);
assert(!_isWorking);
assert(!sql.empty());
_sql = sql;
_cb = rcb;
_idleCb = idleCb;
_sql = std::move(sql);
_cb = std::move(rcb);
_idleCb = std::move(idleCb);
_isWorking = true;
_exceptCb = exceptCallback;
_exceptCb = std::move(exceptCallback);
auto thisPtr = shared_from_this();
_loop->runInLoop([thisPtr, sql, paraNum, parameters, length, format]() {
_loop->runInLoop([thisPtr, paraNum=std::move(paraNum), parameters=std::move(parameters), length=std::move(length), format=std::move(format)]() {
if (PQsendQueryParams(
thisPtr->_connPtr.get(),
sql.c_str(),
thisPtr->_sql.c_str(),
paraNum,
NULL,
parameters.data(),
@ -198,8 +202,8 @@ void PgConnection::handleRead()
//need read more data from socket;
return;
}
_channel.disableWriting();
if (_channel.isWriting())
_channel.disableWriting();
// got query results?
while ((res = std::shared_ptr<PGresult>(PQgetResult(_connPtr.get()), [](PGresult *p) {
PQclear(p);

View File

@ -37,14 +37,14 @@ class PgConnection : public DbConnection, public std::enable_shared_from_this<Pg
public:
PgConnection(trantor::EventLoop *loop, const std::string &connInfo);
virtual void execSql(const std::string &sql,
virtual void execSql(std::string &&sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback,
const std::function<void()> &idleCb) override;
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb) override;
private:
std::shared_ptr<PGconn> _connPtr;

View File

@ -29,8 +29,8 @@ class PostgreSQLResultImpl : public ResultImpl
{
public:
PostgreSQLResultImpl(const std::shared_ptr<PGresult> &r, const std::string &query) noexcept
: _result(r),
_query(query)
:ResultImpl(query),
_result(r)
{
}
virtual size_type size() const noexcept override;
@ -41,10 +41,8 @@ class PostgreSQLResultImpl : public ResultImpl
virtual const char *getValue(size_type row, row_size_type column) const override;
virtual bool isNull(size_type row, row_size_type column) const override;
virtual field_size_type getLength(size_type row, row_size_type column) const override;
private:
std::shared_ptr<PGresult> _result;
std::string _query;
};
} // namespace orm

View File

@ -86,17 +86,17 @@ Sqlite3Connection::Sqlite3Connection(trantor::EventLoop *loop, const std::string
});
}
void Sqlite3Connection::execSql(const std::string &sql,
void Sqlite3Connection::execSql(std::string &&sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback,
const std::function<void()> &idleCb)
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb)
{
auto thisPtr = shared_from_this();
_loopThread.getLoop()->runInLoop([=]() {
_loopThread.getLoop()->runInLoop([thisPtr, sql = std::move(sql), paraNum, parameters = std::move(parameters), length = std::move(length), format = std::move(format), rcb = std::move(rcb), exceptCallback = std::move(exceptCallback), idleCb = std::move(idleCb)]() mutable {
thisPtr->execSqlInQueue(sql, paraNum, parameters, length, format, rcb, exceptCallback, idleCb);
});
}

View File

@ -38,14 +38,14 @@ class Sqlite3Connection : public DbConnection, public std::enable_shared_from_th
public:
Sqlite3Connection(trantor::EventLoop *loop, const std::string &connInfo);
virtual void execSql(const std::string &sql,
virtual void execSql(std::string &&sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback,
const std::function<void()> &idleCb) override;
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb) override;
private:
static std::once_flag _once;

View File

@ -30,6 +30,7 @@ class Sqlite3ResultImpl : public ResultImpl
{
public:
Sqlite3ResultImpl(const std::string &query) noexcept
: ResultImpl(query)
{
}
virtual size_type size() const noexcept override;