Compare commits

...

16 Commits

Author SHA1 Message Date
Martin Chang
e7eff323a6
fix failing on debug mode (#2079) 2024-06-20 20:52:14 +08:00
Martin Chang
88bf6f7fcb
Update ChangeLog.md 2024-06-18 14:38:36 +08:00
Martin Chang
80ff0a4869
Update ChangeLog.md 2024-06-18 14:36:45 +08:00
marty1885
70f1f8f38f Merge remote-tracking branch 'origin/master' into http2-beta 2024-06-18 14:25:49 +08:00
marty1885
29c8540aa8 Merge branch 'http2-beta' of github.com:drogonframework/drogon into http2-beta 2024-06-18 14:25:23 +08:00
Martin Chang
518bda2865
Fix HTTP/2 triggering RST_STREAM from google API servers when sending large (>64KB) of body (#2064)
* Fix underflow during flow tracking
* Better tracking of in-flight streaming that needs data sending to improve round robbing latency
* Proactively send and clear our HTTP/2 send buffer if there's already a lot of data queued. Reduces memory use.
* Fix calling callback multiple times if a stream received multiple RST_STREAM frame
2024-06-18 13:52:34 +08:00
marty1885
92b70f0d64 merge with master 2024-06-10 15:32:50 +08:00
marty1885
33344d09df Merge remote-tracking branch 'origin/master' into http2-beta 2024-06-02 12:45:51 +08:00
Martin Chang
62f83517f2
update eric-hpack-core (#2050) 2024-06-02 12:39:33 +08:00
Martin Chang
ed62f263da
merge with master (#2048) 2024-05-30 12:43:55 +08:00
Martin Chang
a742a8284c
Use "/" as default path in HTTP/2 client when req has no path set (#2045)
* fix sending empty path

* fix malformed path when path is empty but parameters exists
2024-05-30 09:24:54 +08:00
marty1885
e45e468040 merge with master 2024-05-27 22:37:29 +08:00
Martin Chang
fb08813e5c
Fix ChangeLog for beta release 2024-05-25 23:12:29 +08:00
marty1885
e81c3deb0b Merge branch 'http2-beta' of github.com:drogonframework/drogon into http2-beta 2024-05-25 23:09:18 +08:00
marty1885
605f3df734 update changelog for beta release 2024-05-25 23:08:11 +08:00
Martin Chang
50838a9fd3
HTTP/2 Client (#1554)
* Isolate HTTP/1.x client to it's own class

* track message sent and recv

* skeleton HTTP/2 frame parser

* format

* more skeleton for HTTP/2 parsing

* Fix dropping 1st request

* format, etc

* format again

* format yet again

* update trantor

* properly handler protocol errors

* non working: send http frames

* format

* hacky, but working request sending

* slight cleanup, parse h2 header

* parse data frames

* single resp callback called

* format

* initial populate response data and header

* fix not decompressing HTTP response

* workaround clang compile error

* Support serialized GoAwau

* cleanup parsing API

* fix clang compile warnings

* fix more clang warning

* fix msvc build

* optimize frame serialization

* some minor improvments

* enable handling of multiple streams

* release streamId on fail to decode header

* fix

* if content-length is present, cehck it matches the amount of data in DATA frame

* slight cleanup for error handling

* handle error

* handle response with no body

* handle out-of-order core frames

* reply to pings

* respect max concurrent streams

* handle CR LF in header

* send request body

* track window size

* fix MSVC build

* remove forced printing in debug

* update README

* Comply with RFC setup sequence

* Turn most error into trace, ignore unknown frames

* Fix MSVC warnings

* format

* sanitize max frame size settings

* cleanup connection killing

* Correctly keep track of RX window size

* send stream window update

* track TX window size

* politely close stream

* Fix large file download

* Fix RX flow control

* RFC compliant stream ID assignment

* handle connection close

* More compliant connection error handling

* Sementics cleanup

* slight optimization

* wip: store headers if we need more frames

* parse incomming headers in continuation frame

* fix untrue error message

* fix unable to post any data

* kill connection when PUSH_PROMIS recv

* able to parse RST_STREAM

* add check for DATA, HEADERS and PUSH_PROMISE frames

* react to RST_STREAM

* Handle sending CONTINUATION frame

* handle sending large body

* remove unneeded delay

* safer string parsing

* rfc compliant and overflow fix

* format

* round robbing sending pending body

* better ctor for frames

* update trantor

* correctly track intial RX/TX window

* fix minor ordering bugs

* get rid of duplicated serialization

* track total sent bytes

* allow unsupported frame type

* format

* mark response as HTTP/2

* buffered write

* remove bad warning

* error on hpack error

* pile of small fixes

* wip: let client decide the http version to sue

* some cleanup

* send correct http1 request based on client protocol version

* correctly respect HTTP1.x pipelining settings

* reconnect HTTP/2 if running out of stream ids

* isolate HTTP1 transport for client

* implement multipart support for HTTP/2

* revert client settings

* slight update

* fix build

* tmp: disable version check

* fix version check

* Fix fail in debug build

* fix debug code

* format

* code style cleanup

* correctly handle scheme in pasthrough mode

* format

* cleanup timeout handling logic

* don't close connection if using HTTP/2

* update trantor

* update function naming

* check 1st frame must be settings

* merge with master

* better round robbin

* fix spelling and update trantor

* format

* spelling

* avoid status code overflow

* fix integer overflow when data frame too small with padding

* fix windows

* fix bad dataframe parsing

* fix: Add support for query params for H2

* move from hpacker to eric-hpack

* format

* enable changing dynamic table size

* fix typo

* fix lint

* add license header

* get rid of included eric-hpack. Use submodule

* fix codespell

* fix format

* update eric-hpack

* update comment

* fix cmake

* replace example URL

* replace example URL to the old one

* use new eric-hpack-core

* use URL that does not redirect
2024-05-25 23:03:00 +08:00
23 changed files with 2687 additions and 239 deletions

3
.gitmodules vendored
View File

@ -2,3 +2,6 @@
path = trantor
url = https://github.com/an-tao/trantor.git
branch = master
[submodule "third_party/eric-hpack-core"]
path = third_party/eric-hpack-core
url = https://gitlab.com/joe1231231218/eric-hpack-core.git

View File

@ -244,6 +244,10 @@ if (BUILD_BROTLI)
endif (Brotli_FOUND)
endif (BUILD_BROTLI)
target_include_directories(
${PROJECT_NAME}
PRIVATE $<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}/third_party/eric-hpack-core>)
set(DROGON_SOURCES
lib/src/AOPAdvice.cc
lib/src/AccessLogger.cc
@ -260,6 +264,8 @@ set(DROGON_SOURCES
lib/src/Hodor.cc
lib/src/HttpAppFrameworkImpl.cc
lib/src/HttpBinder.cc
lib/src/Http2Transport.cc
lib/src/Http1xTransport.cc
lib/src/HttpClientImpl.cc
lib/src/HttpConnectionLimit.cc
lib/src/HttpControllerBinder.cc
@ -296,7 +302,8 @@ set(DROGON_SOURCES
lib/src/WebSocketClientImpl.cc
lib/src/WebSocketConnectionImpl.cc
lib/src/YamlConfigAdapter.cc
lib/src/drogon_test.cc)
lib/src/drogon_test.cc
third_party/eric-hpack-core/hpack.cpp)
set(private_headers
lib/src/AOPAdvice.h
lib/src/CacheFile.h
@ -305,6 +312,8 @@ set(private_headers
lib/src/MiddlewaresFunction.h
lib/src/HttpAppFrameworkImpl.h
lib/src/HttpClientImpl.h
lib/src/Http2Transport.h
lib/src/Http1xTransport.h
lib/src/HttpConnectionLimit.h
lib/src/HttpControllerBinder.h
lib/src/HttpControllersRouter.h
@ -332,7 +341,8 @@ set(private_headers
lib/src/ConfigAdapterManager.h
lib/src/JsonConfigAdapter.h
lib/src/YamlConfigAdapter.h
lib/src/ConfigAdapter.h)
lib/src/ConfigAdapter.h
third_party/eric-hpack-core/hpack.h)
if (NOT WIN32)
set(DROGON_SOURCES

View File

@ -4,6 +4,44 @@ All notable changes to this project will be documented in this file.
## [Unreleased]
## [1.10.0-beta.2] - 2024-06-18
### Changes
* Proactively send buffered HTTP/2 data if send queue is too large
* Better tracking of in-flight streaming that has body to sent for lower latency
### Fixes
* Fix underflow on flow tracking when sending large body
* Fix calling callback multiple times if a stream received multiple RST_STREAM frame
## [1.10.0-beta.1] - 2024-06-02
## Fixes
* Fix bad request when request path is empty
* Fix bad encoding/decoding when the server requests HPACK buffer size change
## [1.10.0-beta.0] - 2024-05-25
### Changed
* Add HTTP/2 Client
## API changes
* New enum for HTTP/2 `Version::kHttp2`
* `HttpRequest::setVersion()` deprecated and has no affect.
* HTTP version in client is no longer controlled by `req->setVersion()` but with `newHttpClient(...., DESIRED_HTTP_VERSION)`
* It is more of a suggestion then requirement. The client negotiates and selects the highest supported one.
## Limitations
* No H2C (HTTP/2 over cleartext) support
* Technically supports handling server push. But no API to expose it to the user yet
* Does not support setting HTTP request dependency and priority
* Does not support trailers
## [1.9.5] - 2024-06-08
### API changes list
@ -1675,8 +1713,14 @@ All notable changes to this project will be documented in this file.
[Unreleased]: https://github.com/an-tao/drogon/compare/v1.9.5...HEAD
[1.10.0-beta.2]: https://github.com/drogonframework/drogon/compare/v1.10.0-beta.1...v1.10.0-beta.2
[1.10.0-beta.1]: https://github.com/drogonframework/drogon/compare/v1.10.0-beta.0...v1.10.0-beta.1
[1.9.5]: https://github.com/an-tao/drogon/compare/v1.9.4...v1.9.5
[1.10.0-beta.0]: https://github.com/an-tao/drogon/compare/v1.9.4...1.10.0-beta.0
[1.9.4]: https://github.com/an-tao/drogon/compare/v1.9.3...v1.9.4
[1.9.3]: https://github.com/an-tao/drogon/compare/v1.9.2...v1.9.3

View File

@ -14,7 +14,7 @@ Drogon is a cross-platform framework, It supports Linux, macOS, FreeBSD, OpenBSD
* Use a non-blocking I/O network lib based on epoll (kqueue under macOS/FreeBSD) to provide high-concurrency, high-performance network IO, please visit the [TFB Tests Results](https://www.techempower.com/benchmarks/#section=data-r19&hw=ph&test=composite) for more details;
* Provide a completely asynchronous programming mode;
* Support Http1.0/1.1 (server side and client side);
* Support HTTP/2 (and 1.0/1.1) client and HTTP 1.1/1.0 server
* Based on template, a simple reflection mechanism is implemented to completely decouple the main program framework, controllers and views.
* Support cookies and built-in sessions;
* Support back-end rendering, the controller generates the data to the view to generate the Html page. Views are described by CSP template files, C++ codes are embedded into Html pages through CSP tags. And the drogon command-line tool automatically generates the C++ code files for compilation;

View File

@ -184,6 +184,11 @@ class DROGON_EXPORT HttpClient : public trantor::NonCopyable
* If this method is not called, the default depth value is 0 which means
* the pipelining is disabled. For details about pipelining, see
* rfc2616-8.1.2.2
*
* @param depth The depth value.
* @note This option is only valid for HTTP/1.x. If the client running in
* HTTP/2 mode, this settings have no effect. The maximum concurrent
* requests for HTTP/2 is 100 (unless the server has a different setting).
*/
virtual void setPipeliningDepth(size_t depth) = 0;
@ -238,15 +243,20 @@ class DROGON_EXPORT HttpClient : public trantor::NonCopyable
* enabled for HTTPS.
* @param validateCert If the parameter is set to true, the client validates
* the server certificate when SSL handshaking.
* @param targetVersion The target TLS version to use for HTTPS connections.
* This is a mere hint, and the actual version used will depend on the
* server and the client's capabilities.
* @return HttpClientPtr The smart pointer to the new client object.
* @note: The ip parameter support for both ipv4 and ipv6 address
*/
static HttpClientPtr newHttpClient(const std::string &ip,
static HttpClientPtr newHttpClient(
const std::string &ip,
uint16_t port,
bool useSSL = false,
trantor::EventLoop *loop = nullptr,
bool useOldTLS = false,
bool validateCert = true);
bool validateCert = true,
std::optional<Version> targetVersion = std::nullopt);
/// Get the event loop of the client;
virtual trantor::EventLoop *getLoop() = 0;
@ -303,6 +313,16 @@ class DROGON_EXPORT HttpClient : public trantor::NonCopyable
const std::vector<std::pair<std::string, std::string>>
&sslConfCmds) = 0;
/**
* @brief get the protocol version used by the HTTP connection
* @return std::optional<Version> the protocol version used by the HTTP
*
* NOTE: It could return std::nullopt if the connection is not established
* or is still negotiating the protocol. This is IMPORTANT as the client
* COULD be lazy and not connecting until the first request arrives.
*/
virtual std::optional<Version> protocolVersion() const = 0;
/// Create a Http client using the hostString to connect to server
/**
*
@ -329,15 +349,25 @@ class DROGON_EXPORT HttpClient : public trantor::NonCopyable
* @param validateCert If the parameter is set to true, the client validates
* the server certificate when SSL handshaking.
*
* @param targetVersion The target HTTP version that the client will attempt
* to use. **THIS IS ONLY A HINT**. The server may choose to ignore this as
* the server may not support the requested version. The preference is as
* follows:
* HTTP/2 > HTTP/1.1 > HTTP/1.0
* Note that there is no way to auto-detect HTTP/1.0 servers, so it is only
* used if explicitly requested.
*
* @note Don't add path and parameters in hostString, the request path and
* parameters should be set in HttpRequestPtr when calling the sendRequest()
* method.
*
*/
static HttpClientPtr newHttpClient(const std::string &hostString,
static HttpClientPtr newHttpClient(
const std::string &hostString,
trantor::EventLoop *loop = nullptr,
bool useOldTLS = false,
bool validateCert = true);
bool validateCert = true,
std::optional<Version> targetVersion = std::nullopt);
virtual ~HttpClient()
{

View File

@ -262,6 +262,7 @@ class DROGON_EXPORT HttpRequest
/**
* kHttp10 means Http version is 1.0
* kHttp11 means Http version is 1.1
* kHttp20 means Http version is 2.0
*/
virtual Version version() const = 0;

View File

@ -328,6 +328,7 @@ class DROGON_EXPORT HttpResponse
/**
* kHttp10 means Http version is 1.0
* kHttp11 means Http version is 1.1
* kHttp2 means Http version is 2.0
*/
virtual Version version() const = 0;

View File

@ -89,11 +89,14 @@ enum HttpStatusCode
k511NetworkAuthenticationRequired = 511
};
// TODO: Add an option to use default HTTP version
// as the server may not support HTTP/2.
enum class Version
{
kUnknown = 0,
kHttp10,
kHttp11
kHttp11,
kHttp2,
};
enum ContentType

105
lib/src/Http1xTransport.cc Normal file
View File

@ -0,0 +1,105 @@
#include "Http1xTransport.h"
#include "HttpResponseParser.h"
using namespace drogon;
Http1xTransport::Http1xTransport(trantor::TcpConnectionPtr connPtr,
Version version,
size_t *bytesSent,
size_t *bytesReceived)
: connPtr(connPtr),
bytesSent_(bytesSent),
bytesReceived_(bytesReceived),
version_(version)
{
connPtr->setContext(std::make_shared<HttpResponseParser>(connPtr));
}
void Http1xTransport::sendRequestInLoop(const HttpRequestPtr &req,
HttpReqCallback &&callback)
{
sendReq(req);
pipeliningCallbacks_.emplace(std::move(req), std::move(callback));
}
void Http1xTransport::onRecvMessage(const trantor::TcpConnectionPtr &conn,
trantor::MsgBuffer *msg)
{
auto responseParser = connPtr->getContext<HttpResponseParser>();
assert(responseParser != nullptr);
assert(connPtr.get() == conn.get());
// LOG_TRACE << "###:" << msg->readableBytes();
auto msgSize = msg->readableBytes();
while (msg->readableBytes() > 0)
{
if (pipeliningCallbacks_.empty())
{
LOG_ERROR << "More responses than expected!";
connPtr->shutdown();
return;
}
auto &firstReq = pipeliningCallbacks_.front();
if (firstReq.first->method() == Head)
{
responseParser->setForHeadMethod();
}
if (!responseParser->parseResponse(msg))
{
*bytesReceived_ += (msgSize - msg->readableBytes());
errorCallback(ReqResult::BadResponse);
return;
}
if (responseParser->gotAll())
{
auto resp = responseParser->responseImpl();
responseParser->reset();
*bytesReceived_ += (msgSize - msg->readableBytes());
msgSize = msg->readableBytes();
respCallback(resp, std::move(firstReq), conn);
pipeliningCallbacks_.pop();
}
else
{
*bytesReceived_ += (msgSize - msg->readableBytes());
break;
}
}
}
Http1xTransport::~Http1xTransport()
{
}
bool Http1xTransport::handleConnectionClose()
{
auto responseParser = connPtr->getContext<HttpResponseParser>();
if (responseParser && responseParser->parseResponseOnClose() &&
responseParser->gotAll())
{
auto &firstReq = pipeliningCallbacks_.front();
if (firstReq.first->method() == Head)
{
responseParser->setForHeadMethod();
}
auto resp = responseParser->responseImpl();
responseParser->reset();
respCallback(resp, std::move(firstReq), connPtr);
return false;
}
return true;
}
void Http1xTransport::sendReq(const HttpRequestPtr &req)
{
trantor::MsgBuffer buffer;
assert(req);
auto implPtr = static_cast<HttpRequestImpl *>(req.get());
assert(version_ == Version::kHttp10 || version_ == Version::kHttp11);
implPtr->appendToBuffer(&buffer, version_);
LOG_TRACE << "Send request:"
<< std::string_view(buffer.peek(), buffer.readableBytes());
*bytesSent_ += buffer.readableBytes();
connPtr->send(std::move(buffer));
}

52
lib/src/Http1xTransport.h Normal file
View File

@ -0,0 +1,52 @@
#pragma once
#include <trantor/net/EventLoop.h>
#include <trantor/net/TcpClient.h>
#include <list>
#include <queue>
#include <vector>
#include "HttpTransport.h"
namespace drogon
{
class Http1xTransport : public HttpTransport
{
private:
std::queue<std::pair<HttpRequestPtr, HttpReqCallback>> pipeliningCallbacks_;
trantor::TcpConnectionPtr connPtr;
size_t *bytesSent_;
size_t *bytesReceived_;
Version version_{Version::kHttp11};
void sendReq(const HttpRequestPtr &req);
public:
Http1xTransport(trantor::TcpConnectionPtr connPtr,
Version version,
size_t *bytesSent,
size_t *bytesReceived);
virtual ~Http1xTransport();
void sendRequestInLoop(const HttpRequestPtr &req,
HttpReqCallback &&callback) override;
void onRecvMessage(const trantor::TcpConnectionPtr &,
trantor::MsgBuffer *) override;
size_t requestsInFlight() const override
{
return pipeliningCallbacks_.size();
}
bool handleConnectionClose() override;
void onError(ReqResult result) override
{
while (!pipeliningCallbacks_.empty())
{
auto &cb = pipeliningCallbacks_.front().second;
cb(result, nullptr);
pipeliningCallbacks_.pop();
}
}
};
} // namespace drogon

1533
lib/src/Http2Transport.cc Normal file

File diff suppressed because it is too large Load Diff

553
lib/src/Http2Transport.h Normal file
View File

@ -0,0 +1,553 @@
#pragma once
#include "HttpTransport.h"
#include "HttpResponseImpl.h"
#include "hpack.h"
#include <cassert>
#include <string_view>
#include <variant>
#include <climits>
namespace drogon
{
using namespace EricHpack;
namespace internal
{
// Quick and dirty ByteStream implementation and extensions so we can use it
// to read from the buffer, safely. At least it checks for buffer overflows
// in debug mode.
struct ByteStream
{
ByteStream(uint8_t *ptr, size_t length) : ptr(ptr), length(length)
{
}
ByteStream(const trantor::MsgBuffer &buffer, size_t length)
: ptr((uint8_t *)buffer.peek()), length(length)
{
assert(length <= buffer.readableBytes());
}
uint32_t readU24BE()
{
assert(length >= 3 && offset <= length - 3);
uint32_t res =
ptr[offset] << 16 | ptr[offset + 1] << 8 | ptr[offset + 2];
offset += 3;
return res;
}
uint32_t readU32BE()
{
assert(length >= 4 && offset <= length - 4);
uint32_t res = ptr[offset] << 24 | ptr[offset + 1] << 16 |
ptr[offset + 2] << 8 | ptr[offset + 3];
offset += 4;
return res;
}
std::pair<bool, int32_t> readBI31BE()
{
assert(length >= 4 && offset <= length - 4);
int32_t res = ptr[offset] << 24 | ptr[offset + 1] << 16 |
ptr[offset + 2] << 8 | ptr[offset + 3];
offset += 4;
constexpr int32_t mask = 0x7fffffff;
bool flag = res & (~mask);
res &= mask;
return {flag, res};
}
uint16_t readU16BE()
{
assert(length >= 2 && offset <= length - 2);
uint16_t res = ptr[offset] << 8 | ptr[offset + 1];
offset += 2;
return res;
}
uint8_t readU8()
{
assert(length >= 1 && offset <= length - 1);
return ptr[offset++];
}
void read(uint8_t *buffer, size_t size)
{
assert((length >= size && offset <= length - size) || size == 0);
assert(buffer != nullptr);
assert(ptr != nullptr);
memcpy(buffer, ptr + offset, size);
offset += size;
}
void skip(size_t n)
{
assert((length >= n && offset <= length - n) || n == 0);
offset += n;
}
size_t size() const
{
return length;
}
size_t remaining() const
{
return length - offset;
}
protected:
uint8_t *ptr;
size_t length;
size_t offset = 0;
};
// DITTO but for serialization
struct OByteStream
{
void writeU24BE(uint32_t value)
{
assert(value <= 0xffffff);
value = htonl(value);
buffer.append((char *)&value + 1, 3);
}
void writeU32BE(uint32_t value)
{
value = htonl(value);
buffer.append((char *)&value, 4);
}
void writeU16BE(uint16_t value)
{
value = htons(value);
buffer.append((char *)&value, 2);
}
void writeU8(uint8_t value)
{
buffer.append((char *)&value, 1);
}
void pad(size_t size, uint8_t value = 0)
{
buffer.ensureWritableBytes(size);
auto ptr = (uint8_t *)buffer.peek() + buffer.readableBytes();
memset(ptr, value, size);
buffer.hasWritten(size);
}
void write(const uint8_t *ptr, size_t size)
{
buffer.append((char *)ptr, size);
}
void write(const std::string_view &str)
{
buffer.append(str.data(), str.size());
}
void overwriteU24BE(size_t offset, uint32_t value)
{
assert(value <= 0xffffff);
assert(offset <= buffer.readableBytes() - 3);
assert(buffer.writableBytes() >= 3);
auto ptr = (uint8_t *)buffer.peek() + offset;
ptr[0] = value >> 16;
ptr[1] = value >> 8;
ptr[2] = value;
}
void overwriteU8(size_t offset, uint8_t value)
{
assert(offset <= buffer.readableBytes() - 1);
assert(buffer.writableBytes() >= 1);
auto ptr = (uint8_t *)buffer.peek() + offset;
ptr[0] = value;
}
uint8_t *peek()
{
return (uint8_t *)buffer.peek();
}
size_t size() const
{
return buffer.readableBytes();
}
trantor::MsgBuffer buffer;
};
struct SettingsFrame
{
SettingsFrame() = default;
SettingsFrame(bool ack) : ack(ack)
{
}
bool ack = false;
std::vector<std::pair<uint16_t, uint32_t>> settings;
static std::optional<SettingsFrame> parse(ByteStream &payload,
uint8_t flags);
bool serialize(OByteStream &stream, uint8_t &flags) const;
};
struct WindowUpdateFrame
{
WindowUpdateFrame() = default;
WindowUpdateFrame(uint32_t windowSizeIncrement)
: windowSizeIncrement(windowSizeIncrement)
{
}
uint32_t windowSizeIncrement = 0;
static std::optional<WindowUpdateFrame> parse(ByteStream &payload,
uint8_t flags);
bool serialize(OByteStream &stream, uint8_t &flags) const;
};
struct HeadersFrame
{
HeadersFrame() = default;
HeadersFrame(std::vector<uint8_t> headerBlockFragment,
bool endHeaders,
bool endStream)
: headerBlockFragment(std::move(headerBlockFragment)),
endHeaders(endHeaders),
endStream(endStream)
{
}
HeadersFrame(const uint8_t *ptr,
size_t size,
bool endHeaders,
bool endStream)
: headerBlockFragment(ptr, ptr + size),
endHeaders(endHeaders),
endStream(endStream)
{
}
uint8_t padLength = 0;
bool exclusive = false;
uint32_t streamDependency = 0;
uint8_t weight = 0;
std::vector<uint8_t> headerBlockFragment;
bool endHeaders = false;
bool endStream = false;
static std::optional<HeadersFrame> parse(ByteStream &payload,
uint8_t flags);
bool serialize(OByteStream &stream, uint8_t &flags) const;
};
struct GoAwayFrame
{
GoAwayFrame() = default;
GoAwayFrame(uint32_t lastStreamId,
uint32_t errorCode,
const std::string &additionalDebugData)
: lastStreamId(lastStreamId),
errorCode(errorCode),
additionalDebugData((const uint8_t *)additionalDebugData.data(),
(const uint8_t *)additionalDebugData.data() +
additionalDebugData.size())
{
}
uint32_t lastStreamId = 0;
uint32_t errorCode = 0;
std::vector<uint8_t> additionalDebugData;
static std::optional<GoAwayFrame> parse(ByteStream &payload, uint8_t flags);
bool serialize(OByteStream &stream, uint8_t &flags) const;
};
struct DataFrame
{
DataFrame() = default;
DataFrame(std::vector<uint8_t> data, bool endStream)
: data(std::move(data)), endStream(endStream)
{
}
DataFrame(const uint8_t *ptr, size_t size, bool endStream)
: data(std::vector<uint8_t>(ptr, ptr + size)), endStream(endStream)
{
}
explicit DataFrame(std::string_view data, bool endStream)
: data(data), endStream(endStream)
{
}
uint8_t padLength = 0;
std::variant<std::vector<uint8_t>, std::string_view> data;
bool endStream = false;
std::pair<const uint8_t *, size_t> getData() const
{
if (std::holds_alternative<std::vector<uint8_t>>(data))
{
auto &vec = std::get<std::vector<uint8_t>>(data);
return {vec.data(), vec.size()};
}
else
{
auto &str = std::get<std::string_view>(data);
return {(const uint8_t *)str.data(), str.size()};
}
}
static std::optional<DataFrame> parse(ByteStream &payload, uint8_t flags);
bool serialize(OByteStream &stream, uint8_t &flags) const;
};
struct PingFrame
{
PingFrame() = default;
PingFrame(std::array<uint8_t, 8> opaqueData, bool ack)
: opaqueData(opaqueData), ack(ack)
{
}
std::array<uint8_t, 8> opaqueData;
bool ack = false;
static std::optional<PingFrame> parse(ByteStream &payload, uint8_t flags);
bool serialize(OByteStream &stream, uint8_t &flags) const;
};
struct ContinuationFrame
{
ContinuationFrame() = default;
ContinuationFrame(std::vector<uint8_t> headerBlockFragment, bool endHeaders)
: headerBlockFragment(std::move(headerBlockFragment)),
endHeaders(endHeaders)
{
}
ContinuationFrame(const uint8_t *ptr, size_t size, bool endHeaders)
: headerBlockFragment(ptr, ptr + size), endHeaders(endHeaders)
{
}
std::vector<uint8_t> headerBlockFragment;
bool endHeaders = false;
static std::optional<ContinuationFrame> parse(ByteStream &payload,
uint8_t flags);
bool serialize(OByteStream &stream, uint8_t &flags) const;
};
struct RstStreamFrame
{
RstStreamFrame() = default;
RstStreamFrame(uint32_t errorCode) : errorCode(errorCode)
{
}
uint32_t errorCode = 0;
static std::optional<RstStreamFrame> parse(ByteStream &payload,
uint8_t flags);
bool serialize(OByteStream &stream, uint8_t &flags) const;
};
struct PushPromiseFrame
{
PushPromiseFrame() = default;
PushPromiseFrame(uint32_t promisedStreamId,
std::vector<uint8_t> headerBlockFragment,
bool endHeaders)
: promisedStreamId(promisedStreamId),
headerBlockFragment(std::move(headerBlockFragment)),
endHeaders(endHeaders)
{
}
uint8_t padLength = 0;
int32_t promisedStreamId = 0;
std::vector<uint8_t> headerBlockFragment;
bool endHeaders = false;
static std::optional<PushPromiseFrame> parse(ByteStream &payload,
uint8_t flags);
bool serialize(OByteStream &stream, uint8_t &flags) const;
};
using H2Frame = std::variant<SettingsFrame,
WindowUpdateFrame,
HeadersFrame,
GoAwayFrame,
DataFrame,
PingFrame,
ContinuationFrame,
PushPromiseFrame,
RstStreamFrame>;
enum class StreamState
{
SendingBody,
ExpectingHeaders,
ExpectingContinuation,
ExpectingData,
Finished,
};
// Virtual stream that holds properties for the HTTP/2 stream
// Defaults to stream 0 global properties
struct H2Stream
{
HttpReqCallback callback;
HttpResponseImplPtr response;
HttpRequestPtr request;
std::string body;
std::optional<size_t> contentLength;
int32_t streamId = 0;
size_t avaliableTxWindow = 65535;
size_t avaliableRxWindow = 65535;
StreamState state = StreamState::ExpectingHeaders;
trantor::MsgBuffer multipartData;
};
} // namespace internal
enum class StreamCloseErrorCode
{
NoError = 0x0,
ProtocolError = 0x1,
InternalError = 0x2,
FlowControlError = 0x3,
SettingsTimeout = 0x4,
StreamClosed = 0x5,
FrameSizeError = 0x6,
RefusedStream = 0x7,
Cancel = 0x8,
CompressionError = 0x9,
ConnectError = 0xa,
EnhanceYourCalm = 0xb,
InadequateSecurity = 0xc,
Http11Required = 0xd,
};
class Http2Transport : public HttpTransport
{
private:
// Implementation details, stuff we need to implement HTTP/2
trantor::TcpConnectionPtr connPtr;
size_t *bytesSent_;
size_t *bytesReceived_;
Hpack hpackTx;
Hpack hpackRx;
int32_t currentStreamId = 1;
std::unordered_map<int32_t, internal::H2Stream> streams;
std::queue<std::pair<HttpRequestPtr, HttpReqCallback>> bufferedRequests;
trantor::MsgBuffer headerBufferRx;
internal::OByteStream batchedSendBuffer;
int32_t expectngContinuationStreamId = 0;
std::map<int32_t, size_t> pendingDataSend;
std::optional<decltype(pendingDataSend)::iterator> currentDataSend;
bool reconnectionIssued = false;
bool firstSettingsReceived = false;
// HTTP/2 client-wide settings (can be changed by server)
size_t maxConcurrentStreams = 100;
size_t initialRxWindowSize = 65535;
size_t initialTxWindowSize = 65535;
size_t maxFrameSize = 16384;
size_t maxRxDynamicTableSize = 4096;
// Configuration settings
const uint32_t windowIncreaseThreshold = 16384;
const uint32_t windowIncreaseSize = 128 * 1024; // 128KB
const uint32_t maxCompressiedHeaderSize = 2048;
const int32_t streamIdReconnectThreshold = INT_MAX - 8192;
// HTTP/2 connection-wide state
size_t avaliableTxWindow = 65535;
size_t avaliableRxWindow = 65535;
internal::H2Stream &createStream(int32_t streamId);
void responseSuccess(internal::H2Stream &stream);
void streamErrored(int32_t streamId, ReqResult result);
std::optional<int32_t> nextStreamId()
{
// XXX: Technically UB. But no one actually uses 1's complement
if (currentStreamId < 0)
return std::nullopt;
int32_t streamId = currentStreamId;
currentStreamId += 2;
return streamId;
}
// Returns true when we SHOULD reconnect due to exhausting stream IDs.
// Doesn't mean we will. We will force a reconnect when we actually
// run out.
inline bool runningOutStreamId()
{
return currentStreamId > streamIdReconnectThreshold;
}
void handleFrameForStream(const internal::H2Frame &frame,
int32_t streamId,
uint8_t flags);
void connectionErrored(int32_t lastStreamId,
StreamCloseErrorCode errorCode,
std::string errorMsg = "");
bool parseAndApplyHeaders(internal::H2Stream &stream,
const void *data,
size_t len);
std::pair<size_t, bool> sendBodyForStream(internal::H2Stream &stream,
const void *data,
size_t size);
std::pair<size_t, bool> sendBodyForStream(internal::H2Stream &stream,
size_t offset);
void sendFrame(const internal::H2Frame &frame, int32_t streamId);
void sendBufferedData();
public:
Http2Transport(trantor::TcpConnectionPtr connPtr,
size_t *bytesSent,
size_t *bytesReceived);
void sendRequestInLoop(const HttpRequestPtr &req,
HttpReqCallback &&callback) override;
void onRecvMessage(const trantor::TcpConnectionPtr &,
trantor::MsgBuffer *) override;
size_t requestsInFlight() const override
{
return streams.size();
}
bool handleConnectionClose() override;
void onError(ReqResult result) override;
protected:
void onServerSettingsReceived(){};
};
} // namespace drogon

View File

@ -36,6 +36,7 @@ void HttpClientImpl::createTcpClient()
LOG_TRACE << "New TcpClient," << serverAddr_.toIpPort();
tcpClientPtr_ =
std::make_shared<trantor::TcpClient>(loop_, serverAddr_, "httpClient");
Version version = targetHttpVersion_.value_or(Version::kHttp2);
if (useSSL_ && utils::supportsTls())
{
@ -48,6 +49,8 @@ void HttpClientImpl::createTcpClient()
.setConfCmds(sslConfCmds_)
.setCertPath(clientCertPath_)
.setKeyPath(clientKeyPath_);
if (version == Version::kHttp2)
policy->setAlpnProtocols({"h2", "http/1.1"});
tcpClientPtr_->enableSSL(std::move(policy));
}
@ -60,53 +63,116 @@ void HttpClientImpl::createTcpClient()
if (thisPtr->sockOptCallback_)
thisPtr->sockOptCallback_(fd);
});
tcpClientPtr_->setConnectionCallback(
[weakPtr](const trantor::TcpConnectionPtr &connPtr) {
tcpClientPtr_->setConnectionCallback([weakPtr](
const trantor::TcpConnectionPtr
&connPtr) {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
if (connPtr->connected())
{
connPtr->setContext(
std::make_shared<HttpResponseParser>(connPtr));
// send request;
LOG_TRACE << "Connection established!";
while (thisPtr->pipeliningCallbacks_.size() <=
thisPtr->pipeliningDepth_ &&
!thisPtr->requestsBuffer_.empty())
auto protocol = connPtr->applicationProtocol();
if (protocol == "http/1.1")
{
thisPtr->sendReq(connPtr,
thisPtr->requestsBuffer_.front().first);
thisPtr->pipeliningCallbacks_.push(
std::move(thisPtr->requestsBuffer_.front()));
LOG_TRACE << "Select http/1.1 protocol";
thisPtr->transport_ =
std::make_unique<Http1xTransport>(connPtr,
Version::kHttp11,
&thisPtr->bytesSent_,
&thisPtr->bytesReceived_);
thisPtr->httpVersion_ = Version::kHttp11;
}
else if (protocol == "h2")
{
LOG_TRACE << "Select http/2 protocol";
thisPtr->transport_ =
std::make_unique<Http2Transport>(connPtr,
&thisPtr->bytesSent_,
&thisPtr->bytesReceived_);
thisPtr->httpVersion_ = Version::kHttp2;
}
else if (protocol.empty())
{
// Either we are not using TLS or the server does not support
// ALPN. Use HTTP/1.1 if not specified otherwise.
bool force1_0 = thisPtr->targetHttpVersion_.value_or(
Version::kUnknown) == Version::kHttp10;
auto version = force1_0 ? Version::kHttp10 : Version::kHttp11;
thisPtr->httpVersion_ = version;
thisPtr->transport_ =
std::make_unique<Http1xTransport>(connPtr,
version,
&thisPtr->bytesSent_,
&thisPtr->bytesReceived_);
}
else
{
LOG_ERROR << "Unknown protocol " << protocol
<< " selected by server for HTTP";
thisPtr->onError(ReqResult::BadResponse);
return;
}
assert(thisPtr->httpVersion_.has_value());
assert(thisPtr->transport_);
thisPtr->transport_->setRespCallback(
[weakPtr](const HttpResponseImplPtr &resp,
std::pair<HttpRequestPtr, HttpReqCallback> &&reqAndCb,
const trantor::TcpConnectionPtr &connPtr) {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
thisPtr->handleResponse(resp, std::move(reqAndCb), connPtr);
});
thisPtr->transport_->setErrorCallback([weakPtr](ReqResult result) {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
thisPtr->onError(result);
});
size_t maxSendReq = (*(thisPtr->httpVersion_) == Version::kHttp2)
? size_t{0xfffffff}
: thisPtr->pipeliningDepth_;
if (maxSendReq == 0)
maxSendReq = 1;
while (!thisPtr->requestsBuffer_.empty() &&
thisPtr->transport_->requestsInFlight() < maxSendReq)
{
auto &reqAndCb = thisPtr->requestsBuffer_.front();
thisPtr->transport_->sendRequestInLoop(reqAndCb.first,
std::move(
reqAndCb.second));
thisPtr->requestsBuffer_.pop_front();
}
}
else
{
LOG_TRACE << "connection disconnect";
auto responseParser = connPtr->getContext<HttpResponseParser>();
if (responseParser && responseParser->parseResponseOnClose() &&
responseParser->gotAll())
// TODO: Make sure the sequence of handling is correct
bool isUnexpected = false;
if (thisPtr->transport_)
{
auto &firstReq = thisPtr->pipeliningCallbacks_.front();
if (firstReq.first->method() == Head)
{
responseParser->setForHeadMethod();
isUnexpected = thisPtr->transport_->handleConnectionClose();
}
auto resp = responseParser->responseImpl();
responseParser->reset();
if (isUnexpected)
{
thisPtr->onError(ReqResult::NetworkFailure);
return;
}
// temporary fix of dead tcpClientPtr_
// TODO: fix HttpResponseParser when content-length absence
// TODO: HTTP/2 transport also relies on this behavior to reconnect
// when running out of stream IDs
thisPtr->tcpClientPtr_.reset();
thisPtr->handleResponse(resp, std::move(firstReq), connPtr);
if (!thisPtr->requestsBuffer_.empty())
{
thisPtr->createTcpClient();
}
return;
}
thisPtr->onError(ReqResult::NetworkFailure);
}
});
tcpClientPtr_->setConnectionErrorCallback([weakPtr]() {
@ -148,20 +214,26 @@ HttpClientImpl::HttpClientImpl(trantor::EventLoop *loop,
const trantor::InetAddress &addr,
bool useSSL,
bool useOldTLS,
bool validateCert)
bool validateCert,
std::optional<Version> targetVersion)
: loop_(loop),
serverAddr_(addr),
useSSL_(useSSL),
validateCert_(validateCert),
useOldTLS_(useOldTLS)
useOldTLS_(useOldTLS),
targetHttpVersion_(targetVersion)
{
}
HttpClientImpl::HttpClientImpl(trantor::EventLoop *loop,
const std::string &hostString,
bool useOldTLS,
bool validateCert)
: loop_(loop), validateCert_(validateCert), useOldTLS_(useOldTLS)
bool validateCert,
std::optional<Version> targetVersion)
: loop_(loop),
validateCert_(validateCert),
useOldTLS_(useOldTLS),
targetHttpVersion_(targetVersion)
{
auto lowerHost = hostString;
std::transform(lowerHost.begin(),
@ -319,13 +391,13 @@ void HttpClientImpl::sendRequestInLoop(const HttpRequestPtr &req,
shared_from_this(),
req);
loop_->runAfter(
timeout,
[weakCallbackBackPtr =
std::weak_ptr<RequestCallbackParams>(callbackParamsPtr)] {
// TODO: Cancel the timer when the request is finished.
loop_->runAfter(timeout,
[weakCallbackBackPtr = std::weak_ptr<RequestCallbackParams>(
callbackParamsPtr)] {
auto callbackParamsPtr = weakCallbackBackPtr.lock();
if (callbackParamsPtr != nullptr)
{
if (callbackParamsPtr == nullptr)
return;
auto &thisPtr = callbackParamsPtr->clientPtr;
if (callbackParamsPtr->timeoutFlag)
{
@ -345,8 +417,8 @@ void HttpClientImpl::sendRequestInLoop(const HttpRequestPtr &req,
}
}
(callbackParamsPtr->callback)(ReqResult::Timeout, nullptr);
}
(callbackParamsPtr->callback)(ReqResult::Timeout,
nullptr);
});
sendRequestInLoop(req,
[callbackParamsPtr](ReqResult r,
@ -355,6 +427,7 @@ void HttpClientImpl::sendRequestInLoop(const HttpRequestPtr &req,
{
return;
}
callbackParamsPtr->timeoutFlag = true;
(callbackParamsPtr->callback)(r, resp);
});
@ -422,10 +495,11 @@ void HttpClientImpl::sendRequestInLoop(const drogon::HttpRequestPtr &req,
requestsBuffer_.push_back(
{req,
[thisPtr = shared_from_this(),
callbackPtr](ReqResult result, const HttpResponsePtr &response) {
callbackPtr =
std::move(callbackPtr)](ReqResult result,
const HttpResponsePtr &response) {
(*callbackPtr)(result, response);
}});
if (domain_.empty() || !isDomainName_)
{
// Valid ip address, no domain, connect directly
@ -436,8 +510,7 @@ void HttpClientImpl::sendRequestInLoop(const drogon::HttpRequestPtr &req,
// No ip address and no domain, respond with BadServerAddress
else
{
requestsBuffer_.pop_front();
(*callbackPtr)(ReqResult::BadServerAddress, nullptr);
callback(ReqResult::BadServerAddress, nullptr);
assert(requestsBuffer_.empty());
}
return;
@ -504,19 +577,18 @@ void HttpClientImpl::sendRequestInLoop(const drogon::HttpRequestPtr &req,
}});
return;
}
assert(transport_ != nullptr);
assert(httpVersion_.has_value());
size_t maxSendReq = (*httpVersion_ == Version::kHttp2) ? size_t{0xfffffffff}
: pipeliningDepth_;
if (maxSendReq == 0)
maxSendReq = 1;
// Connected, send request now
if (pipeliningCallbacks_.size() <= pipeliningDepth_ &&
requestsBuffer_.empty())
if (transport_->requestsInFlight() <= maxSendReq && requestsBuffer_.empty())
{
sendReq(connPtr, req);
pipeliningCallbacks_.push(
{req,
[thisPtr,
callback = std::move(callback)](ReqResult result,
const HttpResponsePtr &response) {
callback(result, response);
}});
transport_->sendRequestInLoop(req, std::move(callback));
}
else
{
@ -530,25 +602,11 @@ void HttpClientImpl::sendRequestInLoop(const drogon::HttpRequestPtr &req,
}
}
void HttpClientImpl::sendReq(const trantor::TcpConnectionPtr &connPtr,
const HttpRequestPtr &req)
{
trantor::MsgBuffer buffer;
assert(req);
auto implPtr = static_cast<HttpRequestImpl *>(req.get());
implPtr->appendToBuffer(&buffer);
LOG_TRACE << "Send request:"
<< std::string(buffer.peek(), buffer.readableBytes());
bytesSent_ += buffer.readableBytes();
connPtr->send(std::move(buffer));
}
void HttpClientImpl::handleResponse(
const HttpResponseImplPtr &resp,
std::pair<HttpRequestPtr, HttpReqCallback> &&reqAndCb,
const trantor::TcpConnectionPtr &connPtr)
{
assert(!pipeliningCallbacks_.empty());
auto &type = resp->getHeaderBy("content-type");
auto &coding = resp->getHeaderBy("content-encoding");
if (coding == "gzip")
@ -565,8 +623,8 @@ void HttpClientImpl::handleResponse(
{
resp->parseJson();
}
resp->setPeerCertificate(connPtr->peerCertificate());
auto cb = std::move(reqAndCb);
pipeliningCallbacks_.pop();
handleCookies(resp);
cb.second(ReqResult::Ok, resp);
@ -579,13 +637,16 @@ void HttpClientImpl::handleResponse(
if (!requestsBuffer_.empty())
{
auto &reqAndCallback = requestsBuffer_.front();
sendReq(connPtr, reqAndCallback.first);
pipeliningCallbacks_.push(std::move(reqAndCallback));
transport_->sendRequestInLoop(reqAndCallback.first,
std::move(reqAndCallback.second));
requestsBuffer_.pop_front();
}
else
{
if (resp->ifCloseConnection() && pipeliningCallbacks_.empty())
assert(httpVersion_.has_value());
if (resp->ifCloseConnection() &&
transport_->requestsInFlight() == 0 &&
*httpVersion_ != Version::kHttp2)
{
tcpClientPtr_.reset();
}
@ -593,56 +654,15 @@ void HttpClientImpl::handleResponse(
}
else
{
while (!pipeliningCallbacks_.empty())
{
auto cb = std::move(pipeliningCallbacks_.front());
pipeliningCallbacks_.pop();
cb.second(ReqResult::NetworkFailure, nullptr);
}
transport_->onError(ReqResult::NetworkFailure);
}
}
void HttpClientImpl::onRecvMessage(const trantor::TcpConnectionPtr &connPtr,
trantor::MsgBuffer *msg)
{
auto responseParser = connPtr->getContext<HttpResponseParser>();
// LOG_TRACE << "###:" << msg->readableBytes();
auto msgSize = msg->readableBytes();
while (msg->readableBytes() > 0)
{
if (pipeliningCallbacks_.empty())
{
LOG_ERROR << "More responses than expected!";
connPtr->shutdown();
return;
}
auto &firstReq = pipeliningCallbacks_.front();
if (firstReq.first->method() == Head)
{
responseParser->setForHeadMethod();
}
if (!responseParser->parseResponse(msg))
{
onError(ReqResult::BadResponse);
bytesReceived_ += (msgSize - msg->readableBytes());
return;
}
if (responseParser->gotAll())
{
auto resp = responseParser->responseImpl();
resp->setPeerCertificate(connPtr->peerCertificate());
responseParser->reset();
bytesReceived_ += (msgSize - msg->readableBytes());
msgSize = msg->readableBytes();
handleResponse(resp, std::move(firstReq), connPtr);
}
else
{
bytesReceived_ += (msgSize - msg->readableBytes());
break;
}
}
assert(transport_ != nullptr);
transport_->onRecvMessage(connPtr, msg);
}
HttpClientPtr HttpClient::newHttpClient(const std::string &ip,
@ -650,7 +670,8 @@ HttpClientPtr HttpClient::newHttpClient(const std::string &ip,
bool useSSL,
trantor::EventLoop *loop,
bool useOldTLS,
bool validateCert)
bool validateCert,
std::optional<Version> targetVersion)
{
bool isIpv6 = ip.find(':') == std::string::npos ? false : true;
return std::make_shared<HttpClientImpl>(
@ -658,29 +679,28 @@ HttpClientPtr HttpClient::newHttpClient(const std::string &ip,
trantor::InetAddress(ip, port, isIpv6),
useSSL,
useOldTLS,
validateCert);
validateCert,
targetVersion);
}
HttpClientPtr HttpClient::newHttpClient(const std::string &hostString,
trantor::EventLoop *loop,
bool useOldTLS,
bool validateCert)
bool validateCert,
std::optional<Version> targetVersion)
{
return std::make_shared<HttpClientImpl>(
loop == nullptr ? HttpAppFrameworkImpl::instance().getLoop() : loop,
hostString,
useOldTLS,
validateCert);
validateCert,
targetVersion);
}
void HttpClientImpl::onError(ReqResult result)
{
while (!pipeliningCallbacks_.empty())
{
auto cb = std::move(pipeliningCallbacks_.front());
pipeliningCallbacks_.pop();
cb.second(result, nullptr);
}
if (transport_)
transport_->onError(result);
while (!requestsBuffer_.empty())
{
auto cb = std::move(requestsBuffer_.front().second);

View File

@ -24,9 +24,13 @@
#include <queue>
#include <vector>
#include "impl_forwards.h"
#include "Http2Transport.h"
#include "HttpTransport.h"
#include "Http1xTransport.h"
namespace drogon
{
class HttpClientImpl final : public HttpClient,
public std::enable_shared_from_this<HttpClientImpl>
{
@ -35,11 +39,13 @@ class HttpClientImpl final : public HttpClient,
const trantor::InetAddress &addr,
bool useSSL = false,
bool useOldTLS = false,
bool validateCert = true);
bool validateCert = true,
std::optional<Version> httpVersion = std::nullopt);
HttpClientImpl(trantor::EventLoop *loop,
const std::string &hostString,
bool useOldTLS = false,
bool validateCert = true);
bool validateCert = true,
std::optional<Version> httpVersion = std::nullopt);
void sendRequest(const HttpRequestPtr &req,
const HttpReqCallback &callback,
double timeout = 0) override;
@ -115,14 +121,17 @@ class HttpClientImpl final : public HttpClient,
sockOptCallback_ = std::move(cb);
}
std::optional<Version> protocolVersion() const override
{
return httpVersion_;
}
private:
std::shared_ptr<trantor::TcpClient> tcpClientPtr_;
trantor::EventLoop *loop_;
trantor::InetAddress serverAddr_;
bool useSSL_;
bool validateCert_;
void sendReq(const trantor::TcpConnectionPtr &connPtr,
const HttpRequestPtr &req);
void sendRequestInLoop(const HttpRequestPtr &req,
HttpReqCallback &&callback);
void sendRequestInLoop(const HttpRequestPtr &req,
@ -133,7 +142,6 @@ class HttpClientImpl final : public HttpClient,
std::pair<HttpRequestPtr, HttpReqCallback> &&reqAndCb,
const trantor::TcpConnectionPtr &connPtr);
void createTcpClient();
std::queue<std::pair<HttpRequestPtr, HttpReqCallback>> pipeliningCallbacks_;
std::list<std::pair<HttpRequestPtr, HttpReqCallback>> requestsBuffer_;
void onRecvMessage(const trantor::TcpConnectionPtr &, trantor::MsgBuffer *);
void onError(ReqResult result);
@ -152,6 +160,9 @@ class HttpClientImpl final : public HttpClient,
std::string clientCertPath_;
std::string clientKeyPath_;
std::function<void(int)> sockOptCallback_;
std::unique_ptr<HttpTransport> transport_;
std::optional<Version> targetHttpVersion_;
std::optional<Version> httpVersion_;
};
using HttpClientImplPtr = std::shared_ptr<HttpClientImpl>;

View File

@ -16,6 +16,8 @@
#include <drogon/UploadFile.h>
#include <drogon/utils/Utilities.h>
#include <fstream>
using namespace drogon;
HttpFileUploadRequest::HttpFileUploadRequest(
@ -25,7 +27,74 @@ HttpFileUploadRequest::HttpFileUploadRequest(
files_(files)
{
setMethod(drogon::Post);
setVersion(drogon::Version::kHttp11);
setContentType("multipart/form-data; boundary=" + boundary_);
contentType_ = CT_MULTIPART_FORM_DATA;
}
template <typename T>
void renderMultipart(const HttpFileUploadRequest *mPtr, T &content)
{
auto &boundary = mPtr->boundary();
for (auto &param : mPtr->getParameters())
{
content.append("--");
content.append(boundary);
content.append("\r\n");
content.append("content-disposition: form-data; name=\"");
content.append(param.first);
content.append("\"\r\n\r\n");
content.append(param.second);
content.append("\r\n");
}
for (auto &file : mPtr->files())
{
content.append("--");
content.append(boundary);
content.append("\r\n");
content.append("content-disposition: form-data; name=\"");
content.append(file.itemName());
content.append("\"; filename=\"");
content.append(file.fileName());
content.append("\"");
if (file.contentType() != CT_NONE)
{
content.append("\r\n");
auto &type = contentTypeToMime(file.contentType());
content.append("content-type: ");
content.append(type.data(), type.length());
}
content.append("\r\n\r\n");
std::ifstream infile(utils::toNativePath(file.path()),
std::ifstream::binary);
if (!infile)
{
LOG_ERROR << file.path() << " not found";
}
else
{
std::streambuf *pbuf = infile.rdbuf();
std::streamsize filesize = pbuf->pubseekoff(0, infile.end);
pbuf->pubseekoff(0, infile.beg); // rewind
std::string str;
str.resize(filesize);
pbuf->sgetn(&str[0], filesize);
content.append(std::move(str));
}
content.append("\r\n");
}
content.append("--");
content.append(boundary);
content.append("--");
}
void HttpFileUploadRequest::renderMultipartFormData(
trantor::MsgBuffer &buffer) const
{
renderMultipart(this, buffer);
}
void HttpFileUploadRequest::renderMultipartFormData(std::string &buffer) const
{
renderMultipart(this, buffer);
}

View File

@ -34,6 +34,9 @@ class HttpFileUploadRequest : public HttpRequestImpl
explicit HttpFileUploadRequest(const std::vector<UploadFile> &files);
void renderMultipartFormData(trantor::MsgBuffer &buffer) const;
void renderMultipartFormData(std::string &buffer) const;
private:
std::string boundary_;
std::vector<UploadFile> files_;

View File

@ -182,7 +182,8 @@ void HttpRequestImpl::parseParameters() const
}
}
void HttpRequestImpl::appendToBuffer(trantor::MsgBuffer *output) const
void HttpRequestImpl::appendToBuffer(trantor::MsgBuffer *output,
Version protoVer) const
{
switch (method_)
{
@ -266,11 +267,11 @@ void HttpRequestImpl::appendToBuffer(trantor::MsgBuffer *output) const
}
output->append(" ");
if (version_ == Version::kHttp11)
if (protoVer == Version::kHttp11)
{
output->append("HTTP/1.1");
}
else if (version_ == Version::kHttp10)
else if (protoVer == Version::kHttp10)
{
output->append("HTTP/1.0");
}
@ -285,57 +286,7 @@ void HttpRequestImpl::appendToBuffer(trantor::MsgBuffer *output) const
auto mReq = dynamic_cast<const HttpFileUploadRequest *>(this);
if (mReq)
{
for (auto &param : mReq->getParameters())
{
content.append("--");
content.append(mReq->boundary());
content.append("\r\n");
content.append("content-disposition: form-data; name=\"");
content.append(param.first);
content.append("\"\r\n\r\n");
content.append(param.second);
content.append("\r\n");
}
for (auto &file : mReq->files())
{
content.append("--");
content.append(mReq->boundary());
content.append("\r\n");
content.append("content-disposition: form-data; name=\"");
content.append(file.itemName());
content.append("\"; filename=\"");
content.append(file.fileName());
content.append("\"");
if (file.contentType() != CT_NONE)
{
content.append("\r\n");
auto &type = contentTypeToMime(file.contentType());
content.append("content-type: ");
content.append(type.data(), type.length());
}
content.append("\r\n\r\n");
std::ifstream infile(utils::toNativePath(file.path()),
std::ifstream::binary);
if (!infile)
{
LOG_ERROR << file.path() << " not found";
}
else
{
std::streambuf *pbuf = infile.rdbuf();
std::streamsize filesize = pbuf->pubseekoff(0, infile.end);
pbuf->pubseekoff(0, infile.beg); // rewind
std::string str;
str.resize(filesize);
pbuf->sgetn(&str[0], filesize);
content.append(std::move(str));
}
content.append("\r\n");
}
content.append("--");
content.append(mReq->boundary());
content.append("--");
mReq->renderMultipartFormData(content);
}
}
assert(!(!content.empty() && !content_.empty()));
@ -506,7 +457,6 @@ HttpRequestPtr HttpRequest::newHttpRequest()
{
auto req = std::make_shared<HttpRequestImpl>(nullptr);
req->setMethod(drogon::Get);
req->setVersion(drogon::Version::kHttp11);
return req;
}
@ -514,7 +464,6 @@ HttpRequestPtr HttpRequest::newHttpFormPostRequest()
{
auto req = std::make_shared<HttpRequestImpl>(nullptr);
req->setMethod(drogon::Post);
req->setVersion(drogon::Version::kHttp11);
req->contentType_ = CT_APPLICATION_X_FORM;
req->flagForParsingContentType_ = true;
return req;
@ -540,7 +489,6 @@ HttpRequestPtr HttpRequest::newHttpJsonRequest(const Json::Value &data)
});
auto req = std::make_shared<HttpRequestImpl>(nullptr);
req->setMethod(drogon::Get);
req->setVersion(drogon::Version::kHttp11);
req->contentType_ = CT_APPLICATION_JSON;
req->setContent(writeString(builder, data));
req->flagForParsingContentType_ = true;
@ -599,6 +547,10 @@ const char *HttpRequestImpl::versionString() const
result = "HTTP/1.1";
break;
case Version::kHttp2:
result = "HTTP/2";
break;
default:
break;
}

View File

@ -410,7 +410,7 @@ class HttpRequestImpl : public HttpRequest
return passThrough_;
}
void appendToBuffer(trantor::MsgBuffer *output) const;
void appendToBuffer(trantor::MsgBuffer *output, Version protoVer) const;
const SessionPtr &session() const override
{

View File

@ -97,6 +97,9 @@ const char *HttpResponseImpl::versionString() const
case Version::kHttp11:
result = "HTTP/1.1";
break;
case Version::kHttp2:
result = "HTTP/2";
break;
default:
break;

51
lib/src/HttpTransport.h Normal file
View File

@ -0,0 +1,51 @@
#pragma once
#include "HttpRequestImpl.h"
#include "HttpResponseImpl.h"
#include <drogon/drogon_callbacks.h>
#include <trantor/net/TcpConnection.h>
#include <trantor/utils/NonCopyable.h>
namespace drogon
{
class HttpTransport : public trantor::NonCopyable
{
public:
HttpTransport() = default;
virtual ~HttpTransport() = default;
virtual void sendRequestInLoop(const HttpRequestPtr &req,
HttpReqCallback &&callback) = 0;
virtual void onRecvMessage(const trantor::TcpConnectionPtr &,
trantor::MsgBuffer *) = 0;
virtual bool handleConnectionClose() = 0;
// XXX: Footgun: This MUST be called by the HttpClient. DO NOT
// call this within Transport. Client needs to know that error
// happened. When in doubt, call error callback.
virtual void onError(ReqResult result) = 0;
virtual size_t requestsInFlight() const = 0;
using RespCallback =
std::function<void(const HttpResponseImplPtr &,
std::pair<HttpRequestPtr, HttpReqCallback> &&,
const trantor::TcpConnectionPtr)>;
using ErrorCallback = std::function<void(ReqResult)>;
void setRespCallback(RespCallback cb)
{
respCallback = std::move(cb);
}
void setErrorCallback(ErrorCallback cb)
{
errorCallback = std::move(cb);
}
RespCallback respCallback;
ErrorCallback errorCallback;
};
} // namespace drogon

View File

@ -427,7 +427,7 @@ void WebSocketClientImpl::sendReq(const trantor::TcpConnectionPtr &connPtr)
trantor::MsgBuffer buffer;
assert(upgradeRequest_);
auto implPtr = static_cast<HttpRequestImpl *>(upgradeRequest_.get());
implPtr->appendToBuffer(&buffer);
implPtr->appendToBuffer(&buffer, Version::kHttp11);
LOG_TRACE << "Send request:"
<< std::string(buffer.peek(), buffer.readableBytes());
connPtr->send(std::move(buffer));

View File

@ -1162,6 +1162,9 @@ void doTest(const HttpClientPtr &client, std::shared_ptr<test::Case> TEST_CTX)
FAIL("Unexpected exception, what(): " + std::string(e.what()));
}
// Had to be put here else there's a chance that the client hasn't
// negotiated the protocol yet
CHECK(client->protocolVersion() == Version::kHttp11);
// Test coroutine middleware
try
{

1
third_party/eric-hpack-core vendored Submodule

@ -0,0 +1 @@
Subproject commit 5734893792f8e8b2984a91e845ff3e18b45e9c50