add CacheMap class,need testing

This commit is contained in:
an-tao 2018-05-03 18:28:37 +08:00
parent 3746b57bee
commit 87592fe7da
5 changed files with 225 additions and 8 deletions

View File

@ -6,8 +6,10 @@ project (DROGON CXX)
EXEC_PROGRAM (gcc ARGS "--version | grep '^gcc'|awk '{print $3}' | sed s'/)//g'" OUTPUT_VARIABLE version)
MESSAGE(STATUS "This is gcc version:: " ${version})
if (version LESS 6.0.0)
if(version LESS 4.7.0)
MESSAGE(STATUS "gcc is too old")
stop()
elseif (version LESS 6.1.0)
MESSAGE(STATUS "c++11")
set(CMAKE_CXX_STD_FLAGS c++11)
elseif(version LESS 7.1.0)
@ -15,14 +17,11 @@ elseif(version LESS 7.1.0)
MESSAGE(STATUS "c++14")
else()
set(CMAKE_CXX_STD_FLAGS c++17)
add_definitions(-DUSE_C++17)
add_definitions(-DUSE_STD_ANY)
MESSAGE(STATUS "c++17")
endif()
set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} -std=${CMAKE_CXX_STD_FLAGS} -fpermissive -g -ggdb)
string(REPLACE ";" " " CMAKE_CXX_FLAGS "${CXX_FLAGS}")
set(CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake_modules/)
@ -36,10 +35,18 @@ if(UUID_FOUND)
endif()
find_package(Boost)
if(BOOST_FOUND)
if(Boost_FOUND)
add_definitions(-DUSE_BOOST)
include_directories(${Boost_INCLUDE_DIRS})
endif()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=${CMAKE_CXX_STD_FLAGS} -fpermissive -g -ggdb")
#string(REPLACE ";" " " CMAKE_CXX_FLAGS "${CXX_FLAGS}")
MESSAGE(STATUS ${CMAKE_CXX_FLAGS})
add_subdirectory(trantor)
include_directories(${PROJECT_SOURCE_DIR}/trantor ${PROJECT_SOURCE_DIR}/lib/inc)

View File

@ -2,5 +2,6 @@
int main()
{
drogon::HttpAppFramework framework("0.0.0.0",12345);
trantor::Logger::setLogLevel(trantor::Logger::TRACE);
framework.run();
}

199
lib/inc/drogon/CacheMap.h Executable file
View File

