Add an intermediate abstraction layer for the database client class

This commit is contained in:
antao 2018-11-21 18:18:10 +08:00
parent 532d69b27f
commit b4621bebb1
10 changed files with 460 additions and 366 deletions

View File

@ -0,0 +1,253 @@
//
// Created by antao on 2018/6/22.
//
#include "DbClientGeneralImpl.h"
#include "DbConnection.h"
#include "TransactionImpl.h"
#include <trantor/net/EventLoop.h>
#include <trantor/net/inner/Channel.h>
#include <drogon/orm/Exception.h>
#include <drogon/orm/DbClient.h>
#include <sys/select.h>
#include <iostream>
#include <thread>
#include <vector>
#include <unordered_set>
#include <memory>
#include <stdio.h>
#include <unistd.h>
#include <sstream>
using namespace drogon::orm;
DbClientGeneralImpl::DbClientGeneralImpl(const std::string &connInfo, const size_t connNum)
: _connInfo(connInfo),
_connectNum(connNum)
{
assert(connNum > 0);
_loopThread = std::thread([=]() {
_loopPtr = std::shared_ptr<trantor::EventLoop>(new trantor::EventLoop);
ioLoop();
});
}
void DbClientGeneralImpl::ioLoop()
{
for (size_t i = 0; i < _connectNum; i++)
{
_connections.insert(newConnection());
}
_loopPtr->loop();
}
DbClientGeneralImpl::~DbClientGeneralImpl()
{
_stop = true;
_loopPtr->quit();
if (_loopThread.joinable())
_loopThread.join();
}
void DbClientGeneralImpl::execSql(const DbConnectionPtr &conn,
const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &cb,
const std::function<void(const std::exception_ptr &)> &exceptCallback)
{
if (!conn)
{
try
{
throw BrokenConnection("There is no connection to PG server!");
}
catch (...)
{
exceptCallback(std::current_exception());
}
return;
}
std::weak_ptr<DbConnection> weakConn = conn;
conn->execSql(sql, paraNum, parameters, length, format,
cb, exceptCallback,
[=]() -> void {
{
auto connPtr = weakConn.lock();
if (!connPtr)
return;
handleNewTask(connPtr);
}
});
}
void DbClientGeneralImpl::execSql(const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const QueryCallback &cb,
const ExceptPtrCallback &exceptCb)
{
assert(paraNum == parameters.size());
assert(paraNum == length.size());
assert(paraNum == format.size());
assert(cb);
DbConnectionPtr conn;
{
std::lock_guard<std::mutex> guard(_connectionsMutex);
if (_readyConnections.size() == 0)
{
if (_busyConnections.size() == 0)
{
try
{
throw BrokenConnection("No connection to postgreSQL server");
}
catch (...)
{
exceptCb(std::current_exception());
}
return;
}
}
else
{
auto iter = _readyConnections.begin();
_busyConnections.insert(*iter);
conn = *iter;
_readyConnections.erase(iter);
}
}
if (conn)
{
execSql(conn, sql, paraNum, parameters, length, format, cb, exceptCb);
return;
}
bool busy = false;
{
std::lock_guard<std::mutex> guard(_bufferMutex);
if (_sqlCmdBuffer.size() > 10000)
{
//too many queries in buffer;
busy = true;
}
}
if (busy)
{
try
{
throw Failure("Too many queries in buffer");
}
catch (...)
{
exceptCb(std::current_exception());
}
return;
}
//LOG_TRACE << "Push query to buffer";
SqlCmd cmd;
cmd._sql = sql;
cmd._paraNum = paraNum;
cmd._parameters = parameters;
cmd._length = length;
cmd._format = format;
cmd._cb = cb;
cmd._exceptCb = exceptCb;
{
std::lock_guard<std::mutex> guard(_bufferMutex);
_sqlCmdBuffer.push_back(std::move(cmd));
}
}
std::string DbClientGeneralImpl::replaceSqlPlaceHolder(const std::string &sqlStr, const std::string &holderStr) const
{
std::string::size_type startPos = 0;
std::string::size_type pos;
std::stringstream ret;
size_t phCount = 1;
do
{
pos = sqlStr.find(holderStr, startPos);
if (pos == std::string::npos)
{
ret << sqlStr.substr(startPos);
return ret.str();
}
ret << sqlStr.substr(startPos, pos - startPos);
ret << "$";
ret << phCount++;
startPos = pos + holderStr.length();
} while (1);
}
std::shared_ptr<Transaction> DbClientGeneralImpl::newTransaction()
{
DbConnectionPtr conn;
{
std::unique_lock<std::mutex> lock(_connectionsMutex);
_transWaitNum++;
_condConnectionReady.wait(lock, [this]() {
return _readyConnections.size() > 0;
});
_transWaitNum--;
auto iter = _readyConnections.begin();
_busyConnections.insert(*iter);
conn = *iter;
_readyConnections.erase(iter);
}
auto trans = std::shared_ptr<TransactionImpl>(new TransactionImpl(conn, [=]() {
if (conn->status() == ConnectStatus_Bad)
{
return;
}
{
std::lock_guard<std::mutex> guard(_connectionsMutex);
if (_connections.find(conn) == _connections.end() &&
_busyConnections.find(conn) == _busyConnections.find(conn))
{
//connection is broken and removed
return;
}
}
handleNewTask(conn);
}));
trans->doBegin();
return trans;
}
void DbClientGeneralImpl::handleNewTask(const DbConnectionPtr &connPtr)
{
std::lock_guard<std::mutex> guard(_connectionsMutex);
if (_transWaitNum > 0)
{
//Prioritize the needs of the transaction
_busyConnections.erase(connPtr);
_readyConnections.insert(connPtr);
_condConnectionReady.notify_one();
}
else
{
//Then check if there are some sql queries in the buffer
{
std::lock_guard<std::mutex> guard(_bufferMutex);
if (_sqlCmdBuffer.size() > 0)
{
_busyConnections.insert(connPtr); //For new connections, this sentence is necessary
auto cmd = _sqlCmdBuffer.front();
_sqlCmdBuffer.pop_front();
_loopPtr->queueInLoop([=]() {
execSql(connPtr, cmd._sql, cmd._paraNum, cmd._parameters, cmd._length, cmd._format, cmd._cb, cmd._exceptCb);
});
return;
}
}
//Idle connection
_busyConnections.erase(connPtr);
_readyConnections.insert(connPtr);
}
}

