diff --git a/README.md b/README.md index 832f4cf..c9c7ea9 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,20 @@ Requests/sec: 50056.01 Transfer/sec: 34.23MB ``` +### oDinZu Drogon HTTP Server GET/JSON REQ with 2 IO threads +``` +wrk -c200 -d5 -t4 http://localhost:8848/3a322920d42ef0763152a6efff2ed51985530aedd45370f92fd0f0b8dcc30220 +Running 5s test @ http://localhost:8848/3a322920d42ef0763152a6efff2ed51985530aedd45370f92fd0f0b8dcc30220 + 4 threads and 200 connections + Thread Stats Avg Stdev Max +/- Stdev + Latency 7.54ms 23.66ms 413.86ms 98.15% + Req/Sec 10.37k 0.90k 13.36k 87.00% + 206443 requests in 5.00s, 141.16MB read + Non-2xx or 3xx responses: 206443 +Requests/sec: 41253.06 +Transfer/sec: 28.21MB +``` + ## Development & Contributing ### Developer Dependency Requirements diff --git a/vendors/drogon/local-api-server/main.cc b/vendors/drogon/local-api-server/main.cc index ea48dad..61babcc 100644 --- a/vendors/drogon/local-api-server/main.cc +++ b/vendors/drogon/local-api-server/main.cc @@ -1,71 +1,212 @@ -#include -#include -#include +#include + using namespace drogon; -class WebSocketChat : public drogon::WebSocketController +HttpResponsePtr makeFailedResponse() +{ + Json::Value json; + json["ok"] = false; + auto resp = HttpResponse::newHttpJsonResponse(json); + resp->setStatusCode(k500InternalServerError); + return resp; +} + +HttpResponsePtr makeSuccessResponse() +{ + Json::Value json; + json["ok"] = true; + auto resp = HttpResponse::newHttpJsonResponse(json); + return resp; +} + +std::string getRandomString(size_t n) +{ + std::vector random(n); + utils::secureRandomBytes(random.data(), random.size()); + + // This is cryptographically safe as 256 mod 16 == 0 + static const std::string alphabets = "0123456789abcdef"; + assert(256 % alphabets.size() == 0); + std::string randomString(n, '\0'); + for (size_t i = 0; i < n; i++) + randomString[i] = alphabets[random[i] % alphabets.size()]; + return randomString; +} + +struct DataItem +{ + Json::Value item; + std::mutex mtx; +}; + +class JsonStore : public HttpController { public: - virtual void handleNewMessage(const WebSocketConnectionPtr &, - std::string &&, - const WebSocketMessageType &) override; - virtual void handleConnectionClosed( - const WebSocketConnectionPtr &) override; - virtual void handleNewConnection(const HttpRequestPtr &, - const WebSocketConnectionPtr &) override; - WS_PATH_LIST_BEGIN - WS_PATH_ADD("/chat", Get); - WS_PATH_LIST_END - private: - PubSubService chatRooms_; -}; + METHOD_LIST_BEGIN + ADD_METHOD_TO(JsonStore::getToken, "/get-token", Get); + ADD_METHOD_VIA_REGEX(JsonStore::getItem, "/([a-f0-9]{64})/(.*)", Get); + ADD_METHOD_VIA_REGEX(JsonStore::createItem, "/([a-f0-9]{64})", Post); + ADD_METHOD_VIA_REGEX(JsonStore::deleteItem, "/([a-f0-9]{64})", Delete); + ADD_METHOD_VIA_REGEX(JsonStore::updateItem, "/([a-f0-9]{64})/(.*)", Put); + METHOD_LIST_END -struct Subscriber -{ - std::string chatRoomName_; - drogon::SubscriberID id_; -}; - -void WebSocketChat::handleNewMessage(const WebSocketConnectionPtr &wsConnPtr, - std::string &&message, - const WebSocketMessageType &type) -{ - // write your application logic here - LOG_DEBUG << "new websocket message:" << message; - if (type == WebSocketMessageType::Ping) + void getToken(const HttpRequestPtr &, + std::function &&callback) { - LOG_DEBUG << "recv a ping"; + std::string randomString = getRandomString(64); + Json::Value res; + res["token"] = randomString; + + callback(HttpResponse::newHttpJsonResponse(std::move(res))); } - else if (type == WebSocketMessageType::Text) + + void getItem(const HttpRequestPtr &, + std::function &&callback, + const std::string &token, + const std::string &path) { - auto &s = wsConnPtr->getContextRef(); - chatRooms_.publish(s.chatRoomName_, message); + auto itemPtr = [this, &token]() -> std::shared_ptr { + // It is possible that the item is being removed while another + // thread tries to look it up. The mutex here prevents that from + // happening. + std::lock_guard lock(storageMtx_); + auto it = dataStore_.find(token); + if (it == dataStore_.end()) + return nullptr; + return it->second; + }(); + if (itemPtr == nullptr) + { + callback(makeFailedResponse()); + return; + } + + auto &item = *itemPtr; + // Prevents another thread from writing to the same item while this + // thread reads. Could cause blockage if multiple clients are asking to + // read the same object. But that should be rare. + std::lock_guard lock(item.mtx); + Json::Value *valuePtr = walkJson(item.item, path); + + if (valuePtr == nullptr) + { + callback(makeFailedResponse()); + return; + } + + auto resp = HttpResponse::newHttpJsonResponse(*valuePtr); + callback(resp); } -} -void WebSocketChat::handleConnectionClosed(const WebSocketConnectionPtr &conn) -{ - LOG_DEBUG << "websocket closed!"; - auto &s = conn->getContextRef(); - chatRooms_.unsubscribe(s.chatRoomName_, s.id_); -} + void updateItem(const HttpRequestPtr &req, + std::function &&callback, + const std::string &token, + const std::string &path) + { + auto jsonPtr = req->jsonObject(); + auto itemPtr = [this, &token]() -> std::shared_ptr { + // It is possible that the item is being removed while another + // thread tries to look it up. The mutex here prevents that from + // happening. + std::lock_guard lock(storageMtx_); + auto it = dataStore_.find(token); + if (it == dataStore_.end()) + return nullptr; + return it->second; + }(); -void WebSocketChat::handleNewConnection(const HttpRequestPtr &req, - const WebSocketConnectionPtr &conn) -{ - LOG_DEBUG << "new websocket connection!"; - conn->send("haha!!!"); - Subscriber s; - s.chatRoomName_ = req->getParameter("room_name"); - s.id_ = chatRooms_.subscribe(s.chatRoomName_, - [conn](const std::string &topic, - const std::string &message) { - // Suppress unused variable warning - (void)topic; - conn->send(message); - }); - conn->setContext(std::make_shared(std::move(s))); -} + if (itemPtr == nullptr || jsonPtr == nullptr) + { + callback(makeFailedResponse()); + return; + } + + auto &item = *itemPtr; + std::lock_guard lock(item.mtx); + Json::Value *valuePtr = walkJson(item.item, path, 1); + + if (valuePtr == nullptr) + { + callback(makeFailedResponse()); + return; + } + + std::string key = utils::splitString(path, "/").back(); + (*valuePtr)[key] = *jsonPtr; + + callback(makeSuccessResponse()); + } + + void createItem(const HttpRequestPtr &req, + std::function &&callback, + const std::string &token) + { + auto jsonPtr = req->jsonObject(); + if (jsonPtr == nullptr) + { + callback(makeFailedResponse()); + return; + } + + std::lock_guard lock(storageMtx_); + if (dataStore_.find(token) == dataStore_.end()) + { + auto item = std::make_shared(); + item->item = std::move(*jsonPtr); + dataStore_.insert({token, std::move(item)}); + + callback(makeSuccessResponse()); + } + else + { + callback(makeFailedResponse()); + } + } + + void deleteItem(const HttpRequestPtr &, + std::function &&callback, + const std::string &token) + { + std::lock_guard lock(storageMtx_); + dataStore_.erase(token); + + callback(makeSuccessResponse()); + } + + protected: + static Json::Value *walkJson(Json::Value &json, + const std::string &path, + size_t ignore_back = 0) + { + auto pathElem = utils::splitString(path, "/", false); + if (pathElem.size() >= ignore_back) + pathElem.resize(pathElem.size() - ignore_back); + Json::Value *valuePtr = &json; + for (const auto &elem : pathElem) + { + if (valuePtr->isArray()) + { + Json::Value &value = (*valuePtr)[std::stoi(elem)]; + if (value.isNull()) + return nullptr; + + valuePtr = &value; + } + else + { + Json::Value &value = (*valuePtr)[elem]; + if (value.isNull()) + return nullptr; + + valuePtr = &value; + } + } + return valuePtr; + } + + std::unordered_map> dataStore_; + std::mutex storageMtx_; +}; int main() {