Add Transaction

This commit is contained in:
antao 2018-11-15 13:49:47 +08:00
parent b61e94548e
commit 2e35a966d3
9 changed files with 511 additions and 44 deletions

View File

@ -34,6 +34,8 @@ namespace orm
typedef std::function<void(const Result &)> ResultCallback;
typedef std::function<void(const DrogonDbException &)> ExceptionCallback;
class Transaction;
class DbClient : public trantor::NonCopyable
{
public:
@ -99,6 +101,8 @@ class DbClient : public trantor::NonCopyable
}
internal::SqlBinder operator<<(const std::string &sql);
virtual std::shared_ptr<Transaction> newTransaction() = 0;
virtual std::string replaceSqlPlaceHolder(const std::string &sqlStr, const std::string &holderStr) const = 0;
private:
@ -113,5 +117,12 @@ class DbClient : public trantor::NonCopyable
};
typedef std::shared_ptr<DbClient> DbClientPtr;
class Transaction : public DbClient
{
public:
virtual void rollback() = 0;
//virtual void commit() = 0;
};
} // namespace orm
} // namespace drogon

View File

@ -0,0 +1,23 @@
/**
*
* Transaction.h
* An Tao
*
* Copyright 2018, An Tao. All rights reserved.
* Use of this source code is governed by a MIT license
* that can be found in the License file.
*
*
*/
#pragma once
#include <drogon/orm/DbClient.h>
namespace drogon
{
namespace orm
{
} // namespace orm
} // namespace drogon

View File

