Add the MysqlConnection class

This commit is contained in:
antao 2018-11-22 14:55:05 +08:00
parent 21cb1b482a
commit efbf5b4ad5
11 changed files with 280 additions and 17 deletions

View File

@ -167,6 +167,9 @@ if (MAKETEST STREQUAL YES)
if(PostgreSQL_FOUND)
add_subdirectory(${PROJECT_SOURCE_DIR}/orm_lib/src/postgresql_impl/test)
endif()
if(MYSQL_FOUND)
add_subdirectory(${PROJECT_SOURCE_DIR}/orm_lib/src/mysql_impl/test)
endif()
if(USE_ORM)
add_subdirectory(${PROJECT_SOURCE_DIR}/orm_lib/tests)
endif()

View File

@ -65,7 +65,10 @@ class DbClient : public trantor::NonCopyable
#if USE_POSTGRESQL
static std::shared_ptr<DbClient> newPgClient(const std::string &connInfo, const size_t connNum);
#endif
///Async method, nonblocking by default;
#if USE_MYSQL
static std::shared_ptr<DbClient> newMysqlClient(const std::string &connInfo, const size_t connNum);
#endif
//Async method, nonblocking by default;
template <
typename FUNCTION1,
typename FUNCTION2,

View File

@ -29,3 +29,10 @@ std::shared_ptr<DbClient> DbClient::newPgClient(const std::string &connInfo, con
return std::make_shared<DbClientImpl>(connInfo, connNum, ClientType::PostgreSQL);
}
#endif
#if USE_MYSQL
std::shared_ptr<DbClient> DbClient::newMysqlClient(const std::string &connInfo, const size_t connNum)
{
return std::make_shared<DbClientImpl>(connInfo, connNum, ClientType::Mysql);
}
#endif

View File

@ -6,6 +6,9 @@
#if USE_POSTGRESQL
#include "postgresql_impl/PgConnection.h"
#endif
#if USE_MYSQL
#include "mysql_impl/MysqlConnection.h"
#endif
#include "TransactionImpl.h"
#include <trantor/net/EventLoop.h>
#include <trantor/net/inner/Channel.h>
@ -264,6 +267,14 @@ DbConnectionPtr DbClientImpl::newConnection()
connPtr = std::make_shared<PgConnection>(_loopPtr.get(), _connInfo);
#else
return nullptr;
#endif
}
else if(_type == ClientType::Mysql)
{
#if USE_MYSQL
connPtr = std::make_shared<MysqlConnection>(_loopPtr.get(), _connInfo);
#else
return nullptr;
#endif
}
else

View File

@ -0,0 +1,163 @@
/**
*
* MysqlConnection.cc
* An Tao
*
* Copyright 2018, An Tao. All rights reserved.
* Use of this source code is governed by a MIT license
* that can be found in the License file.
*
* Drogon
*
*/
#include "MysqlConnection.h"
using namespace drogon::orm;
MysqlConnection::MysqlConnection(trantor::EventLoop *loop, const std::string &connInfo)
: DbConnection(loop)
{
_mysqlPtr = std::shared_ptr<MYSQL>(new MYSQL, [](MYSQL *p) {
mysql_close(p);
});
mysql_init(_mysqlPtr.get());
mysql_options(_mysqlPtr.get(), MYSQL_OPT_NONBLOCK, 0);
MYSQL *ret;
int status = mysql_real_connect_start(&ret, _mysqlPtr.get(), "127.0.0.1", "root", "", "test", 3306, NULL, 0);
auto fd = mysql_get_socket(_mysqlPtr.get());
_channelPtr = std::unique_ptr<trantor::Channel>(new trantor::Channel(loop, fd));
_channelPtr->setReadCallback([=]() {
handleRead();
});
_channelPtr->setWriteCallback([=]() {
handleWrite();
});
_channelPtr->setCloseCallback([=]() {
perror("sock close");
handleClosed();
});
_channelPtr->setErrorCallback([=]() {
perror("sock err");
handleClosed();
});
// LOG_TRACE << "channel index:" << _channelPtr->index();
// LOG_TRACE << "channel " << this << " fd:" << _channelPtr->fd();
_channelPtr->enableReading();
setChannel(status);
}
void MysqlConnection::setChannel(int status)
{
LOG_TRACE << "channel index:" << _channelPtr->index();
_channelPtr->disableReading();
_channelPtr->disableWriting();
if (status & MYSQL_WAIT_READ)
{
_channelPtr->enableReading();
}
if (status & MYSQL_WAIT_WRITE)
{
_channelPtr->enableWriting();
}
//(status & MYSQL_WAIT_EXCEPT) ///FIXME
if (status & MYSQL_WAIT_TIMEOUT)
{
auto timeout = mysql_get_timeout_value(_mysqlPtr.get());
auto thisPtr = shared_from_this();
_loop->runAfter(timeout, [thisPtr]() {
thisPtr->handleTimeout();
});
}
}
void MysqlConnection::handleRead()
{
LOG_TRACE << "channel index:" << _channelPtr->index();
int status = 0;
status |= MYSQL_WAIT_READ;
MYSQL *ret;
if (_status != ConnectStatus_Ok)
{
status = mysql_real_connect_cont(&ret, _mysqlPtr.get(), status);
if (status == 0)
{
if (!ret)
{
LOG_ERROR << "Failed to mysql_real_connect()";
return;
}
_status = ConnectStatus_Ok;
LOG_TRACE << "connected!!!";
return;
}
else
{
setChannel(status);
}
}
}
void MysqlConnection::handleWrite()
{
LOG_TRACE << "channel index:" << _channelPtr->index();
int status = 0;
status |= MYSQL_WAIT_WRITE;
MYSQL *ret;
if (_status != ConnectStatus_Ok)
{
status = mysql_real_connect_cont(&ret, _mysqlPtr.get(), status);
if (status == 0)
{
if (!ret)
{
LOG_ERROR << "Failed to mysql_real_connect()";
return;
}
_status = ConnectStatus_Ok;
LOG_TRACE << "connected!!!";
return;
}
else
{
setChannel(status);
}
}
}
void MysqlConnection::handleClosed()
{
_status = ConnectStatus_Bad;
_loop->assertInLoopThread();
_channelPtr->disableAll();
_channelPtr->remove();
assert(_closeCb);
auto thisPtr = shared_from_this();
_closeCb(thisPtr);
}
void MysqlConnection::handleTimeout()
{
LOG_TRACE << "channel index:" << _channelPtr->index();
int status = 0;
status |= MYSQL_WAIT_TIMEOUT;
MYSQL *ret;
if (_status != ConnectStatus_Ok)
{
status = mysql_real_connect_cont(&ret, _mysqlPtr.get(), status);
if (status == 0)
{
if (!ret)
{
LOG_ERROR << "Failed to mysql_real_connect()";
return;
}
_status = ConnectStatus_Ok;
LOG_TRACE << "connected!!!";
return;
}
else
{
setChannel(status);
}
}
}