View File

@ -0,0 +1,76 @@
#pragma once
#include "DbConnection.h"
#include <drogon/orm/DbClient.h>
#include <trantor/net/EventLoop.h>
#include <memory>
#include <thread>
#include <functional>
#include <string>
#include <unordered_set>
#include <list>
namespace drogon
{
namespace orm
{
// extern Result makeResult(SqlStatus status, const std::shared_ptr<PGresult> &r = std::shared_ptr<PGresult>(nullptr),
// const std::string &query = "");
class DbClientGeneralImpl : public DbClient
{
public:
DbClientGeneralImpl(const std::string &connInfo, const size_t connNum);
virtual ~DbClientGeneralImpl();
virtual void execSql(const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback) override;
virtual std::string replaceSqlPlaceHolder(const std::string &sqlStr, const std::string &holderStr) const override;
virtual std::shared_ptr<Transaction> newTransaction() override;
protected:
void ioLoop();
std::shared_ptr<trantor::EventLoop> _loopPtr;
void execSql(const DbConnectionPtr &conn, const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback);
virtual DbConnectionPtr newConnection() = 0;
std::unordered_set<DbConnectionPtr> _connections;
std::unordered_set<DbConnectionPtr> _readyConnections;
std::unordered_set<DbConnectionPtr> _busyConnections;
std::string _connInfo;
std::thread _loopThread;
std::mutex _connectionsMutex;
std::condition_variable _condConnectionReady;
size_t _transWaitNum = 0;
size_t _connectNum;
bool _stop = false;
struct SqlCmd
{
std::string _sql;
size_t _paraNum;
std::vector<const char *> _parameters;
std::vector<int> _length;
std::vector<int> _format;
QueryCallback _cb;
ExceptPtrCallback _exceptCb;
};
std::list<SqlCmd> _sqlCmdBuffer;
std::mutex _bufferMutex;
void handleNewTask(const DbConnectionPtr &conn);
};
} // namespace orm
} // namespace drogon

View File