@ -2,7 +2,8 @@
// Created by antao on 2018/6/22.
//
#include "PgClientImpl.h"
#include "DbConnection.h"
#include "PgTransactionImpl.h"
#include "PgConnection.h"
#include <trantor/net/EventLoop.h>
#include <trantor/net/inner/Channel.h>
#include <drogon/orm/Exception.h>
@ -20,11 +21,11 @@
using namespace drogon::orm;
DbConnectionPtr PgClientImpl::newConnection(trantor::EventLoop *loop)
PgConnectionPtr PgClientImpl::newConnection(trantor::EventLoop *loop)
{
//std::cout<<"newConn"<<std::endl;
auto connPtr = std::make_shared<DbConnection>(loop, _connInfo);
connPtr->setCloseCallback([=](const DbConnectionPtr &closeConnPtr) {
auto connPtr = std::make_shared<PgConnection>(loop, _connInfo);
connPtr->setCloseCallback([=](const PgConnectionPtr &closeConnPtr) {
//std::cout<<"Conn closed!"<<closeConnPtr<<std::endl;
sleep(1);
std::lock_guard<std::mutex> guard(_connectionsMutex);
@ -35,10 +36,14 @@ DbConnectionPtr PgClientImpl::newConnection(trantor::EventLoop *loop)
_connections.insert(newConnection(loop));
//std::cout<<"Conn closed!end"<<std::endl;
});
connPtr->setOkCallback([=](const DbConnectionPtr &okConnPtr) {
connPtr->setOkCallback([=](const PgConnectionPtr &okConnPtr) {
LOG_TRACE << "postgreSQL connected!";
std::lock_guard<std::mutex> guard(_connectionsMutex);
_readyConnections.insert(okConnPtr);
{
std::lock_guard<std::mutex> guard(_connectionsMutex);
_readyConnections.insert(okConnPtr);
}
_condConnectionReady.notify_one();
});
//std::cout<<"newConn end"<<connPtr<<std::endl;
return connPtr;
@ -71,14 +76,14 @@ PgClientImpl::~PgClientImpl()
_loopThread.join();
}
void PgClientImpl::execSql(const DbConnectionPtr &conn,
void PgClientImpl::execSql(const PgConnectionPtr &conn,
const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &cb,
const std::function<void(const std::exception_ptr &)> &exceptCallback)
const std::function<void(const std::exception_ptr &)> &exceptCallback)
{
if (!conn)
{
@ -92,7 +97,7 @@ void PgClientImpl::execSql(const DbConnectionPtr &conn,
}
return;
}
std::weak_ptr<DbConnection> weakConn = conn;
std::weak_ptr<PgConnection> weakConn = conn;
conn->execSql(sql, paraNum, parameters, length, format,
cb, exceptCallback,
[=]() -> void {
@ -120,10 +125,12 @@ void PgClientImpl::execSql(const DbConnectionPtr &conn,
return;
}
}
std::lock_guard<std::mutex> guard(_connectionsMutex);
_busyConnections.erase(connPtr);
_readyConnections.insert(connPtr);
{
std::lock_guard<std::mutex> guard(_connectionsMutex);
_busyConnections.erase(connPtr);
_readyConnections.insert(connPtr);
}
_condConnectionReady.notify_one();
}
});
}
@ -139,7 +146,7 @@ void PgClientImpl::execSql(const std::string &sql,
assert(paraNum == length.size());
assert(paraNum == format.size());
assert(cb);
DbConnectionPtr conn;
PgConnectionPtr conn;
{
std::lock_guard<std::mutex> guard(_connectionsMutex);
@ -230,3 +237,49 @@ std::string PgClientImpl::replaceSqlPlaceHolder(const std::string &sqlStr, const
startPos = pos + holderStr.length();
} while (1);
}
std::shared_ptr<Transaction> PgClientImpl::newTransaction()
{
PgConnectionPtr conn;
{
std::unique_lock<std::mutex> lock(_connectionsMutex);
_condConnectionReady.wait(lock, [this]() {
return _readyConnections.size() > 0;
});
auto iter = _readyConnections.begin();
_busyConnections.insert(*iter);
conn = *iter;
_readyConnections.erase(iter);
}
auto trans = std::shared_ptr<PgTransactionImpl>(new PgTransactionImpl(conn, [=]() {
{
std::lock_guard<std::mutex> guard(_bufferMutex);
if (_sqlCmdBuffer.size() > 0)
{
auto cmd = _sqlCmdBuffer.front();
_sqlCmdBuffer.pop_front();
_loopPtr->queueInLoop([=]() {
std::vector<const char *> paras;
std::vector<int> lens;
for (auto &p : cmd._parameters)
{
paras.push_back(p.c_str());
lens.push_back(p.length());
}
execSql(conn, cmd._sql, cmd._paraNum, paras, lens, cmd._format, cmd._cb, cmd._exceptCb);
});
return;
}
}
{
std::lock_guard<std::mutex> lock(_connectionsMutex);
_readyConnections.insert(conn);
}
_condConnectionReady.notify_one();
}));
trans->doBegin();
return trans;
}

View File

@ -1,6 +1,6 @@
#pragma once
#include "DbConnection.h"
#include "PgConnection.h"
#include <drogon/orm/DbClient.h>
#include <trantor/net/EventLoop.h>
#include <memory>
@ -30,6 +30,7 @@ class PgClientImpl : public DbClient
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback) override;
virtual std::string replaceSqlPlaceHolder(const std::string &sqlStr, const std::string &holderStr) const override;
virtual std::shared_ptr<Transaction> newTransaction() override;
private:
void ioLoop();
@ -42,7 +43,7 @@ class PgClientImpl : public DbClient
ConnectStatus_Bad
};
void execSql(const DbConnectionPtr &conn, const std::string &sql,
void execSql(const PgConnectionPtr &conn, const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
@ -50,13 +51,14 @@ class PgClientImpl : public DbClient
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback);
DbConnectionPtr newConnection(trantor::EventLoop *loop);
std::unordered_set<DbConnectionPtr> _connections;
std::unordered_set<DbConnectionPtr> _readyConnections;
std::unordered_set<DbConnectionPtr> _busyConnections;
PgConnectionPtr newConnection(trantor::EventLoop *loop);
std::unordered_set<PgConnectionPtr> _connections;
std::unordered_set<PgConnectionPtr> _readyConnections;
std::unordered_set<PgConnectionPtr> _busyConnections;
std::string _connInfo;
std::thread _loopThread;
std::mutex _connectionsMutex;
std::condition_variable _condConnectionReady;
size_t _connectNum;
bool _stop = false;

View File

@ -1,4 +1,4 @@
#include "DbConnection.h"
#include "PgConnection.h"
#include "PostgreSQLResultImpl.h"
#include <drogon/orm/Exception.h>
#include <stdio.h>
@ -17,7 +17,7 @@ Result makeResult(const std::shared_ptr<PGresult> &r = std::shared_ptr<PGresult>
} // namespace orm
} // namespace drogon
DbConnection::DbConnection(trantor::EventLoop *loop, const std::string &connInfo)
PgConnection::PgConnection(trantor::EventLoop *loop, const std::string &connInfo)
: _connPtr(std::shared_ptr<PGconn>(PQconnectStart(connInfo.c_str()), [](PGconn *conn) {
PQfinish(conn);
})),
@ -57,11 +57,11 @@ DbConnection::DbConnection(trantor::EventLoop *loop, const std::string &connInfo
_channel.enableReading();
_channel.enableWriting();
}
int DbConnection::sock()
int PgConnection::sock()
{
return PQsocket(_connPtr.get());
}
void DbConnection::handleClosed()
void PgConnection::handleClosed()
{
std::cout << "handleClosed!" << this << std::endl;
_loop->assertInLoopThread();
@ -71,7 +71,7 @@ void DbConnection::handleClosed()
auto thisPtr = shared_from_this();
_closeCb(thisPtr);
}
void DbConnection::pgPoll()
void PgConnection::pgPoll()
{
_loop->assertInLoopThread();
auto connStatus = PQconnectPoll(_connPtr.get());
@ -121,7 +121,7 @@ void DbConnection::pgPoll()
break;
}
}
void DbConnection::execSql(const std::string &sql,
void PgConnection::execSql(const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
@ -155,14 +155,32 @@ void DbConnection::execSql(const std::string &sql,
0) == 0)
{
fprintf(stderr, "send query error:%s\n", PQerrorMessage(_connPtr.get()));
//FIXME call exception callback
// connection broken! will be handled in handleRead()
// _loop->queueInLoop([=]() {
// try
// {
// throw InternalError(PQerrorMessage(_connPtr.get()));
// }
// catch (...)
// {
// _isWorking = false;
// _exceptCb(std::current_exception());
// _exceptCb = decltype(_exceptCb)();
// if (_idleCb)
// {
// _idleCb();
// _idleCb = decltype(_idleCb)();
// }
// }
// });
// return;
}
auto thisPtr = shared_from_this();
_loop->runInLoop([=]() {
thisPtr->pgPoll();
});
}
void DbConnection::handleRead()
void PgConnection::handleRead()
{
std::shared_ptr<PGresult> res;
@ -185,6 +203,11 @@ void DbConnection::handleRead()
_exceptCb = decltype(_exceptCb)();
}
_cb = decltype(_cb)();
if (_idleCb)
{
_idleCb();
_idleCb = decltype(_idleCb)();
}
}
handleClosed();
return;
@ -239,6 +262,10 @@ void DbConnection::handleRead()
if (_isWorking)
{
_isWorking = false;
_idleCb();
if (_idleCb)
{
_idleCb();
_idleCb = decltype(_idleCb)();
}
}
}

