Compare commits

..

No commits in common. "9533a7018f26b27142d150385934030950a506de" and "38967ed0fc188b99e2dcafd991ad38c3698edaad" have entirely different histories.

16 changed files with 271 additions and 494 deletions

View File

@ -245,7 +245,6 @@ set(DROGON_SOURCES
lib/src/HttpAppFrameworkImpl.cc lib/src/HttpAppFrameworkImpl.cc
lib/src/HttpBinder.cc lib/src/HttpBinder.cc
lib/src/Http2Transport.cc lib/src/Http2Transport.cc
lib/src/Http1xTransport.cc
lib/src/HttpClientImpl.cc lib/src/HttpClientImpl.cc
lib/src/HttpControllersRouter.cc lib/src/HttpControllersRouter.cc
lib/src/HttpFileImpl.cc lib/src/HttpFileImpl.cc
@ -292,8 +291,6 @@ set(private_headers
lib/src/FiltersFunction.h lib/src/FiltersFunction.h
lib/src/HttpAppFrameworkImpl.h lib/src/HttpAppFrameworkImpl.h
lib/src/HttpClientImpl.h lib/src/HttpClientImpl.h
lib/src/Http2Transport.h
lib/src/Http1xTransport.h
lib/src/HttpControllersRouter.h lib/src/HttpControllersRouter.h
lib/src/HttpFileImpl.h lib/src/HttpFileImpl.h
lib/src/HttpFileUploadRequest.h lib/src/HttpFileUploadRequest.h

View File

@ -16,7 +16,10 @@ int main()
{ {
trantor::Logger::setLogLevel(trantor::Logger::kTrace); trantor::Logger::setLogLevel(trantor::Logger::kTrace);
{ {
auto client = HttpClient::newHttpClient("http://www.baidu.com"); auto client = HttpClient::newHttpClient("https://clehaxze.tw:8844",
nullptr,
false,
false);
client->setSockOptCallback([](int fd) { client->setSockOptCallback([](int fd) {
std::cout << "setSockOptCallback:" << fd << std::endl; std::cout << "setSockOptCallback:" << fd << std::endl;
#ifdef __linux__ #ifdef __linux__
@ -45,7 +48,7 @@ int main()
req->setParameter("wd", "wx"); req->setParameter("wd", "wx");
req->setParameter("oq", "wx"); req->setParameter("oq", "wx");
for (int i = 0; i < 10; ++i) for (int i = 0; i < 1; ++i)
{ {
client->sendRequest( client->sendRequest(
req, [](ReqResult result, const HttpResponsePtr &response) { req, [](ReqResult result, const HttpResponsePtr &response) {
@ -71,8 +74,8 @@ int main()
} }
std::cout << "count=" << nth_resp << std::endl; std::cout << "count=" << nth_resp << std::endl;
}); });
LOG_INFO << "send request";
} }
} }
app().run(); app().run();
} }

View File

@ -184,11 +184,6 @@ class DROGON_EXPORT HttpClient : public trantor::NonCopyable
* If this method is not called, the default depth value is 0 which means * If this method is not called, the default depth value is 0 which means
* the pipelining is disabled. For details about pipelining, see * the pipelining is disabled. For details about pipelining, see
* rfc2616-8.1.2.2 * rfc2616-8.1.2.2
*
* @param depth The depth value.
* @note This option is only valid for HTTP/1.x. If the client running in
* HTTP/2 mode, this settings have no effect. The maximum concurrent
* requests for HTTP/2 is 100 (unless the server has a different setting).
*/ */
virtual void setPipeliningDepth(size_t depth) = 0; virtual void setPipeliningDepth(size_t depth) = 0;
@ -243,20 +238,15 @@ class DROGON_EXPORT HttpClient : public trantor::NonCopyable
* eanbled for HTTPS. * eanbled for HTTPS.
* @param validateCert If the parameter is set to true, the client validates * @param validateCert If the parameter is set to true, the client validates
* the server certificate when SSL handshaking. * the server certificate when SSL handshaking.
* @param targetVersion The target TLS version to use for HTTPS connections.
* This is a mere hint, and the actual version used will depend on the
* server and the client's capabilities.
* @return HttpClientPtr The smart pointer to the new client object. * @return HttpClientPtr The smart pointer to the new client object.
* @note: The ip parameter support for both ipv4 and ipv6 address * @note: The ip parameter support for both ipv4 and ipv6 address
*/ */
static HttpClientPtr newHttpClient( static HttpClientPtr newHttpClient(const std::string &ip,
const std::string &ip,
uint16_t port, uint16_t port,
bool useSSL = false, bool useSSL = false,
trantor::EventLoop *loop = nullptr, trantor::EventLoop *loop = nullptr,
bool useOldTLS = false, bool useOldTLS = false,
bool validateCert = true, bool validateCert = true);
std::optional<Version> targetVersion = std::nullopt);
/// Get the event loop of the client; /// Get the event loop of the client;
virtual trantor::EventLoop *getLoop() = 0; virtual trantor::EventLoop *getLoop() = 0;
@ -313,16 +303,6 @@ class DROGON_EXPORT HttpClient : public trantor::NonCopyable
const std::vector<std::pair<std::string, std::string>> const std::vector<std::pair<std::string, std::string>>
&sslConfCmds) = 0; &sslConfCmds) = 0;
/**
* @brief get the protocol version used by the HTTP connection
* @return std::optional<Version> the protocol version used by the HTTP
*
* NOTE: It could return std::nullopt if the connection is not established
* or is still negotiating the protocol. This is IMPORTANT as the client
* COULD be lazy and not connecting until the first request arrives.
*/
virtual std::optional<Version> protocolVersion() const = 0;
/// Create a Http client using the hostString to connect to server /// Create a Http client using the hostString to connect to server
/** /**
* *
@ -349,25 +329,15 @@ class DROGON_EXPORT HttpClient : public trantor::NonCopyable
* @param validateCert If the parameter is set to true, the client validates * @param validateCert If the parameter is set to true, the client validates
* the server certificate when SSL handshaking. * the server certificate when SSL handshaking.
* *
* @param targetVersion The target HTTP version that the client will attempt
* to use. **THIS IS ONLY A HINT**. The server may choose to ignore this as
* the server may not support the requested version. The preference is as
* follows:
* HTTP/2 > HTTP/1.1 > HTTP/1.0
* Note that there is no way to auto-detect HTTP/1.0 servers, so it is only
* used if explicitly requested.
*
* @note Don't add path and parameters in hostString, the request path and * @note Don't add path and parameters in hostString, the request path and
* parameters should be set in HttpRequestPtr when calling the sendRequest() * parameters should be set in HttpRequestPtr when calling the sendRequest()
* method. * method.
* *
*/ */
static HttpClientPtr newHttpClient( static HttpClientPtr newHttpClient(const std::string &hostString,
const std::string &hostString,
trantor::EventLoop *loop = nullptr, trantor::EventLoop *loop = nullptr,
bool useOldTLS = false, bool useOldTLS = false,
bool validateCert = true, bool validateCert = true);
std::optional<Version> targetVersion = std::nullopt);
virtual ~HttpClient() virtual ~HttpClient()
{ {

View File

@ -1,104 +0,0 @@
#include "Http1xTransport.h"
#include "HttpResponseParser.h"
using namespace drogon;
Http1xTransport::Http1xTransport(trantor::TcpConnectionPtr connPtr,
Version version,
size_t *bytesSent,
size_t *bytesReceived)
: connPtr(connPtr),
bytesSent_(bytesSent),
bytesReceived_(bytesReceived),
version_(version)
{
connPtr->setContext(std::make_shared<HttpResponseParser>(connPtr));
}
void Http1xTransport::sendRequestInLoop(const HttpRequestPtr &req,
HttpReqCallback &&callback)
{
sendReq(req);
pipeliningCallbacks_.emplace(std::move(req), std::move(callback));
}
void Http1xTransport::onRecvMessage(const trantor::TcpConnectionPtr &conn,
trantor::MsgBuffer *msg)
{
auto responseParser = connPtr->getContext<HttpResponseParser>();
assert(responseParser != nullptr);
assert(connPtr.get() == conn.get());
// LOG_TRACE << "###:" << msg->readableBytes();
auto msgSize = msg->readableBytes();
while (msg->readableBytes() > 0)
{
if (pipeliningCallbacks_.empty())
{
LOG_ERROR << "More responses than expected!";
connPtr->shutdown();
return;
}
auto &firstReq = pipeliningCallbacks_.front();
if (firstReq.first->method() == Head)
{
responseParser->setForHeadMethod();
}
if (!responseParser->parseResponse(msg))
{
*bytesReceived_ += (msgSize - msg->readableBytes());
errorCallback(ReqResult::BadResponse);
return;
}
if (responseParser->gotAll())
{
auto resp = responseParser->responseImpl();
responseParser->reset();
*bytesReceived_ += (msgSize - msg->readableBytes());
msgSize = msg->readableBytes();
respCallback(resp, std::move(firstReq), conn);
pipeliningCallbacks_.pop();
}
else
{
break;
}
}
}
Http1xTransport::~Http1xTransport()
{
}
bool Http1xTransport::handleConnectionClose()
{
auto responseParser = connPtr->getContext<HttpResponseParser>();
if (responseParser && responseParser->parseResponseOnClose() &&
responseParser->gotAll())
{
auto &firstReq = pipeliningCallbacks_.front();
if (firstReq.first->method() == Head)
{
responseParser->setForHeadMethod();
}
auto resp = responseParser->responseImpl();
responseParser->reset();
respCallback(resp, std::move(firstReq), connPtr);
return false;
}
return true;
}
void Http1xTransport::sendReq(const HttpRequestPtr &req)
{
trantor::MsgBuffer buffer;
assert(req);
auto implPtr = static_cast<HttpRequestImpl *>(req.get());
assert(version_ == Version::kHttp10 || version_ == Version::kHttp11);
implPtr->appendToBuffer(&buffer, version_);
LOG_TRACE << "Send request:"
<< std::string_view(buffer.peek(), buffer.readableBytes());
*bytesSent_ += buffer.readableBytes();
connPtr->send(std::move(buffer));
}

View File

@ -1,52 +0,0 @@
#pragma once
#include <trantor/net/EventLoop.h>
#include <trantor/net/TcpClient.h>
#include <list>
#include <queue>
#include <vector>
#include "HttpTransport.h"
namespace drogon
{
class Http1xTransport : public HttpTransport
{
private:
std::queue<std::pair<HttpRequestPtr, HttpReqCallback>> pipeliningCallbacks_;
trantor::TcpConnectionPtr connPtr;
size_t *bytesSent_;
size_t *bytesReceived_;
Version version_{Version::kHttp11};
void sendReq(const HttpRequestPtr &req);
public:
Http1xTransport(trantor::TcpConnectionPtr connPtr,
Version version,
size_t *bytesSent,
size_t *bytesReceived);
virtual ~Http1xTransport();
void sendRequestInLoop(const HttpRequestPtr &req,
HttpReqCallback &&callback) override;
void onRecvMessage(const trantor::TcpConnectionPtr &,
trantor::MsgBuffer *) override;
size_t requestsInFlight() const override
{
return pipeliningCallbacks_.size();
}
bool handleConnectionClose() override;
void onError(ReqResult result) override
{
while (!pipeliningCallbacks_.empty())
{
auto &cb = pipeliningCallbacks_.front().second;
cb(result, nullptr);
pipeliningCallbacks_.pop();
}
}
};
} // namespace drogon

View File

@ -1,7 +1,5 @@
#include "Http2Transport.h" #include "Http2Transport.h"
#include "HttpFileUploadRequest.h"
#include <fstream>
#include <variant> #include <variant>
using namespace drogon; using namespace drogon;
@ -144,7 +142,7 @@ std::optional<WindowUpdateFrame> WindowUpdateFrame::parse(ByteStream &payload,
} }
WindowUpdateFrame frame; WindowUpdateFrame frame;
// MSB is reserved for future use // MSB is reserved for future use
auto [_, windowSizeIncrement] = payload.readBI31BE(); auto [_, windowSizeIncrement] = payload.readBI32BE();
frame.windowSizeIncrement = windowSizeIncrement; frame.windowSizeIncrement = windowSizeIncrement;
return frame; return frame;
} }
@ -190,7 +188,7 @@ std::optional<HeadersFrame> HeadersFrame::parse(ByteStream &payload,
if (priority) if (priority)
{ {
auto [exclusive, streamDependency] = payload.readBI31BE(); auto [exclusive, streamDependency] = payload.readBI32BE();
frame.exclusive = exclusive; frame.exclusive = exclusive;
frame.streamDependency = streamDependency; frame.streamDependency = streamDependency;
frame.weight = payload.readU8(); frame.weight = payload.readU8();
@ -277,7 +275,7 @@ std::optional<DataFrame> DataFrame::parse(ByteStream &payload, uint8_t flags)
} }
size_t minSize = frame.padLength; size_t minSize = frame.padLength;
if (payload.remaining() < minSize) if (payload.size() < minSize)
{ {
LOG_TRACE << "Invalid data frame length"; LOG_TRACE << "Invalid data frame length";
return std::nullopt; return std::nullopt;
@ -387,7 +385,7 @@ std::optional<PushPromiseFrame> PushPromiseFrame::parse(ByteStream &payload,
return std::nullopt; return std::nullopt;
} }
auto [_, promisedStreamId] = payload.readBI31BE(); auto [_, promisedStreamId] = payload.readBI32BE();
frame.promisedStreamId = promisedStreamId; frame.promisedStreamId = promisedStreamId;
assert(payload.remaining() >= frame.padLength); assert(payload.remaining() >= frame.padLength);
frame.headerBlockFragment.resize(payload.remaining() - frame.padLength); frame.headerBlockFragment.resize(payload.remaining() - frame.padLength);
@ -561,11 +559,12 @@ static void serializeFrame(OByteStream &buffer,
buffer.overwriteU8(baseOffset + 4, flags); buffer.overwriteU8(baseOffset + 4, flags);
} }
// return data {frame, streamId, h2flags, fatal-error} // return streamId, frame, error and should continue parsing
// Note that error can orrcur on a stream level or the entire connection // Note that error can orrcur on a stream level or the entire connection
// We need to handle both cases. Also it could happen that the TCP stream // We need to handle both cases. Also it could happen that the TCP stream
// just cuts off in the middle of a frame (or header). We need to handle that // just cuts off in the middle of a frame (or header). We need to handle that
// too. // too.
// return data {frame, streamId, h2flags, fatal-error}
static std::tuple<std::optional<H2Frame>, uint32_t, uint8_t, bool> parseH2Frame( static std::tuple<std::optional<H2Frame>, uint32_t, uint8_t, bool> parseH2Frame(
trantor::MsgBuffer *msg) trantor::MsgBuffer *msg)
{ {
@ -653,15 +652,7 @@ void Http2Transport::sendRequestInLoop(const HttpRequestPtr &req,
return; return;
} }
const auto sid = nextStreamId(); const int32_t streamId = nextStreamId();
if (!sid.has_value())
{
// Upper HTTP client should see this and retry
// TODO: Need more elegant handling
connPtr->forceClose();
return;
}
const auto streamId = *sid;
assert(streamId % 2 == 1); assert(streamId % 2 == 1);
LOG_TRACE << "Sending HTTP/2 request: streamId=" << streamId; LOG_TRACE << "Sending HTTP/2 request: streamId=" << streamId;
if (streams.find(streamId) != streams.end()) if (streams.find(streamId) != streams.end())
@ -710,10 +701,7 @@ void Http2Transport::sendRequestInLoop(const HttpRequestPtr &req,
encoded.resize(n); encoded.resize(n);
LOG_TRACE << "Encoded headers size: " << encoded.size(); LOG_TRACE << "Encoded headers size: " << encoded.size();
bool haveBody = bool haveBody = req->body().length() > 0;
req->body().length() > 0 ||
(req->contentType() != CT_MULTIPART_FORM_DATA &&
dynamic_cast<HttpFileUploadRequest *>(req.get()) != nullptr);
auto &stream = createStream(streamId); auto &stream = createStream(streamId);
stream.callback = std::move(callback); stream.callback = std::move(callback);
stream.request = req; stream.request = req;
@ -746,8 +734,9 @@ void Http2Transport::sendRequestInLoop(const HttpRequestPtr &req,
if (!haveBody) if (!haveBody)
return; return;
auto [sentOffset, done] = sendBodyForStream(stream, 0); size_t sentOffset = sendBodyForStream(stream, 0);
if (!done) assert(sentOffset <= req->body().length());
if (sentOffset != req->body().length())
{ {
auto it = pendingDataSend.find(streamId); auto it = pendingDataSend.find(streamId);
if (it != pendingDataSend.end()) if (it != pendingDataSend.end())
@ -879,8 +868,8 @@ void Http2Transport::onRecvMessage(const trantor::TcpConnectionPtr &,
do do
{ {
auto &stream = streams[it->first]; auto &stream = streams[it->first];
auto [sentOffset, done] = sendBodyForStream(stream, it->second); auto sentOffset = sendBodyForStream(stream, it->second);
if (done) if (sentOffset == stream.request->body().length())
{ {
pendingDataSend.erase(it); pendingDataSend.erase(it);
it = pendingDataSend.begin(); it = pendingDataSend.begin();
@ -987,18 +976,6 @@ void Http2Transport::onRecvMessage(const trantor::TcpConnectionPtr &,
// Do nothing. RFC says to ignore unknown frames // Do nothing. RFC says to ignore unknown frames
} }
} }
if (!runningOutStreamId() || reconnectionIssued)
return;
// Only attempt to reconnect if there is no request in progress
if (requestsInFlight() == 0 && bufferedRequests.empty())
{
reconnectionIssued = true;
LOG_TRACE << "Reconnect due to running out of stream ids.";
connPtr->getLoop()->queueInLoop(
[connPtr = this->connPtr]() { connPtr->shutdown(); });
}
} }
bool Http2Transport::parseAndApplyHeaders(internal::H2Stream &stream, bool Http2Transport::parseAndApplyHeaders(internal::H2Stream &stream,
@ -1235,8 +1212,9 @@ void Http2Transport::handleFrameForStream(const internal::H2Frame &frame,
if (it == pendingDataSend.end()) if (it == pendingDataSend.end())
return; return;
auto [sentOffset, done] = sendBodyForStream(stream, it->second); auto sentOffset = sendBodyForStream(stream, it->second);
if (done) assert(sentOffset <= stream.request->body().length());
if (sentOffset == stream.request->body().length())
pendingDataSend.erase(it); pendingDataSend.erase(it);
else else
it->second = sentOffset; it->second = sentOffset;
@ -1276,9 +1254,6 @@ void Http2Transport::responseSuccess(internal::H2Stream &stream)
auto it = streams.find(stream.streamId); auto it = streams.find(stream.streamId);
assert(it != streams.end()); assert(it != streams.end());
auto streamId = stream.streamId; auto streamId = stream.streamId;
// It is technically possible for a server to response before we fully
// send the body. So cehck and remove.
pendingDataSend.erase(streamId);
// XXX: This callback seems to be able to cause the destruction of this // XXX: This callback seems to be able to cause the destruction of this
// object // object
respCallback(stream.response, respCallback(stream.response,
@ -1286,9 +1261,12 @@ void Http2Transport::responseSuccess(internal::H2Stream &stream)
connPtr); connPtr);
streams.erase(it); // NOTE: stream is now invalid streams.erase(it); // NOTE: stream is now invalid
// It is technically possible for a server to response before we fully
// send the body. So cehck and remove.
pendingDataSend.erase(streamId);
if (bufferedRequests.empty()) if (bufferedRequests.empty())
return; return;
assert(streams.size() < maxConcurrentStreams);
auto &[req, cb] = bufferedRequests.front(); auto &[req, cb] = bufferedRequests.front();
sendRequestInLoop(req, std::move(cb)); sendRequestInLoop(req, std::move(cb));
bufferedRequests.pop(); bufferedRequests.pop();
@ -1351,27 +1329,29 @@ bool Http2Transport::handleConnectionClose()
return true; return true;
} }
std::pair<size_t, bool> Http2Transport::sendBodyForStream( size_t Http2Transport::sendBodyForStream(internal::H2Stream &stream,
internal::H2Stream &stream, size_t offset)
const void *data,
size_t size)
{ {
auto streamId = stream.streamId; auto streamId = stream.streamId;
if (stream.avaliableTxWindow == 0 || avaliableTxWindow == 0) if (stream.avaliableTxWindow == 0 || avaliableTxWindow == 0)
return {0, false}; return offset;
size_t maxSendSize = size; assert(stream.request != nullptr);
assert(stream.request->body().length() >= offset);
size_t maxSendSize = stream.request->body().length() - offset;
maxSendSize = (std::min)(maxSendSize, stream.avaliableTxWindow); maxSendSize = (std::min)(maxSendSize, stream.avaliableTxWindow);
maxSendSize = (std::min)(maxSendSize, avaliableTxWindow); maxSendSize = (std::min)(maxSendSize, avaliableTxWindow);
bool sendEverything = maxSendSize == size; size_t sendEndPos = offset + maxSendSize;
bool sendEverything =
maxSendSize == stream.request->body().length() - offset;
size_t i = 0; size_t i = 0;
for (i = 0; i < size; i += maxFrameSize) for (i = offset; i < sendEndPos; i += maxFrameSize)
{ {
size_t remaining = size - i; size_t remaining = stream.request->body().length() - i;
size_t readSize = (std::min)(maxFrameSize, remaining); size_t readSize = (std::min)(maxFrameSize, remaining);
bool endStream = sendEverything && i + maxFrameSize >= size; bool endStream = sendEverything && i + maxFrameSize >= sendEndPos;
const std::string_view sendData((const char *)data + i, readSize); std::string_view sendData(stream.request->body().data() + i, readSize);
DataFrame dataFrame(sendData, endStream); DataFrame dataFrame(sendData, endStream);
LOG_TRACE << "Sending data frame: size=" << readSize LOG_TRACE << "Sending data frame: size=" << readSize
<< " endStream=" << dataFrame.endStream; << " endStream=" << dataFrame.endStream;
@ -1380,45 +1360,8 @@ std::pair<size_t, bool> Http2Transport::sendBodyForStream(
stream.avaliableTxWindow -= readSize; stream.avaliableTxWindow -= readSize;
avaliableTxWindow -= readSize; avaliableTxWindow -= readSize;
} }
i = (std::min)(i, size); assert(i >= offset && i <= sendEndPos);
return {i, sendEverything}; return i;
}
std::pair<size_t, bool> Http2Transport::sendBodyForStream(
internal::H2Stream &stream,
size_t offset)
{
auto streamId = stream.streamId;
if (stream.avaliableTxWindow == 0 || avaliableTxWindow == 0)
return {offset, false};
// Special handling for multipart because different underlying code
auto mPtr = dynamic_cast<HttpFileUploadRequest *>(stream.request.get());
if (mPtr)
{
// TODO: Don't put everything in memory. This causes a lot of memory
// when uploading large files. Howerver, we are not doing better in 1.x
// client either. So, meh.
if (stream.multipartData.readableBytes() == 0)
mPtr->renderMultipartFormData(stream.multipartData);
auto &content = stream.multipartData;
// CANNOT be empty. At least we are sending the boundary
assert(content.readableBytes() > 0);
auto [amount, done] =
sendBodyForStream(stream, content.peek(), content.readableBytes());
if (done)
// force release memory
stream.multipartData = trantor::MsgBuffer();
return {offset + amount, done};
}
auto [amount, done] =
sendBodyForStream(stream,
stream.request->body().data() + offset,
stream.request->body().length() - offset);
assert(offset + amount <= stream.request->body().length());
return {offset + amount, done};
} }
void Http2Transport::sendFrame(const internal::H2Frame &frame, int32_t streamId) void Http2Transport::sendFrame(const internal::H2Frame &frame, int32_t streamId)

View File

@ -6,7 +6,6 @@
#include "hpack/HPacker.h" #include "hpack/HPacker.h"
#include <variant> #include <variant>
#include <climits>
namespace drogon namespace drogon
{ {
@ -46,7 +45,7 @@ struct ByteStream
return res; return res;
} }
std::pair<bool, int32_t> readBI31BE() std::pair<bool, int32_t> readBI32BE()
{ {
assert(length >= 4 && offset <= length - 4); assert(length >= 4 && offset <= length - 4);
int32_t res = ptr[offset] << 24 | ptr[offset + 1] << 16 | int32_t res = ptr[offset] << 24 | ptr[offset + 1] << 16 |
@ -427,8 +426,6 @@ struct H2Stream
size_t avaliableTxWindow = 65535; size_t avaliableTxWindow = 65535;
size_t avaliableRxWindow = 65535; size_t avaliableRxWindow = 65535;
StreamState state = StreamState::ExpectingHeaders; StreamState state = StreamState::ExpectingHeaders;
trantor::MsgBuffer multipartData;
}; };
} // namespace internal } // namespace internal
@ -468,7 +465,7 @@ class Http2Transport : public HttpTransport
int32_t expectngContinuationStreamId = 0; int32_t expectngContinuationStreamId = 0;
std::unordered_map<int32_t, size_t> pendingDataSend; std::unordered_map<int32_t, size_t> pendingDataSend;
bool reconnectionIssued = false; // TODO: Handle server-initiated stream creation
// HTTP/2 client-wide settings (can be changed by server) // HTTP/2 client-wide settings (can be changed by server)
size_t maxConcurrentStreams = 100; size_t maxConcurrentStreams = 100;
@ -478,9 +475,8 @@ class Http2Transport : public HttpTransport
// Configuration settings // Configuration settings
const uint32_t windowIncreaseThreshold = 16384; const uint32_t windowIncreaseThreshold = 16384;
const uint32_t windowIncreaseSize = 128 * 1024; // 128KB const uint32_t windowIncreaseSize = 10 * 1024 * 1024; // 10 MiB
const uint32_t maxCompressiedHeaderSize = 2048; const uint32_t maxCompressiedHeaderSize = 2048;
const int32_t streamIdReconnectThreshold = INT_MAX - 8192;
// HTTP/2 connection-wide state // HTTP/2 connection-wide state
size_t avaliableTxWindow = 65535; size_t avaliableTxWindow = 65535;
@ -490,25 +486,15 @@ class Http2Transport : public HttpTransport
void responseSuccess(internal::H2Stream &stream); void responseSuccess(internal::H2Stream &stream);
void responseErrored(int32_t streamId, ReqResult result); void responseErrored(int32_t streamId, ReqResult result);
std::optional<int32_t> nextStreamId() int32_t nextStreamId()
{ {
// XXX: Technically UB. But no one actually uses 1's complement // TODO: Handling stream ID requires to reconnect
if (currentStreamId < 0) // the entire connection. Handle this somehow
return std::nullopt;
int32_t streamId = currentStreamId; int32_t streamId = currentStreamId;
currentStreamId += 2; currentStreamId += 2;
return streamId; return streamId;
} }
// Returns true when we SHOULD reconnect due to exhausting stream IDs.
// Doesn't mean we will. We will force a reconnect when we actually
// run out.
inline bool runningOutStreamId()
{
return currentStreamId > streamIdReconnectThreshold;
}
void handleFrameForStream(const internal::H2Frame &frame, void handleFrameForStream(const internal::H2Frame &frame,
int32_t streamId, int32_t streamId,
uint8_t flags); uint8_t flags);
@ -519,11 +505,7 @@ class Http2Transport : public HttpTransport
bool parseAndApplyHeaders(internal::H2Stream &stream, bool parseAndApplyHeaders(internal::H2Stream &stream,
const void *data, const void *data,
size_t len); size_t len);
std::pair<size_t, bool> sendBodyForStream(internal::H2Stream &stream, size_t sendBodyForStream(internal::H2Stream &stream, size_t offset);
const void *data,
size_t size);
std::pair<size_t, bool> sendBodyForStream(internal::H2Stream &stream,
size_t offset);
void sendFrame(const internal::H2Frame &frame, int32_t streamId); void sendFrame(const internal::H2Frame &frame, int32_t streamId);
void sendBufferedData(); void sendBufferedData();
@ -540,7 +522,7 @@ class Http2Transport : public HttpTransport
size_t requestsInFlight() const override size_t requestsInFlight() const override
{ {
return streams.size(); return 0;
} }
bool handleConnectionClose() override; bool handleConnectionClose() override;

View File

@ -31,12 +31,94 @@ namespace trantor
const static size_t kDefaultDNSTimeout{600}; const static size_t kDefaultDNSTimeout{600};
} }
Http1xTransport::Http1xTransport(trantor::TcpConnectionPtr connPtr,
size_t *bytesSent,
size_t *bytesReceived)
: connPtr(connPtr), bytesSent_(bytesSent), bytesReceived_(bytesReceived)
{
connPtr->setContext(std::make_shared<HttpResponseParser>(connPtr));
}
void Http1xTransport::sendRequestInLoop(const HttpRequestPtr &req,
HttpReqCallback &&callback)
{
sendReq(req);
pipeliningCallbacks_.emplace(std::move(req), std::move(callback));
}
void Http1xTransport::onRecvMessage(const trantor::TcpConnectionPtr &conn,
trantor::MsgBuffer *msg)
{
auto responseParser = connPtr->getContext<HttpResponseParser>();
assert(responseParser != nullptr);
assert(connPtr.get() == conn.get());
// LOG_TRACE << "###:" << msg->readableBytes();
auto msgSize = msg->readableBytes();
while (msg->readableBytes() > 0)
{
if (pipeliningCallbacks_.empty())
{
LOG_ERROR << "More responses than expected!";
connPtr->shutdown();
return;
}
auto &firstReq = pipeliningCallbacks_.front();
if (firstReq.first->method() == Head)
{
responseParser->setForHeadMethod();
}
if (!responseParser->parseResponse(msg))
{
*bytesReceived_ += (msgSize - msg->readableBytes());
errorCallback(ReqResult::BadResponse);
return;
}
if (responseParser->gotAll())
{
auto resp = responseParser->responseImpl();
responseParser->reset();
*bytesReceived_ += (msgSize - msg->readableBytes());
msgSize = msg->readableBytes();
respCallback(resp, std::move(firstReq), conn);
pipeliningCallbacks_.pop();
}
else
{
break;
}
}
}
Http1xTransport::~Http1xTransport()
{
}
bool Http1xTransport::handleConnectionClose()
{
auto responseParser = connPtr->getContext<HttpResponseParser>();
if (responseParser && responseParser->parseResponseOnClose() &&
responseParser->gotAll())
{
auto &firstReq = pipeliningCallbacks_.front();
if (firstReq.first->method() == Head)
{
responseParser->setForHeadMethod();
}
auto resp = responseParser->responseImpl();
responseParser->reset();
respCallback(resp, std::move(firstReq), connPtr);
return false;
}
return true;
}
void HttpClientImpl::createTcpClient() void HttpClientImpl::createTcpClient()
{ {
LOG_TRACE << "New TcpClient," << serverAddr_.toIpPort(); LOG_TRACE << "New TcpClient," << serverAddr_.toIpPort();
tcpClientPtr_ = tcpClientPtr_ =
std::make_shared<trantor::TcpClient>(loop_, serverAddr_, "httpClient"); std::make_shared<trantor::TcpClient>(loop_, serverAddr_, "httpClient");
Version version = targetHttpVersion_.value_or(Version::kHttp2);
if (useSSL_ && utils::supportsTls()) if (useSSL_ && utils::supportsTls())
{ {
@ -48,9 +130,8 @@ void HttpClientImpl::createTcpClient()
.setHostname(domain_) .setHostname(domain_)
.setConfCmds(sslConfCmds_) .setConfCmds(sslConfCmds_)
.setCertPath(clientCertPath_) .setCertPath(clientCertPath_)
.setKeyPath(clientKeyPath_); .setKeyPath(clientKeyPath_)
if (version == Version::kHttp2) .setAlpnProtocols({"h2", "http/1.1"});
policy->setAlpnProtocols({"h2", "http/1.1"});
tcpClientPtr_->enableSSL(std::move(policy)); tcpClientPtr_->enableSSL(std::move(policy));
} }
@ -75,15 +156,13 @@ void HttpClientImpl::createTcpClient()
LOG_TRACE << "Connection established!"; LOG_TRACE << "Connection established!";
auto protocol = connPtr->applicationProtocol(); auto protocol = connPtr->applicationProtocol();
if (protocol == "http/1.1") if (protocol.empty() || protocol == "http/1.1")
{ {
LOG_TRACE << "Select http/1.1 protocol"; LOG_TRACE << "Select http/1.1 protocol";
thisPtr->transport_ = thisPtr->transport_ =
std::make_unique<Http1xTransport>(connPtr, std::make_unique<Http1xTransport>(connPtr,
Version::kHttp11,
&thisPtr->bytesSent_, &thisPtr->bytesSent_,
&thisPtr->bytesReceived_); &thisPtr->bytesReceived_);
thisPtr->httpVersion_ = Version::kHttp11;
} }
else if (protocol == "h2") else if (protocol == "h2")
{ {
@ -92,21 +171,6 @@ void HttpClientImpl::createTcpClient()
std::make_unique<Http2Transport>(connPtr, std::make_unique<Http2Transport>(connPtr,
&thisPtr->bytesSent_, &thisPtr->bytesSent_,
&thisPtr->bytesReceived_); &thisPtr->bytesReceived_);
thisPtr->httpVersion_ = Version::kHttp2;
}
else if (protocol.empty())
{
// Either we are not using TLS or the server does not support
// ALPN. Use HTTP/1.1 if not specified otherwise.
bool force1_0 = thisPtr->targetHttpVersion_.value_or(
Version::kUnknown) == Version::kHttp10;
auto version = force1_0 ? Version::kHttp10 : Version::kHttp11;
thisPtr->httpVersion_ = version;
thisPtr->transport_ =
std::make_unique<Http1xTransport>(connPtr,
version,
&thisPtr->bytesSent_,
&thisPtr->bytesReceived_);
} }
else else
{ {
@ -115,9 +179,6 @@ void HttpClientImpl::createTcpClient()
thisPtr->onError(ReqResult::BadResponse); thisPtr->onError(ReqResult::BadResponse);
return; return;
} }
assert(thisPtr->httpVersion_.has_value());
assert(thisPtr->transport_);
thisPtr->transport_->setRespCallback( thisPtr->transport_->setRespCallback(
[weakPtr](const HttpResponseImplPtr &resp, [weakPtr](const HttpResponseImplPtr &resp,
std::pair<HttpRequestPtr, HttpReqCallback> &&reqAndCb, std::pair<HttpRequestPtr, HttpReqCallback> &&reqAndCb,
@ -134,13 +195,8 @@ void HttpClientImpl::createTcpClient()
thisPtr->onError(result); thisPtr->onError(result);
}); });
size_t maxSendReq = (*(thisPtr->httpVersion_) == Version::kHttp2) // TODO: respect timeout and pipeliningDepth_
? size_t{0xfffffff} while (!thisPtr->requestsBuffer_.empty())
: thisPtr->pipeliningDepth_;
if (maxSendReq == 0)
maxSendReq = 1;
while (!thisPtr->requestsBuffer_.empty() &&
thisPtr->transport_->requestsInFlight() < maxSendReq)
{ {
auto &reqAndCb = thisPtr->requestsBuffer_.front(); auto &reqAndCb = thisPtr->requestsBuffer_.front();
thisPtr->transport_->sendRequestInLoop(reqAndCb.first, thisPtr->transport_->sendRequestInLoop(reqAndCb.first,
@ -166,8 +222,6 @@ void HttpClientImpl::createTcpClient()
// temporary fix of dead tcpClientPtr_ // temporary fix of dead tcpClientPtr_
// TODO: fix HttpResponseParser when content-length absence // TODO: fix HttpResponseParser when content-length absence
// TODO: HTTP/2 transport also relies on this behavior to reconnect
// when running out of stream IDs
thisPtr->tcpClientPtr_.reset(); thisPtr->tcpClientPtr_.reset();
if (!thisPtr->requestsBuffer_.empty()) if (!thisPtr->requestsBuffer_.empty())
{ {
@ -214,26 +268,20 @@ HttpClientImpl::HttpClientImpl(trantor::EventLoop *loop,
const trantor::InetAddress &addr, const trantor::InetAddress &addr,
bool useSSL, bool useSSL,
bool useOldTLS, bool useOldTLS,
bool validateCert, bool validateCert)
std::optional<Version> targetVersion)
: loop_(loop), : loop_(loop),
serverAddr_(addr), serverAddr_(addr),
useSSL_(useSSL), useSSL_(useSSL),
validateCert_(validateCert), validateCert_(validateCert),
useOldTLS_(useOldTLS), useOldTLS_(useOldTLS)
targetHttpVersion_(targetVersion)
{ {
} }
HttpClientImpl::HttpClientImpl(trantor::EventLoop *loop, HttpClientImpl::HttpClientImpl(trantor::EventLoop *loop,
const std::string &hostString, const std::string &hostString,
bool useOldTLS, bool useOldTLS,
bool validateCert, bool validateCert)
std::optional<Version> targetVersion) : loop_(loop), validateCert_(validateCert), useOldTLS_(useOldTLS)
: loop_(loop),
validateCert_(validateCert),
useOldTLS_(useOldTLS),
targetHttpVersion_(targetVersion)
{ {
auto lowerHost = hostString; auto lowerHost = hostString;
std::transform(lowerHost.begin(), std::transform(lowerHost.begin(),
@ -577,15 +625,10 @@ void HttpClientImpl::sendRequestInLoop(const drogon::HttpRequestPtr &req,
return; return;
} }
assert(transport_ != nullptr); assert(transport_ != nullptr);
assert(httpVersion_.has_value());
size_t maxSendReq = (*httpVersion_ == Version::kHttp2) ? size_t{0xfffffffff}
: pipeliningDepth_;
if (maxSendReq == 0)
maxSendReq = 1;
// Connected, send request now // Connected, send request now
if (transport_->requestsInFlight() <= maxSendReq && requestsBuffer_.empty()) if (transport_->requestsInFlight() <= pipeliningDepth_ &&
requestsBuffer_.empty())
{ {
transport_->sendRequestInLoop(req, std::move(callback)); transport_->sendRequestInLoop(req, std::move(callback));
} }
@ -601,6 +644,18 @@ void HttpClientImpl::sendRequestInLoop(const drogon::HttpRequestPtr &req,
} }
} }
void Http1xTransport::sendReq(const HttpRequestPtr &req)
{
trantor::MsgBuffer buffer;
assert(req);
auto implPtr = static_cast<HttpRequestImpl *>(req.get());
implPtr->appendToBuffer(&buffer);
LOG_TRACE << "Send request:"
<< std::string(buffer.peek(), buffer.readableBytes());
*bytesSent_ += buffer.readableBytes();
connPtr->send(std::move(buffer));
}
void HttpClientImpl::handleResponse( void HttpClientImpl::handleResponse(
const HttpResponseImplPtr &resp, const HttpResponseImplPtr &resp,
std::pair<HttpRequestPtr, HttpReqCallback> &&reqAndCb, std::pair<HttpRequestPtr, HttpReqCallback> &&reqAndCb,
@ -667,8 +722,7 @@ HttpClientPtr HttpClient::newHttpClient(const std::string &ip,
bool useSSL, bool useSSL,
trantor::EventLoop *loop, trantor::EventLoop *loop,
bool useOldTLS, bool useOldTLS,
bool validateCert, bool validateCert)
std::optional<Version> targetVersion)
{ {
bool isIpv6 = ip.find(':') == std::string::npos ? false : true; bool isIpv6 = ip.find(':') == std::string::npos ? false : true;
return std::make_shared<HttpClientImpl>( return std::make_shared<HttpClientImpl>(
@ -676,22 +730,19 @@ HttpClientPtr HttpClient::newHttpClient(const std::string &ip,
trantor::InetAddress(ip, port, isIpv6), trantor::InetAddress(ip, port, isIpv6),
useSSL, useSSL,
useOldTLS, useOldTLS,
validateCert, validateCert);
targetVersion);
} }
HttpClientPtr HttpClient::newHttpClient(const std::string &hostString, HttpClientPtr HttpClient::newHttpClient(const std::string &hostString,
trantor::EventLoop *loop, trantor::EventLoop *loop,
bool useOldTLS, bool useOldTLS,
bool validateCert, bool validateCert)
std::optional<Version> targetVersion)
{ {
return std::make_shared<HttpClientImpl>( return std::make_shared<HttpClientImpl>(
loop == nullptr ? HttpAppFrameworkImpl::instance().getLoop() : loop, loop == nullptr ? HttpAppFrameworkImpl::instance().getLoop() : loop,
hostString, hostString,
useOldTLS, useOldTLS,
validateCert, validateCert);
targetVersion);
} }
void HttpClientImpl::onError(ReqResult result) void HttpClientImpl::onError(ReqResult result)

View File

@ -26,11 +26,48 @@
#include "impl_forwards.h" #include "impl_forwards.h"
#include "Http2Transport.h" #include "Http2Transport.h"
#include "HttpTransport.h" #include "HttpTransport.h"
#include "Http1xTransport.h"
namespace drogon namespace drogon
{ {
class Http1xTransport : public HttpTransport
{
private:
std::queue<std::pair<HttpRequestPtr, HttpReqCallback>> pipeliningCallbacks_;
trantor::TcpConnectionPtr connPtr;
size_t *bytesSent_;
size_t *bytesReceived_;
void sendReq(const HttpRequestPtr &req);
public:
Http1xTransport(trantor::TcpConnectionPtr connPtr,
size_t *bytesSent,
size_t *bytesReceived);
virtual ~Http1xTransport();
void sendRequestInLoop(const HttpRequestPtr &req,
HttpReqCallback &&callback) override;
void onRecvMessage(const trantor::TcpConnectionPtr &,
trantor::MsgBuffer *) override;
size_t requestsInFlight() const override
{
return pipeliningCallbacks_.size();
}
bool handleConnectionClose() override;
void onError(ReqResult result) override
{
while (!pipeliningCallbacks_.empty())
{
auto &cb = pipeliningCallbacks_.front().second;
cb(result, nullptr);
pipeliningCallbacks_.pop();
}
}
};
class HttpClientImpl final : public HttpClient, class HttpClientImpl final : public HttpClient,
public std::enable_shared_from_this<HttpClientImpl> public std::enable_shared_from_this<HttpClientImpl>
{ {
@ -39,13 +76,11 @@ class HttpClientImpl final : public HttpClient,
const trantor::InetAddress &addr, const trantor::InetAddress &addr,
bool useSSL = false, bool useSSL = false,
bool useOldTLS = false, bool useOldTLS = false,
bool validateCert = true, bool validateCert = true);
std::optional<Version> httpVersion = std::nullopt);
HttpClientImpl(trantor::EventLoop *loop, HttpClientImpl(trantor::EventLoop *loop,
const std::string &hostString, const std::string &hostString,
bool useOldTLS = false, bool useOldTLS = false,
bool validateCert = true, bool validateCert = true);
std::optional<Version> httpVersion = std::nullopt);
void sendRequest(const HttpRequestPtr &req, void sendRequest(const HttpRequestPtr &req,
const HttpReqCallback &callback, const HttpReqCallback &callback,
double timeout = 0) override; double timeout = 0) override;
@ -121,11 +156,6 @@ class HttpClientImpl final : public HttpClient,
sockOptCallback_ = std::move(cb); sockOptCallback_ = std::move(cb);
} }
std::optional<Version> protocolVersion() const override
{
return httpVersion_;
}
private: private:
std::shared_ptr<trantor::TcpClient> tcpClientPtr_; std::shared_ptr<trantor::TcpClient> tcpClientPtr_;
trantor::EventLoop *loop_; trantor::EventLoop *loop_;
@ -161,8 +191,6 @@ class HttpClientImpl final : public HttpClient,
std::string clientKeyPath_; std::string clientKeyPath_;
std::function<void(int)> sockOptCallback_; std::function<void(int)> sockOptCallback_;
std::unique_ptr<HttpTransport> transport_; std::unique_ptr<HttpTransport> transport_;
std::optional<Version> targetHttpVersion_;
std::optional<Version> httpVersion_;
}; };
using HttpClientImplPtr = std::shared_ptr<HttpClientImpl>; using HttpClientImplPtr = std::shared_ptr<HttpClientImpl>;

View File

@ -16,8 +16,6 @@
#include <drogon/UploadFile.h> #include <drogon/UploadFile.h>
#include <drogon/utils/Utilities.h> #include <drogon/utils/Utilities.h>
#include <fstream>
using namespace drogon; using namespace drogon;
HttpFileUploadRequest::HttpFileUploadRequest( HttpFileUploadRequest::HttpFileUploadRequest(
@ -27,74 +25,7 @@ HttpFileUploadRequest::HttpFileUploadRequest(
files_(files) files_(files)
{ {
setMethod(drogon::Post); setMethod(drogon::Post);
setVersion(drogon::Version::kHttp11);
setContentType("multipart/form-data; boundary=" + boundary_); setContentType("multipart/form-data; boundary=" + boundary_);
contentType_ = CT_MULTIPART_FORM_DATA; contentType_ = CT_MULTIPART_FORM_DATA;
} }
template <typename T>
void renderMultipart(const HttpFileUploadRequest *mPtr, T &content)
{
auto &boundary = mPtr->boundary();
for (auto &param : mPtr->getParameters())
{
content.append("--");
content.append(boundary);
content.append("\r\n");
content.append("content-disposition: form-data; name=\"");
content.append(param.first);
content.append("\"\r\n\r\n");
content.append(param.second);
content.append("\r\n");
}
for (auto &file : mPtr->files())
{
content.append("--");
content.append(boundary);
content.append("\r\n");
content.append("content-disposition: form-data; name=\"");
content.append(file.itemName());
content.append("\"; filename=\"");
content.append(file.fileName());
content.append("\"");
if (file.contentType() != CT_NONE)
{
content.append("\r\n");
auto &type = contentTypeToMime(file.contentType());
content.append("content-type: ");
content.append(type.data(), type.length());
}
content.append("\r\n\r\n");
std::ifstream infile(utils::toNativePath(file.path()),
std::ifstream::binary);
if (!infile)
{
LOG_ERROR << file.path() << " not found";
}
else
{
std::streambuf *pbuf = infile.rdbuf();
std::streamsize filesize = pbuf->pubseekoff(0, infile.end);
pbuf->pubseekoff(0, infile.beg); // rewind
std::string str;
str.resize(filesize);
pbuf->sgetn(&str[0], filesize);
content.append(std::move(str));
}
content.append("\r\n");
}
content.append("--");
content.append(boundary);
content.append("--");
}
void HttpFileUploadRequest::renderMultipartFormData(
trantor::MsgBuffer &buffer) const
{
renderMultipart(this, buffer);
}
void HttpFileUploadRequest::renderMultipartFormData(std::string &buffer) const
{
renderMultipart(this, buffer);
}

View File

@ -34,9 +34,6 @@ class HttpFileUploadRequest : public HttpRequestImpl
explicit HttpFileUploadRequest(const std::vector<UploadFile> &files); explicit HttpFileUploadRequest(const std::vector<UploadFile> &files);
void renderMultipartFormData(trantor::MsgBuffer &buffer) const;
void renderMultipartFormData(std::string &buffer) const;
private: private:
std::string boundary_; std::string boundary_;
std::vector<UploadFile> files_; std::vector<UploadFile> files_;

View File

@ -182,8 +182,7 @@ void HttpRequestImpl::parseParameters() const
} }
} }
void HttpRequestImpl::appendToBuffer(trantor::MsgBuffer *output, void HttpRequestImpl::appendToBuffer(trantor::MsgBuffer *output) const
Version protoVer) const
{ {
switch (method_) switch (method_)
{ {
@ -267,11 +266,11 @@ void HttpRequestImpl::appendToBuffer(trantor::MsgBuffer *output,
} }
output->append(" "); output->append(" ");
if (protoVer == Version::kHttp11) if (version_ == Version::kHttp11)
{ {
output->append("HTTP/1.1"); output->append("HTTP/1.1");
} }
else if (protoVer == Version::kHttp10) else if (version_ == Version::kHttp10)
{ {
output->append("HTTP/1.0"); output->append("HTTP/1.0");
} }
@ -286,7 +285,57 @@ void HttpRequestImpl::appendToBuffer(trantor::MsgBuffer *output,
auto mReq = dynamic_cast<const HttpFileUploadRequest *>(this); auto mReq = dynamic_cast<const HttpFileUploadRequest *>(this);
if (mReq) if (mReq)
{ {
mReq->renderMultipartFormData(content); for (auto &param : mReq->getParameters())
{
content.append("--");
content.append(mReq->boundary());
content.append("\r\n");
content.append("content-disposition: form-data; name=\"");
content.append(param.first);
content.append("\"\r\n\r\n");
content.append(param.second);
content.append("\r\n");
}
for (auto &file : mReq->files())
{
content.append("--");
content.append(mReq->boundary());
content.append("\r\n");
content.append("content-disposition: form-data; name=\"");
content.append(file.itemName());
content.append("\"; filename=\"");
content.append(file.fileName());
content.append("\"");
if (file.contentType() != CT_NONE)
{
content.append("\r\n");
auto &type = contentTypeToMime(file.contentType());
content.append("content-type: ");
content.append(type.data(), type.length());
}
content.append("\r\n\r\n");
std::ifstream infile(utils::toNativePath(file.path()),
std::ifstream::binary);
if (!infile)
{
LOG_ERROR << file.path() << " not found";
}
else
{
std::streambuf *pbuf = infile.rdbuf();
std::streamsize filesize = pbuf->pubseekoff(0, infile.end);
pbuf->pubseekoff(0, infile.beg); // rewind
std::string str;
str.resize(filesize);
pbuf->sgetn(&str[0], filesize);
content.append(std::move(str));
}
content.append("\r\n");
}
content.append("--");
content.append(mReq->boundary());
content.append("--");
} }
} }
assert(!(!content.empty() && !content_.empty())); assert(!(!content.empty() && !content_.empty()));
@ -457,6 +506,7 @@ HttpRequestPtr HttpRequest::newHttpRequest()
{ {
auto req = std::make_shared<HttpRequestImpl>(nullptr); auto req = std::make_shared<HttpRequestImpl>(nullptr);
req->setMethod(drogon::Get); req->setMethod(drogon::Get);
req->setVersion(drogon::Version::kHttp11);
return req; return req;
} }
@ -464,6 +514,7 @@ HttpRequestPtr HttpRequest::newHttpFormPostRequest()
{ {
auto req = std::make_shared<HttpRequestImpl>(nullptr); auto req = std::make_shared<HttpRequestImpl>(nullptr);
req->setMethod(drogon::Post); req->setMethod(drogon::Post);
req->setVersion(drogon::Version::kHttp11);
req->contentType_ = CT_APPLICATION_X_FORM; req->contentType_ = CT_APPLICATION_X_FORM;
req->flagForParsingContentType_ = true; req->flagForParsingContentType_ = true;
return req; return req;
@ -489,6 +540,7 @@ HttpRequestPtr HttpRequest::newHttpJsonRequest(const Json::Value &data)
}); });
auto req = std::make_shared<HttpRequestImpl>(nullptr); auto req = std::make_shared<HttpRequestImpl>(nullptr);
req->setMethod(drogon::Get); req->setMethod(drogon::Get);
req->setVersion(drogon::Version::kHttp11);
req->contentType_ = CT_APPLICATION_JSON; req->contentType_ = CT_APPLICATION_JSON;
req->setContent(writeString(builder, data)); req->setContent(writeString(builder, data));
req->flagForParsingContentType_ = true; req->flagForParsingContentType_ = true;
@ -547,10 +599,6 @@ const char *HttpRequestImpl::versionString() const
result = "HTTP/1.1"; result = "HTTP/1.1";
break; break;
case Version::kHttp2:
result = "HTTP/2";
break;
default: default:
break; break;
} }

View File

@ -87,6 +87,8 @@ class HttpRequestImpl : public HttpRequest
return loop_; return loop_;
} }
// [[deprcated("Now version is controlled by the HttpClient. Calling
// setVersion() will have no effect.")]]
void setVersion(Version v) void setVersion(Version v)
{ {
version_ = v; version_ = v;
@ -416,7 +418,7 @@ class HttpRequestImpl : public HttpRequest
return passThrough_; return passThrough_;
} }
void appendToBuffer(trantor::MsgBuffer *output, Version protoVer) const; void appendToBuffer(trantor::MsgBuffer *output) const;
const SessionPtr &session() const override const SessionPtr &session() const override
{ {

View File

@ -424,7 +424,7 @@ void WebSocketClientImpl::sendReq(const trantor::TcpConnectionPtr &connPtr)
trantor::MsgBuffer buffer; trantor::MsgBuffer buffer;
assert(upgradeRequest_); assert(upgradeRequest_);
auto implPtr = static_cast<HttpRequestImpl *>(upgradeRequest_.get()); auto implPtr = static_cast<HttpRequestImpl *>(upgradeRequest_.get());
implPtr->appendToBuffer(&buffer, Version::kHttp11); implPtr->appendToBuffer(&buffer);
LOG_TRACE << "Send request:" LOG_TRACE << "Send request:"
<< std::string(buffer.peek(), buffer.readableBytes()); << std::string(buffer.peek(), buffer.readableBytes());
connPtr->send(std::move(buffer)); connPtr->send(std::move(buffer));

View File

@ -1142,10 +1142,6 @@ void doTest(const HttpClientPtr &client, std::shared_ptr<test::Case> TEST_CTX)
{ {
FAIL("Unexpected exception, what(): " + std::string(e.what())); FAIL("Unexpected exception, what(): " + std::string(e.what()));
} }
// Had to be put here else there's a chance that the client hasn't
// negotiated the protocol yet
CHECK(client->protocolVersion() == Version::kHttp11);
}); });
#endif #endif
} }

View File

@ -1,15 +0,0 @@
diff --git a/lib/src/Http2Transport.cc b/lib/src/Http2Transport.cc
index f3f153e..11492b3 100644
--- a/lib/src/Http2Transport.cc
+++ b/lib/src/Http2Transport.cc
@@ -996,9 +996,8 @@ void Http2Transport::onRecvMessage(const trantor::TcpConnectionPtr &,
{
reconnectionIssued = true;
LOG_TRACE << "Reconnect due to running out of stream ids.";
- connPtr->getLoop()->queueInLoop([connPtr = this->connPtr]() {
- connPtr->shutdown();
- });
+ connPtr->getLoop()->queueInLoop(
+ [connPtr = this->connPtr]() { connPtr->shutdown(); });
}
}