@ -0,0 +1,68 @@
#pragma once
#include <trantor/net/EventLoop.h>
#include <trantor/net/inner/Channel.h>
#include <drogon/orm/DbClient.h>
#include <trantor/utils/NonCopyable.h>
#include <memory>
#include <string>
#include <functional>
#include <iostream>
namespace drogon
{
namespace orm
{
enum ConnectStatus
{
ConnectStatus_None = 0,
ConnectStatus_Connecting,
ConnectStatus_Ok,
ConnectStatus_Bad
};
class DbConnection;
typedef std::shared_ptr<DbConnection> DbConnectionPtr;
class DbConnection : public trantor::NonCopyable
{
public:
typedef std::function<void(const DbConnectionPtr &)> DbConnectionCallback;
DbConnection(trantor::EventLoop *loop) : _loop(loop) {}
void setOkCallback(const DbConnectionCallback &cb)
{
_okCb = cb;
}
void setCloseCallback(const DbConnectionCallback &cb)
{
_closeCb = cb;
}
virtual void execSql(const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback,
const std::function<void()> &idleCb) = 0;
virtual ~DbConnection()
{
LOG_TRACE << "Destruct DbConn" << this;
}
ConnectStatus status() const { return _status; }
trantor::EventLoop *loop() { return _loop; }
protected:
QueryCallback _cb;
trantor::EventLoop *_loop;
std::function<void()> _idleCb;
ConnectStatus _status = ConnectStatus_None;
DbConnectionCallback _closeCb = [](const DbConnectionPtr &) {};
DbConnectionCallback _okCb = [](const DbConnectionPtr &) {};
std::function<void(const std::exception_ptr &)> _exceptCb;
bool _isWorking = false;
std::string _sql = "";
};
} // namespace orm
} // namespace drogon

View File