View File

@ -23,19 +23,19 @@ enum ConnectStatus
ConnectStatus_Bad
};
class DbConnection;
typedef std::shared_ptr<DbConnection> DbConnectionPtr;
class DbConnection : public trantor::NonCopyable, public std::enable_shared_from_this<DbConnection>
class PgConnection;
typedef std::shared_ptr<PgConnection> PgConnectionPtr;
class PgConnection : public trantor::NonCopyable, public std::enable_shared_from_this<PgConnection>
{
public:
typedef std::function<void(const DbConnectionPtr &)> DbConnectionCallback;
DbConnection(trantor::EventLoop *loop, const std::string &connInfo);
typedef std::function<void(const PgConnectionPtr &)> PgConnectionCallback;
PgConnection(trantor::EventLoop *loop, const std::string &connInfo);
void setOkCallback(const DbConnectionCallback &cb)
void setOkCallback(const PgConnectionCallback &cb)
{
_okCb = cb;
}
void setCloseCallback(const DbConnectionCallback &cb)
void setCloseCallback(const PgConnectionCallback &cb)
{
_closeCb = cb;
}
@ -47,11 +47,12 @@ class DbConnection : public trantor::NonCopyable, public std::enable_shared_from
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback,
const std::function<void()> &idleCb);
~DbConnection()
~PgConnection()
{
//std::cout<<"unconstruct DbConn"<<this<<std::endl;
LOG_TRACE<<"Destruct DbConn"<<this;
}
int sock();
trantor::EventLoop *loop() { return _loop; }
private:
std::shared_ptr<PGconn> _connPtr;
@ -60,8 +61,8 @@ class DbConnection : public trantor::NonCopyable, public std::enable_shared_from
QueryCallback _cb;
std::function<void()> _idleCb;
ConnectStatus _status = ConnectStatus_None;
DbConnectionCallback _closeCb = [](const DbConnectionPtr &) {};
DbConnectionCallback _okCb = [](const DbConnectionPtr &) {};
PgConnectionCallback _closeCb = [](const PgConnectionPtr &) {};
PgConnectionCallback _okCb = [](const PgConnectionPtr &) {};
std::function<void(const std::exception_ptr &)> _exceptCb;
bool _isWorking = false;
std::string _sql = "";

View File