View File

@ -0,0 +1,59 @@
/**
*
* MysqlConnection.h
* An Tao
*
* Copyright 2018, An Tao. All rights reserved.
* Use of this source code is governed by a MIT license
* that can be found in the License file.
*
* Drogon
*
*/
#pragma once
#include "../DbConnection.h"
#include <trantor/net/EventLoop.h>
#include <trantor/net/inner/Channel.h>
#include <drogon/orm/DbClient.h>
#include <trantor/utils/NonCopyable.h>
#include <mysql.h>
#include <memory>
#include <string>
#include <functional>
#include <iostream>
namespace drogon
{
namespace orm
{
class MysqlConnection;
typedef std::shared_ptr<MysqlConnection> MysqlConnectionPtr;
class MysqlConnection : public DbConnection, public std::enable_shared_from_this<MysqlConnection>
{
public:
MysqlConnection(trantor::EventLoop *loop, const std::string &connInfo);
~MysqlConnection() { }
virtual void execSql(const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback,
const std::function<void()> &idleCb) override {}
private:
std::unique_ptr<trantor::Channel> _channelPtr;
std::shared_ptr<MYSQL> _mysqlPtr;
void handleRead();
void handleTimeout();
void handleWrite();
void handleClosed();
void setChannel(int status);
};
} // namespace orm
} // namespace drogon

View File

View File

@ -0,0 +1,4 @@
link_libraries(drogon trantor pthread dl)
add_executable(mytest1 test1.cc)
#add_executable(test2 test2.cc)

View File

@ -0,0 +1,12 @@
#include <drogon/orm/DbClient.h>
#include <trantor/utils/Logger.h>
#include <iostream>
#include <unistd.h>
using namespace drogon::orm;
int main()
{
trantor::Logger::setLogLevel(trantor::Logger::TRACE);
auto clientPtr = DbClient::newMysqlClient("host=127.0.0.1 port=5432 dbname=test user=antao", 3);
getchar();
}

View File

@ -20,23 +20,24 @@ class PgConnection;
typedef std::shared_ptr<PgConnection> PgConnectionPtr;
class PgConnection : public DbConnection, public std::enable_shared_from_this<PgConnection>
{
public:
PgConnection(trantor::EventLoop *loop, const std::string &connInfo);
public:
PgConnection(trantor::EventLoop *loop, const std::string &connInfo);
void execSql(const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback,
const std::function<void()> &idleCb);
private:
std::shared_ptr<PGconn> _connPtr;
trantor::Channel _channel;
void handleRead();
void pgPoll();
void handleClosed();
virtual void execSql(const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback,
const std::function<void()> &idleCb) override;
private:
std::shared_ptr<PGconn> _connPtr;
trantor::Channel _channel;
void handleRead();
void pgPoll();
void handleClosed();
};
} // namespace orm