@ -0,0 +1,199 @@
//
// Copyright 2018, An Tao. All rights reserved.
//
// Use of this source code is governed by a MIT license
// that can be found in the License file.
#pragma once
#include <map>
#include <trantor/net/EventLoop.h>
#include <mutex>
#include <deque>
#include <vector>
#include <set>
class CallbackEntry
{
public:
CallbackEntry(std::function<void()> cb):cb_(std::move(cb)){}
~CallbackEntry()
{
cb_();
}
private:
std::function<void()> cb_;
};
typedef std::shared_ptr<CallbackEntry> CallbackEntryPtr;
typedef std::weak_ptr<CallbackEntry> WeakCallbackEntryPtr;
typedef std::set<CallbackEntryPtr> CallbackBucket;
typedef std::deque<CallbackBucket> CallbackBucketQueue;
template <typename T1,typename T2>
class CacheMap
{
public:
CacheMap(trantor::EventLoop *loop,int interval,int limit)
:timeInterval_(interval),
_limit(limit),
_loop(loop)
{
bucketCount_=limit/interval+1;
event_bucket_queue_.resize(bucketCount_);
_loop->runEvery(interval, [=](){
CallbackBucket tmp;
{
std::lock_guard<std::mutex> lock(bucketMutex_);
//use tmp to make this critical area as short as possible.
event_bucket_queue_.front().swap(tmp);
event_bucket_queue_.pop_front();
event_bucket_queue_.push_back(CallbackBucket());
}
});
};
typedef struct MapValue
{
int timeout=0;
T2 value;
std::function<void()> _timeoutCallback;
}MapValue;
void insert(const T1& key,const T2& value,int timeout=0,std::function<void()> timeoutCallback=std::function<void()>())
{
if(timeout>0)
{
{
std::lock_guard<std::mutex> lock(mtx_);
map_[key].value=value;
map_[key].timeout=timeout;
map_[key]._timeoutCallback=std::move(timeoutCallback);
}
eraseAfter(timeout,key);
}
else
{
std::lock_guard<std::mutex> lock(mtx_);
map_[key].value=value;
map_[key].timeout=timeout;
}
}
T2& operator [](const T1& key){
int timeout=0;
std::lock_guard<std::mutex> lock(mtx_);
if(map_.find(key)!=map_.end())
{
timeout=map_[key].timeout;
}
if(timeout)
eraseAfter(timeout,key);
return map_[key].value;
}
bool find(const T1& key)
{
int timeout=0;
bool flag=false;
std::lock_guard<std::mutex> lock(mtx_);
if(map_.find(key)!=map_.end())
{
timeout=map_[key].timeout;
flag=true;
}
if(timeout)
eraseAfter(timeout,key);
return flag;
}
void erase(const T1& key)
{
//in this case,we don't evoke the timeout callback;
{
std::lock_guard<std::mutex> lock(mtx_);
map_.erase(key);
}
{
std::lock_guard<std::mutex> lock(weakPtrMutex_);
weak_entry_map_.erase(key);
}
}
protected:
std::map< T1,MapValue > map_;
CallbackBucketQueue event_bucket_queue_;
std::map< T1, WeakCallbackEntryPtr > weak_entry_map_;
std::mutex mtx_;
std::mutex weakPtrMutex_;
std::mutex bucketMutex_;
int bucketCount_;
int timeInterval_;
int _limit;
trantor::EventLoop* _loop;
void eraseAfter(int delay,const T1& key)
{
uint32_t bucketIndexToPush;
uint32_t bucketNum = uint32_t(delay / timeInterval_) + 1;
uint32_t queue_size = event_bucket_queue_.size();
if (bucketNum >= queue_size)
{
bucketIndexToPush = queue_size - 1;
}
else
{
bucketIndexToPush = bucketNum;
}
CallbackEntryPtr entryPtr;
{
std::lock_guard<std::mutex> lock(weakPtrMutex_);
if(weak_entry_map_.find(key)!=weak_entry_map_.end())
{
entryPtr=weak_entry_map_[key].lock();
}
}
if(entryPtr)
{
std::lock_guard<std::mutex> lock(bucketMutex_);
event_bucket_queue_[bucketIndexToPush].insert(entryPtr);
}
else
{
std::function<void ()>cb=[=](){
std::lock_guard<std::mutex> lock(mtx_);
std::lock_guard<std::mutex> lock1(weakPtrMutex_);
WeakCallbackEntryPtr tmpWeakPtr;
if(weak_entry_map_.find(key)!=weak_entry_map_.end())
{
tmpWeakPtr=weak_entry_map_[key];
if(!tmpWeakPtr.lock())
{
weak_entry_map_.erase(key);
if(map_.find(key)!=map_.end())
{
if(map_[key]._timeoutCallback)
{
map_[key]._timeoutCallback();
}
map_.erase(key);
}
}
}
};
entryPtr=std::make_shared<CallbackEntry>(cb);
{
std::lock_guard<std::mutex> lock(weakPtrMutex_);
weak_entry_map_[key] = WeakCallbackEntryPtr(entryPtr);
}
{
std::lock_guard<std::mutex> lock(bucketMutex_);
event_bucket_queue_[bucketIndexToPush].insert(entryPtr);
}
}
}
};

View File

@ -8,6 +8,7 @@
#include <drogon/HttpRequest.h>
#include <drogon/HttpResponse.h>
#include <drogon/CacheMap.h>
#include <trantor/utils/NonCopyable.h>
#include <string>
#include <set>

View File

@ -66,6 +66,15 @@ void HttpServer::onConnection(const TcpConnectionPtr& conn)
if (conn->connected()) {
conn->setContext(new HttpContext());
}
else if(conn->disconnected())
{
HttpContext* context = (HttpContext*)(conn->getContext());
if(context)
{
delete context;
}
LOG_TRACE<<"conn disconnected!";
}
}
void HttpServer::onMessage(const TcpConnectionPtr& conn,