@ -0,0 +1,280 @@
/**
*
* PgTransactionImpl.cc
* An Tao
*
* Copyright 2018, An Tao. All rights reserved.
* Use of this source code is governed by a MIT license
* that can be found in the License file.
*
*
*/
#include "PgTransactionImpl.h"
#include <trantor/utils/Logger.h>
using namespace drogon::orm;
PgTransactionImpl::PgTransactionImpl(const PgConnectionPtr &connPtr,
const std::function<void()> &usedUpCallback)
: _connectionPtr(connPtr),
_usedUpCallback(usedUpCallback),
_loop(connPtr->loop())
{
}
PgTransactionImpl::~PgTransactionImpl()
{
if (!_isCommitedOrRolledback)
{
auto cb = _usedUpCallback;
auto loop = _connectionPtr->loop();
auto conn = _connectionPtr;
loop->queueInLoop([conn, cb]() {
conn->execSql("commit",
0,
std::vector<const char *>(),
std::vector<int>(),
std::vector<int>(),
[](const Result &r) {
LOG_TRACE << "Transaction commited!";
},
[](const std::exception_ptr &ePtr) {
},
[cb]() {
if (cb)
{
cb();
}
});
});
}
}
void PgTransactionImpl::execSql(const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback)
{
auto thisPtr = shared_from_this();
_loop->queueInLoop([thisPtr, sql, paraNum, parameters, length, format, rcb, exceptCallback]() {
if (!thisPtr->_isCommitedOrRolledback)
{
if (!thisPtr->_isWorking)
{
thisPtr->_isWorking = true;
thisPtr->_connectionPtr->execSql(sql,
paraNum,
parameters,
length,
format,
rcb,
[exceptCallback, thisPtr](const std::exception_ptr &ePtr) {
exceptCallback(ePtr);
thisPtr->rollback();
},
[thisPtr]() {
thisPtr->execNewTask();
});
}
else
{
//push sql cmd to buffer;
SqlCmd cmd;
cmd._sql = sql;
cmd._paraNum = paraNum;
for (size_t i = 0; i < parameters.size(); i++)
{
//LOG_TRACE << "parameters[" << i << "]=" << (size_t)(parameters[i]);
cmd._parameters.push_back(std::string(parameters[i], length[i]));
}
cmd._format = format;
cmd._cb = rcb;
cmd._exceptCb = exceptCallback;
thisPtr->_sqlCmdBuffer.push_back(std::move(cmd));
}
}
else
{
//The transaction has been rolled back;
try
{
throw TransactionRollback("The transaction has been rolled back");
}
catch (...)
{
exceptCallback(std::current_exception());
}
}
});
}
void PgTransactionImpl::rollback()
{
auto thisPtr = shared_from_this();
_loop->queueInLoop([thisPtr]() {
if (thisPtr->_isCommitedOrRolledback)
return;
auto clearupCb = [thisPtr]() {
thisPtr->_isCommitedOrRolledback = true;
if (thisPtr->_usedUpCallback)
{
thisPtr->_usedUpCallback();
thisPtr->_usedUpCallback = decltype(thisPtr->_usedUpCallback)();
}
};
if (thisPtr->_isWorking)
{
//push sql cmd to buffer;
SqlCmd cmd;
cmd._sql = "rollback";
cmd._paraNum = 0;
cmd._cb = [clearupCb](const Result &r) {
LOG_TRACE << "Transaction roll back!";
clearupCb();
};
cmd._exceptCb = [clearupCb](const std::exception_ptr &ePtr) {
clearupCb();
};
thisPtr->_sqlCmdBuffer.push_back(std::move(cmd));
return;
}
thisPtr->_isWorking = true;
thisPtr
->_connectionPtr
->execSql("rollback",
0,
std::vector<const char *>(),
std::vector<int>(),
std::vector<int>(),
[clearupCb](const Result &r) {
LOG_TRACE << "Transaction roll back!";
clearupCb();
},
[clearupCb](const std::exception_ptr &ePtr) {
clearupCb();
},
[thisPtr]() {
thisPtr->execNewTask();
});
});
}
void PgTransactionImpl::execNewTask()
{
_loop->assertInLoopThread();
assert(_isWorking);
if (!_isCommitedOrRolledback)
{
auto thisPtr = shared_from_this();
if (_sqlCmdBuffer.size() > 0)
{
auto cmd = _sqlCmdBuffer.front();
_sqlCmdBuffer.pop_front();
std::vector<const char *> paras;
std::vector<int> lens;
for (auto &p : cmd._parameters)
{
paras.push_back(p.c_str());
lens.push_back(p.length());
}
auto conn = _connectionPtr;
_loop->queueInLoop([=]() {
conn->execSql(cmd._sql,
cmd._paraNum,
paras,
lens,
cmd._format,
cmd._cb,
[cmd, thisPtr](const std::exception_ptr &ePtr) {
thisPtr->rollback();
if (cmd._exceptCb)
cmd._exceptCb(ePtr);
},
[thisPtr]() {
thisPtr->execNewTask();
});
});
return;
}
_isWorking = false;
}
else
{
_isWorking = false;
if (_sqlCmdBuffer.size() > 0)
{
try
{
throw TransactionRollback("The transaction has been rolled back");
}
catch (...)
{
for (auto &cmd : _sqlCmdBuffer)
{
if (cmd._exceptCb)
{
cmd._exceptCb(std::current_exception());
}
}
}
_sqlCmdBuffer.clear();
}
}
}
void PgTransactionImpl::doBegin()
{
auto thisPtr = shared_from_this();
_loop->queueInLoop([thisPtr]() {
assert(!thisPtr->_isWorking);
assert(!thisPtr->_isCommitedOrRolledback);
thisPtr->_isWorking = true;
thisPtr->_connectionPtr->execSql("begin",
0,
std::vector<const char *>(),
std::vector<int>(),
std::vector<int>(),
[](const Result &r) {
LOG_TRACE << "Transaction begin!";
},
[thisPtr](const std::exception_ptr &ePtr) {
thisPtr->_isCommitedOrRolledback = true;
if (thisPtr->_usedUpCallback)
{
thisPtr->_usedUpCallback();
}
},
[thisPtr]() {
thisPtr->execNewTask();
});
});
}
std::string PgTransactionImpl::replaceSqlPlaceHolder(const std::string &sqlStr, const std::string &holderStr) const
{
std::string::size_type startPos = 0;
std::string::size_type pos;
std::stringstream ret;
size_t phCount = 1;
do
{
pos = sqlStr.find(holderStr, startPos);
if (pos == std::string::npos)
{
ret << sqlStr.substr(startPos);
return ret.str();
}
ret << sqlStr.substr(startPos, pos - startPos);
ret << "$";
ret << phCount++;
startPos = pos + holderStr.length();
} while (1);
}