@ -1,6 +1,6 @@
/**
*
* PgTransactionImpl.cc
* TransactionImpl.cc
* An Tao
*
* Copyright 2018, An Tao. All rights reserved.
@ -10,19 +10,19 @@
*
*/
#include "PgTransactionImpl.h"
#include "TransactionImpl.h"
#include <trantor/utils/Logger.h>
using namespace drogon::orm;
PgTransactionImpl::PgTransactionImpl(const PgConnectionPtr &connPtr,
TransactionImpl::TransactionImpl(const DbConnectionPtr &connPtr,
const std::function<void()> &usedUpCallback)
: _connectionPtr(connPtr),
_usedUpCallback(usedUpCallback),
_loop(connPtr->loop())
{
}
PgTransactionImpl::~PgTransactionImpl()
TransactionImpl::~TransactionImpl()
{
LOG_TRACE << "Destruct";
assert(!_isWorking);
@ -52,7 +52,7 @@ PgTransactionImpl::~PgTransactionImpl()
});
}
}
void PgTransactionImpl::execSql(const std::string &sql,
void TransactionImpl::execSql(const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
@ -111,7 +111,7 @@ void PgTransactionImpl::execSql(const std::string &sql,
});
}
void PgTransactionImpl::rollback()
void TransactionImpl::rollback()
{
auto thisPtr = shared_from_this();
@ -164,7 +164,7 @@ void PgTransactionImpl::rollback()
});
}
void PgTransactionImpl::execNewTask()
void TransactionImpl::execNewTask()
{
_loop->assertInLoopThread();
assert(_isWorking);
@ -223,7 +223,7 @@ void PgTransactionImpl::execNewTask()
}
}
void PgTransactionImpl::doBegin()
void TransactionImpl::doBegin()
{
auto thisPtr = shared_from_this();
_loop->queueInLoop([thisPtr]() {
@ -252,7 +252,7 @@ void PgTransactionImpl::doBegin()
});
}
std::string PgTransactionImpl::replaceSqlPlaceHolder(const std::string &sqlStr, const std::string &holderStr) const
std::string TransactionImpl::replaceSqlPlaceHolder(const std::string &sqlStr, const std::string &holderStr) const
{
std::string::size_type startPos = 0;
std::string::size_type pos;

View File

@ -1,6 +1,6 @@
/**
*
* PgTransactionImpl.h
* TransactionImpl.h
* An Tao
*
* Copyright 2018, An Tao. All rights reserved.
@ -12,7 +12,7 @@
#pragma once
#include "PgConnection.h"
#include "DbConnection.h"
#include <drogon/orm/DbClient.h>
#include <functional>
#include <list>
@ -21,16 +21,16 @@ namespace drogon
{
namespace orm
{
class PgTransactionImpl : public Transaction, public std::enable_shared_from_this<PgTransactionImpl>
class TransactionImpl : public Transaction, public std::enable_shared_from_this<TransactionImpl>
{
public:
PgTransactionImpl(const PgConnectionPtr &connPtr, const std::function<void()> &usedUpCallback);
~PgTransactionImpl();
TransactionImpl(const DbConnectionPtr &connPtr, const std::function<void()> &usedUpCallback);
~TransactionImpl();
void rollback() override;
virtual std::string replaceSqlPlaceHolder(const std::string &sqlStr, const std::string &holderStr) const override;
private:
PgConnectionPtr _connectionPtr;
DbConnectionPtr _connectionPtr;
virtual void execSql(const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
@ -59,7 +59,7 @@ class PgTransactionImpl : public Transaction, public std::enable_shared_from_thi
};
std::list<SqlCmd> _sqlCmdBuffer;
// std::mutex _bufferMutex;
friend class PgClientImpl;
friend class DbClientGeneralImpl;
void doBegin();
trantor::EventLoop *_loop;
};

View File

@ -2,7 +2,6 @@
// Created by antao on 2018/6/22.
//
#include "PgClientImpl.h"
#include "PgTransactionImpl.h"
#include "PgConnection.h"
#include <trantor/net/EventLoop.h>
#include <trantor/net/inner/Channel.h>
@ -21,256 +20,33 @@
using namespace drogon::orm;
PgConnectionPtr PgClientImpl::newConnection(trantor::EventLoop *loop)
DbConnectionPtr PgClientImpl::newConnection()
{
//std::cout<<"newConn"<<std::endl;
auto connPtr = std::make_shared<PgConnection>(loop, _connInfo);
connPtr->setCloseCallback([=](const PgConnectionPtr &closeConnPtr) {
//std::cout<<"Conn closed!"<<closeConnPtr<<std::endl;
sleep(1);
std::lock_guard<std::mutex> guard(_connectionsMutex);
_readyConnections.erase(closeConnPtr);
_busyConnections.erase(closeConnPtr);
assert(_connections.find(closeConnPtr) != _connections.end());
_connections.erase(closeConnPtr);
_connections.insert(newConnection(loop));
//std::cout<<"Conn closed!end"<<std::endl;
auto connPtr = std::make_shared<PgConnection>(_loopPtr.get(), _connInfo);
auto loopPtr = _loopPtr;
std::weak_ptr<PgClientImpl> weakPtr = shared_from_this();
connPtr->setCloseCallback([weakPtr, loopPtr](const DbConnectionPtr &closeConnPtr) {
//Reconnect after 1 second
loopPtr->runAfter(1, [weakPtr, closeConnPtr] {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
std::lock_guard<std::mutex> guard(thisPtr->_connectionsMutex);
thisPtr->_readyConnections.erase(closeConnPtr);
thisPtr->_busyConnections.erase(closeConnPtr);
assert(thisPtr->_connections.find(closeConnPtr) != thisPtr->_connections.end());
thisPtr->_connections.erase(closeConnPtr);
thisPtr->_connections.insert(thisPtr->newConnection());
});
});
connPtr->setOkCallback([=](const PgConnectionPtr &okConnPtr) {
connPtr->setOkCallback([weakPtr](const DbConnectionPtr &okConnPtr) {
LOG_TRACE << "postgreSQL connected!";
handleNewTask(okConnPtr);
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
thisPtr->handleNewTask(okConnPtr);
});
//std::cout<<"newConn end"<<connPtr<<std::endl;
return connPtr;
}
PgClientImpl::PgClientImpl(const std::string &connInfo, const size_t connNum)
: _connInfo(connInfo),
_connectNum(connNum)
{
assert(connNum > 0);
_loopThread = std::thread([=]() {
_loopPtr = std::unique_ptr<trantor::EventLoop>(new trantor::EventLoop);
ioLoop();
});
}
void PgClientImpl::ioLoop()
{
for (size_t i = 0; i < _connectNum; i++)
{
_connections.insert(newConnection(_loopPtr.get()));
}
_loopPtr->loop();
}
PgClientImpl::~PgClientImpl()
{
_stop = true;
_loopPtr->quit();
if (_loopThread.joinable())
_loopThread.join();
}
void PgClientImpl::execSql(const PgConnectionPtr &conn,
const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &cb,
const std::function<void(const std::exception_ptr &)> &exceptCallback)
{
if (!conn)
{
try
{
throw BrokenConnection("There is no connection to PG server!");
}
catch (...)
{
exceptCallback(std::current_exception());
}
return;
}
std::weak_ptr<PgConnection> weakConn = conn;
conn->execSql(sql, paraNum, parameters, length, format,
cb, exceptCallback,
[=]() -> void {
{
auto connPtr = weakConn.lock();
if (!connPtr)
return;
handleNewTask(connPtr);
}
});
}
void PgClientImpl::execSql(const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const QueryCallback &cb,
const ExceptPtrCallback &exceptCb)
{
assert(paraNum == parameters.size());
assert(paraNum == length.size());
assert(paraNum == format.size());
assert(cb);
PgConnectionPtr conn;
{
std::lock_guard<std::mutex> guard(_connectionsMutex);
if (_readyConnections.size() == 0)
{
if (_busyConnections.size() == 0)
{
try
{
throw BrokenConnection("No connection to postgreSQL server");
}
catch (...)
{
exceptCb(std::current_exception());
}
return;
}
}
else
{
auto iter = _readyConnections.begin();
_busyConnections.insert(*iter);
conn = *iter;
_readyConnections.erase(iter);
}
}
if (conn)
{
execSql(conn, sql, paraNum, parameters, length, format, cb, exceptCb);
return;
}
bool busy = false;
{
std::lock_guard<std::mutex> guard(_bufferMutex);
if (_sqlCmdBuffer.size() > 10000)
{
//too many queries in buffer;
busy = true;
}
}
if (busy)
{
try
{
throw Failure("Too many queries in buffer");
}
catch (...)
{
exceptCb(std::current_exception());
}
return;
}
//LOG_TRACE << "Push query to buffer";
SqlCmd cmd;
cmd._sql = sql;
cmd._paraNum = paraNum;
cmd._parameters = parameters;
cmd._length = length;
cmd._format = format;
cmd._cb = cb;
cmd._exceptCb = exceptCb;
{
std::lock_guard<std::mutex> guard(_bufferMutex);
_sqlCmdBuffer.push_back(std::move(cmd));
}
}
std::string PgClientImpl::replaceSqlPlaceHolder(const std::string &sqlStr, const std::string &holderStr) const
{
std::string::size_type startPos = 0;
std::string::size_type pos;
std::stringstream ret;
size_t phCount = 1;
do
{
pos = sqlStr.find(holderStr, startPos);
if (pos == std::string::npos)
{
ret << sqlStr.substr(startPos);
return ret.str();
}
ret << sqlStr.substr(startPos, pos - startPos);
ret << "$";
ret << phCount++;
startPos = pos + holderStr.length();
} while (1);
}
std::shared_ptr<Transaction> PgClientImpl::newTransaction()
{
PgConnectionPtr conn;
{
std::unique_lock<std::mutex> lock(_connectionsMutex);
_transWaitNum++;
_condConnectionReady.wait(lock, [this]() {
return _readyConnections.size() > 0;
});
_transWaitNum--;
auto iter = _readyConnections.begin();
_busyConnections.insert(*iter);
conn = *iter;
_readyConnections.erase(iter);
}
auto trans = std::shared_ptr<PgTransactionImpl>(new PgTransactionImpl(conn, [=]() {
if (conn->status() == ConnectStatus_Bad)
{
return;
}
{
std::lock_guard<std::mutex> guard(_connectionsMutex);
if (_connections.find(conn) == _connections.end() &&
_busyConnections.find(conn) == _busyConnections.find(conn))
{
//connection is broken and removed
return;
}
}
handleNewTask(conn);
}));
trans->doBegin();
return trans;
}
void PgClientImpl::handleNewTask(const PgConnectionPtr &connPtr)
{
std::lock_guard<std::mutex> guard(_connectionsMutex);
if (_transWaitNum > 0)
{
//Prioritize the needs of the transaction
_busyConnections.erase(connPtr);
_readyConnections.insert(connPtr);
_condConnectionReady.notify_one();
}
else
{
//Then check if there are some sql queries in the buffer
{
std::lock_guard<std::mutex> guard(_bufferMutex);
if (_sqlCmdBuffer.size() > 0)
{
_busyConnections.insert(connPtr); //For new connections, this sentence is necessary
auto cmd = _sqlCmdBuffer.front();
_sqlCmdBuffer.pop_front();
_loopPtr->queueInLoop([=]() {
execSql(connPtr, cmd._sql, cmd._paraNum, cmd._parameters, cmd._length, cmd._format, cmd._cb, cmd._exceptCb);
});
return;
}
}
//Idle connection
_busyConnections.erase(connPtr);
_readyConnections.insert(connPtr);
}
}

View File

@ -1,7 +1,7 @@
#pragma once
#include "PgConnection.h"
#include <drogon/orm/DbClient.h>
#include "../DbClientGeneralImpl.h"
#include <trantor/net/EventLoop.h>
#include <memory>
#include <thread>
@ -14,63 +14,16 @@ namespace drogon
{
namespace orm
{
// extern Result makeResult(SqlStatus status, const std::shared_ptr<PGresult> &r = std::shared_ptr<PGresult>(nullptr),
// const std::string &query = "");
class PgClientImpl : public DbClient
class PgClientImpl : public DbClientGeneralImpl, public std::enable_shared_from_this<PgClientImpl>
{
public:
PgClientImpl(const std::string &connInfo, const size_t connNum);
~PgClientImpl();
virtual void execSql(const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback) override;
virtual std::string replaceSqlPlaceHolder(const std::string &sqlStr, const std::string &holderStr) const override;
virtual std::shared_ptr<Transaction> newTransaction() override;
public:
PgClientImpl(const std::string &connInfo, const size_t connNum) : DbClientGeneralImpl(connInfo, connNum) {}
private:
void ioLoop();
std::unique_ptr<trantor::EventLoop> _loopPtr;
void execSql(const PgConnectionPtr &conn, const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback);
PgConnectionPtr newConnection(trantor::EventLoop *loop);
std::unordered_set<PgConnectionPtr> _connections;
std::unordered_set<PgConnectionPtr> _readyConnections;
std::unordered_set<PgConnectionPtr> _busyConnections;
std::string _connInfo;
std::thread _loopThread;
std::mutex _connectionsMutex;
std::condition_variable _condConnectionReady;
size_t _transWaitNum = 0;
size_t _connectNum;
bool _stop = false;
struct SqlCmd
{
std::string _sql;
size_t _paraNum;
std::vector<const char *> _parameters;
std::vector<int> _length;
std::vector<int> _format;
QueryCallback _cb;
ExceptPtrCallback _exceptCb;
};
std::list<SqlCmd> _sqlCmdBuffer;
std::mutex _bufferMutex;
void handleNewTask(const PgConnectionPtr &conn);
private:
virtual DbConnectionPtr newConnection() override;
};
} // namespace orm
} // namespace drogon

View File

@ -19,10 +19,11 @@ Result makeResult(const std::shared_ptr<PGresult> &r = std::shared_ptr<PGresult>
} // namespace drogon
PgConnection::PgConnection(trantor::EventLoop *loop, const std::string &connInfo)
: _connPtr(std::shared_ptr<PGconn>(PQconnectStart(connInfo.c_str()), [](PGconn *conn) {
: DbConnection(loop),
_connPtr(std::shared_ptr<PGconn>(PQconnectStart(connInfo.c_str()), [](PGconn *conn) {
PQfinish(conn);
})),
_loop(loop), _channel(_loop, PQsocket(_connPtr.get()))
_channel(loop, PQsocket(_connPtr.get()))
{
PQsetnonblocking(_connPtr.get(), 1);
//assert(PQisnonblocking(_connPtr.get()));
@ -57,10 +58,10 @@ PgConnection::PgConnection(trantor::EventLoop *loop, const std::string &connInfo
_channel.enableReading();
_channel.enableWriting();
}
int PgConnection::sock()
{
return PQsocket(_connPtr.get());
}
// int PgConnection::sock()
// {
// return PQsocket(_connPtr.get());
// }
void PgConnection::handleClosed()
{
_status = ConnectStatus_Bad;

View File

@ -1,5 +1,6 @@
#pragma once
#include "../DbConnection.h"
#include <trantor/net/EventLoop.h>
#include <trantor/net/inner/Channel.h>
#include <drogon/orm/DbClient.h>
@ -15,30 +16,13 @@ namespace drogon
namespace orm
{
enum ConnectStatus
{
ConnectStatus_None = 0,
ConnectStatus_Connecting,
ConnectStatus_Ok,
ConnectStatus_Bad
};
class PgConnection;
typedef std::shared_ptr<PgConnection> PgConnectionPtr;
class PgConnection : public trantor::NonCopyable, public std::enable_shared_from_this<PgConnection>
class PgConnection : public DbConnection, public std::enable_shared_from_this<PgConnection>
{
public:
typedef std::function<void(const PgConnectionPtr &)> PgConnectionCallback;
PgConnection(trantor::EventLoop *loop, const std::string &connInfo);
void setOkCallback(const PgConnectionCallback &cb)
{
_okCb = cb;
}
void setCloseCallback(const PgConnectionCallback &cb)
{
_closeCb = cb;
}
void execSql(const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
@ -47,26 +31,9 @@ class PgConnection : public trantor::NonCopyable, public std::enable_shared_from
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback,
const std::function<void()> &idleCb);
~PgConnection()
{
LOG_TRACE<<"Destruct DbConn"<<this;
}
int sock();
trantor::EventLoop *loop() { return _loop; }
ConnectStatus status() const { return _status; }
private:
std::shared_ptr<PGconn> _connPtr;
trantor::EventLoop *_loop;
trantor::Channel _channel;
QueryCallback _cb;
std::function<void()> _idleCb;
ConnectStatus _status = ConnectStatus_None;
PgConnectionCallback _closeCb = [](const PgConnectionPtr &) {};
PgConnectionCallback _okCb = [](const PgConnectionPtr &) {};
std::function<void(const std::exception_ptr &)> _exceptCb;
bool _isWorking = false;
std::string _sql = "";
void handleRead();
void pgPoll();
void handleClosed();

View File

@ -6,13 +6,14 @@ using namespace drogon::orm;
int main()
{
auto clientPtr=DbClient::newPgClient("host=127.0.0.1 port=5432 dbname=test user=antao", 1);
trantor::Logger::setLogLevel(trantor::Logger::TRACE);
auto clientPtr = DbClient::newPgClient("host=127.0.0.1 port=5432 dbname=test user=antao", 3);
LOG_DEBUG << "start!";
sleep(1);
*clientPtr << "update group_users set join_date=$1,relationship=$2 where g_uuid=420040 and u_uuid=2"
<< nullptr
<< nullptr
<< Mode::Blocking >>
<< nullptr
<< nullptr
<< Mode::Blocking >>
[](const Result &r) {
std::cout << "update " << r.affectedRows() << " lines" << std::endl;
} >>
@ -35,7 +36,6 @@ int main()
LOG_DEBUG << "catch:" << e.base().what();
}
// client << "select count(*) from users" >> [](const drogon::orm::Result &r) {
// for (auto row : r)
// {
@ -60,7 +60,7 @@ int main()
// LOG_DEBUG << "except callback:" << e.base().what();
// };
// client << "select user_id,user_uuid from users where user_uuid=$1"
// client << "select user_id,user_uuid from users where user_uuid=$1"
// << 2
// >> [](bool isNull, const std::string &id, uint64_t uuid) {
// if (!isNull)
@ -84,10 +84,10 @@ int main()
auto f = clientPtr->execSqlAsync("select * from users limit 5");
try
{
auto r=f.get();
for(auto row:r)
auto r = f.get();
for (auto row : r)
{
std::cout<<"user_id:"<<row["user_id"].as<std::string>()<<std::endl;
std::cout << "user_id:" << row["user_id"].as<std::string>() << std::endl;
}
}
catch (const drogon::orm::DrogonDbException &e)