Use multiple wheels in CacheMap

This commit is contained in:
antao 2018-10-07 22:59:22 +08:00
parent a05b846fe3
commit c0ad2c3470
4 changed files with 94 additions and 41 deletions

View File

@ -22,7 +22,7 @@ else()
set(CMAKE_CXX_STD_FLAGS c++11)
endif()
include_directories(${PROJECT_SOURCE_DIR}/trantor ${PROJECT_SOURCE_DIR}/lib/inc)
set(CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake_modules/)
#jsoncpp
@ -66,7 +66,6 @@ set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Wall -std=${CMAKE_CXX_STD_F
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -Wall -std=${CMAKE_CXX_STD_FLAGS}")
add_subdirectory(trantor)
include_directories(${PROJECT_SOURCE_DIR}/trantor ${PROJECT_SOURCE_DIR}/lib/inc)
add_custom_target(makeVersion)
add_custom_command(TARGET makeVersion

View File

@ -17,6 +17,7 @@
#include <trantor/net/EventLoop.h>
#include <trantor/utils/Logger.h>
#include <map>
#include <mutex>
#include <deque>
@ -25,6 +26,13 @@
#include <unordered_map>
#include <unordered_set>
#include <assert.h>
#define WHEELS_NUM 4
#define BUCKET_NUM_PER_WHEEL 100
#define TICK_INTERVAL 1.0
namespace drogon
{
@ -54,25 +62,43 @@ public:
/// constructor
/// @param loop
/// eventloop pointer
/// @param interval
/// timer stepseconds
/// @param limit
/// tht max timeout value of the cache (seconds)
CacheMap(trantor::EventLoop *loop,int interval,int limit)
:timeInterval_(interval),
_limit(limit),
_loop(loop)
/// @param tickInterval
/// second
/// @param wheelsNum
/// number of wheels
/// @param bucketsNumPerWheel
/// buckets number per wheel
CacheMap(trantor::EventLoop *loop,
float tickInterval=TICK_INTERVAL,
size_t wheelsNum=WHEELS_NUM,
size_t bucketsNumPerWheel=BUCKET_NUM_PER_WHEEL)
:_loop(loop),
_tickInterval(tickInterval),
_wheelsNum(wheelsNum),
_bucketsNumPerWheel(bucketsNumPerWheel)
{
bucketCount_=limit/interval+1;
event_bucket_queue_.resize(bucketCount_);
_timerId=_loop->runEvery(interval, [=](){
CallbackBucket tmp;
_wheels.resize(_wheelsNum);
for(int i=0;i<_wheelsNum;i++)
{
_wheels[i].resize(_bucketsNumPerWheel);
}
_timerId=_loop->runEvery(_tickInterval, [=](){
_ticksCounter++;
size_t pow=1;
for(int i=0;i<_wheelsNum;i++)
{
std::lock_guard<std::mutex> lock(bucketMutex_);
//use tmp val to make this critical area as short as possible.
event_bucket_queue_.front().swap(tmp);
event_bucket_queue_.pop_front();
event_bucket_queue_.push_back(CallbackBucket());
if((_ticksCounter%pow)==0)
{
CallbackBucket tmp;
{
std::lock_guard<std::mutex> lock(bucketMutex_);
//use tmp val to make this critical area as short as possible.
_wheels[i].front().swap(tmp);
_wheels[i].pop_front();
_wheels[i].push_back(CallbackBucket());
}
}
pow=pow*_bucketsNumPerWheel;
}
});
};
@ -153,33 +179,55 @@ public:
private:
std::unordered_map< T1,MapValue > _map;
CallbackBucketQueue event_bucket_queue_;
std::vector<CallbackBucketQueue> _wheels;
size_t _ticksCounter=0;
std::mutex mtx_;
std::mutex bucketMutex_;
int bucketCount_;
int timeInterval_;
int _limit;
trantor::TimerId _timerId;
trantor::EventLoop* _loop;
float _tickInterval;
size_t _wheelsNum;
size_t _bucketsNumPerWheel;
void inertEntry(int delay,CallbackEntryPtr entryPtr)
{
//protected by bucketMutex;
if(delay<=0)
return;
for(int i=0;i<_wheelsNum;i++)
{
if(delay<=_bucketsNumPerWheel)
{
_wheels[i][delay-1].insert(entryPtr);
break;
}
if(i<(_wheelsNum-1))
{
entryPtr=std::make_shared<CallbackEntry>([=](){
if(delay>0)
{
std::lock_guard<std::mutex> lock(bucketMutex_);
_wheels[i][(delay-1)%_bucketsNumPerWheel].insert(entryPtr);
}
});
}
else
{
//delay is too long to put entry at valid position in wheels;
_wheels[i][_bucketsNumPerWheel-1].insert(entryPtr);
}
delay=(delay-1)/_bucketsNumPerWheel;
}
}
void eraseAfter(int delay,const T1& key)
{
assert(_map.find(key)!=_map.end());
size_t bucketIndexToPush;
size_t bucketNum = size_t(delay / timeInterval_) + 1;
size_t queue_size = event_bucket_queue_.size();
if (bucketNum >= queue_size)
{
bucketIndexToPush = queue_size - 1;
}
else
{
bucketIndexToPush = bucketNum;
}
CallbackEntryPtr entryPtr;
@ -191,7 +239,7 @@ private:
if(entryPtr)
{
std::lock_guard<std::mutex> lock(bucketMutex_);
event_bucket_queue_[bucketIndexToPush].insert(entryPtr);
inertEntry(delay,entryPtr);
}
else
{
@ -217,7 +265,7 @@ private:
{
std::lock_guard<std::mutex> lock(bucketMutex_);
event_bucket_queue_[bucketIndexToPush].insert(entryPtr);
inertEntry(delay,entryPtr);
}
}
}

View File

@ -442,7 +442,7 @@ void HttpAppFrameworkImpl::run()
interval=_sessionTimeout/1000;
limit=_sessionTimeout;
}
_sessionMapPtr=std::unique_ptr<CacheMap<std::string,SessionPtr>>(new CacheMap<std::string,SessionPtr>(&_loop,interval,limit));
_sessionMapPtr=std::unique_ptr<CacheMap<std::string,SessionPtr>>(new CacheMap<std::string,SessionPtr>(&_loop));
}
_loop.loop();
}

View File

@ -1,6 +1,6 @@
#include <drogon/CacheMap.h>
#include <trantor/net/EventLoopThread.h>
#include <drogon/utils/Utilities.h>
#include <unistd.h>
#include <string>
#include <thread>
@ -10,7 +10,13 @@ int main()
{
trantor::EventLoopThread loopThread;
loopThread.run();
drogon::CacheMap<std::string,std::string> cache(loopThread.getLoop(),1,120);
drogon::CacheMap<std::string,std::string> cache(loopThread.getLoop(),1,3,3);
for(int i=0;i<40;i++)
{
cache.insert(drogon::formattedString("aaa%d",i),"hehe",i,[=](){
std::cout<<i<<" cache item erased!"<<std::endl;
});
}
cache.insert("1","first",20,[=]{
std::cout<<"first item in cache timeout,erase!"<<std::endl;
});