View File

@ -0,0 +1,66 @@
/**
*
* PgTransactionImpl.h
* An Tao
*
* Copyright 2018, An Tao. All rights reserved.
* Use of this source code is governed by a MIT license
* that can be found in the License file.
*
*
*/
#pragma once
#include "PgConnection.h"
#include <drogon/orm/DbClient.h>
#include <functional>
#include <list>
namespace drogon
{
namespace orm
{
class PgTransactionImpl : public Transaction, public std::enable_shared_from_this<PgTransactionImpl>
{
public:
PgTransactionImpl(const PgConnectionPtr &connPtr, const std::function<void()> &usedUpCallback);
~PgTransactionImpl();
void rollback() override;
virtual std::string replaceSqlPlaceHolder(const std::string &sqlStr, const std::string &holderStr) const override;
private:
PgConnectionPtr _connectionPtr;
virtual void execSql(const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback) override;
virtual std::shared_ptr<Transaction> newTransaction() override
{
return shared_from_this();
}
std::function<void()> _usedUpCallback;
bool _isCommitedOrRolledback = false;
bool _isWorking = false;
void execNewTask();
struct SqlCmd
{
std::string _sql;
size_t _paraNum;
std::vector<std::string> _parameters;
std::vector<int> _format;
QueryCallback _cb;
ExceptPtrCallback _exceptCb;
};
std::list<SqlCmd> _sqlCmdBuffer;
// std::mutex _bufferMutex;
friend class PgClientImpl;
void doBegin();
trantor::EventLoop *_loop;
};
} // namespace orm
} // namespace drogon

View File

@ -29,10 +29,14 @@ const std::string User::tableName = "users";
int main()
{
auto client=DbClient::newPgClient("host=127.0.0.1 port=5432 dbname=test user=antao", 1);
LOG_DEBUG << "start!";
trantor::Logger::setLogLevel(trantor::Logger::TRACE);
auto client = DbClient::newPgClient("host=127.0.0.1 port=5432 dbname=test user=antao", 1);
sleep(1);
//client << "\\d users" >> [](const Result &r) {} >> [](const DrogonDbException &e) { std::cerr << e.base().what() << std::endl; };
LOG_DEBUG << "start!";
{
auto trans = client->newTransaction();
//trans->rollback();
}
Mapper<User> mapper(client);
auto U = mapper.findByPrimaryKey(2);
std::cout << "id=" << U._userId << std::endl;