Skip to content

Commit 49f7189

Browse files
committed
slop remove alloc in hot path
1 parent 5fdb6de commit 49f7189

12 files changed

Lines changed: 841 additions & 49 deletions

server/CMakeLists.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,13 @@ if (CORO_HTTP_SERVER_BUILD_TESTS)
6363
)
6464
add_test(NAME http_parser_test COMMAND http_parser_test)
6565

66+
add_executable(stack_future_test tests/stack_future_test.cpp)
67+
target_link_libraries(stack_future_test PRIVATE coro_http_server::coro_http_server)
68+
set_target_properties(stack_future_test PROPERTIES
69+
RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/tests
70+
)
71+
add_test(NAME stack_future_test COMMAND stack_future_test)
72+
6673
add_executable(http_integration_test tests/http_integration_test.cpp)
6774
target_link_libraries(http_integration_test PRIVATE coro_http_server::coro_http_server Threads::Threads)
6875
set_target_properties(http_integration_test PROPERTIES

server/include/http_parser.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#pragma once
2-
#include "co_future.h"
2+
#include "non_owning_co_future.h"
33
#include "read_iterator.h"
44
#include "request_data.h"
55
#include <optional>
@@ -81,7 +81,7 @@ class HttpRequestParser {
8181

8282
public:
8383
HttpRequestParser(IOUring &ring, int fd);
84-
CoFuture<RequestReadStatus> ReadRequest(RequestData &request);
84+
NonOwningCoFuture<RequestReadStatus> ReadRequest(RequestData &request);
8585
void MarkContinueSent();
8686
};
8787

server/include/io_uring.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#pragma once
2-
#include "co_future.h"
2+
#include "owning_co_future.h"
33
#include <array>
44
#include <atomic>
55
#include <cstddef>
@@ -48,12 +48,12 @@ class IOUring {
4848
IOUring();
4949
IOUring &operator=(IOUring &&rhs);
5050
void Read(int fileDescriptor, std::array<char, kReadBufferSize> &buffer, std::function<void(int)> complete);
51-
CoFuture<int> ReadAsync(int fileDescriptor, std::array<char, kReadBufferSize> &buffer);
51+
OwningCoFuture<int> ReadAsync(int fileDescriptor, std::array<char, kReadBufferSize> &buffer);
5252
void Write(int fileDescriptor, std::string_view data, size_t offset, size_t len,
5353
std::function<void(int)> complete);
54-
CoFuture<int> WriteAsync(int fileDescriptor, std::string_view data, size_t offset,
54+
OwningCoFuture<int> WriteAsync(int fileDescriptor, std::string_view data, size_t offset,
5555
size_t len);
5656
void Accept(int fileDescriptor, std::function<void(int)> complete);
57-
CoFuture<int> AcceptAsync(int fileDescriptor);
57+
OwningCoFuture<int> AcceptAsync(int fileDescriptor);
5858
};
5959
}
Lines changed: 316 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
#pragma once
2+
3+
#include "owning_co_future.h"
4+
#include <coroutine>
5+
#include <exception>
6+
#include <functional>
7+
#include <mutex>
8+
#include <stdexcept>
9+
#include <type_traits>
10+
#include <utility>
11+
12+
namespace HTTP {
13+
14+
template <typename T>
15+
class NonOwningCoFuture;
16+
17+
template <typename T>
18+
class NonOwningCoFutureCoroutinePromise {
19+
using ControlBlock = detail::StackFutureControl<T>;
20+
21+
ControlBlock control_;
22+
std::unique_lock<std::mutex> producerLock_{control_.mutex};
23+
bool completed_{false};
24+
25+
template <typename>
26+
friend class NonOwningCoFuture;
27+
28+
template <typename Fn>
29+
void StoreCompletion(Fn &&store) {
30+
if (completed_) {
31+
throw std::runtime_error("Future already satisfied");
32+
}
33+
std::forward<Fn>(store)();
34+
completed_ = true;
35+
}
36+
37+
void PublishCompletion() noexcept {
38+
if (!completed_) {
39+
StoreCompletion([this] {
40+
control_.exception =
41+
std::make_exception_ptr(std::runtime_error("Coroutine did not complete"));
42+
});
43+
}
44+
45+
control_.ready.store(true, std::memory_order_release);
46+
auto continuation =
47+
control_.continuation.exchange(nullptr, std::memory_order_acq_rel);
48+
producerLock_.unlock();
49+
50+
if (continuation) {
51+
(*continuation)();
52+
}
53+
}
54+
55+
public:
56+
NonOwningCoFuture<T> get_return_object() {
57+
return NonOwningCoFuture<T>(
58+
std::coroutine_handle<NonOwningCoFutureCoroutinePromise>::from_promise(
59+
*this));
60+
}
61+
62+
std::suspend_never initial_suspend() noexcept { return {}; }
63+
64+
struct FinalAwaiter {
65+
bool await_ready() noexcept { return false; }
66+
void await_resume() noexcept {}
67+
void await_suspend(
68+
std::coroutine_handle<NonOwningCoFutureCoroutinePromise> handle) noexcept {
69+
handle.promise().PublishCompletion();
70+
}
71+
};
72+
73+
FinalAwaiter final_suspend() noexcept { return {}; }
74+
75+
template <typename U>
76+
requires(!std::is_void_v<T>)
77+
void return_value(U &&value) {
78+
StoreCompletion([this, value = std::forward<U>(value)]() mutable {
79+
control_.value = std::move(value);
80+
});
81+
}
82+
83+
void unhandled_exception() {
84+
StoreCompletion([this] { control_.exception = std::current_exception(); });
85+
}
86+
};
87+
88+
template <>
89+
class NonOwningCoFutureCoroutinePromise<void> {
90+
using ControlBlock = detail::StackFutureControl<void>;
91+
92+
ControlBlock control_;
93+
std::unique_lock<std::mutex> producerLock_{control_.mutex};
94+
bool completed_{false};
95+
96+
template <typename>
97+
friend class NonOwningCoFuture;
98+
99+
template <typename Fn>
100+
void StoreCompletion(Fn &&store) {
101+
if (completed_) {
102+
throw std::runtime_error("Future already satisfied");
103+
}
104+
std::forward<Fn>(store)();
105+
completed_ = true;
106+
}
107+
108+
void PublishCompletion() noexcept {
109+
if (!completed_) {
110+
StoreCompletion([this] {
111+
control_.exception =
112+
std::make_exception_ptr(std::runtime_error("Coroutine did not complete"));
113+
});
114+
}
115+
116+
control_.ready.store(true, std::memory_order_release);
117+
auto continuation =
118+
control_.continuation.exchange(nullptr, std::memory_order_acq_rel);
119+
producerLock_.unlock();
120+
121+
if (continuation) {
122+
(*continuation)();
123+
}
124+
}
125+
126+
public:
127+
NonOwningCoFuture<void> get_return_object();
128+
129+
std::suspend_never initial_suspend() noexcept { return {}; }
130+
131+
struct FinalAwaiter {
132+
bool await_ready() noexcept { return false; }
133+
void await_resume() noexcept {}
134+
void await_suspend(
135+
std::coroutine_handle<NonOwningCoFutureCoroutinePromise> handle) noexcept {
136+
handle.promise().PublishCompletion();
137+
}
138+
};
139+
140+
FinalAwaiter final_suspend() noexcept { return {}; }
141+
142+
void return_void() { StoreCompletion([this] { control_.value = true; }); }
143+
144+
void unhandled_exception() {
145+
StoreCompletion([this] { control_.exception = std::current_exception(); });
146+
}
147+
};
148+
149+
template <typename T>
150+
class NonOwningCoFuture {
151+
using Promise = NonOwningCoFutureCoroutinePromise<T>;
152+
using Handle = std::coroutine_handle<Promise>;
153+
using ControlBlock = typename Promise::ControlBlock;
154+
155+
Handle handle_{};
156+
157+
explicit NonOwningCoFuture(Handle handle) : handle_(handle) {}
158+
159+
template <typename>
160+
friend class NonOwningCoFutureCoroutinePromise;
161+
162+
Promise *promise() const {
163+
if (!handle_) {
164+
throw std::logic_error("Invalid future");
165+
}
166+
return &handle_.promise();
167+
}
168+
169+
public:
170+
NonOwningCoFuture() = default;
171+
NonOwningCoFuture(const NonOwningCoFuture &) = delete;
172+
NonOwningCoFuture &operator=(const NonOwningCoFuture &) = delete;
173+
174+
NonOwningCoFuture(NonOwningCoFuture &&rhs) noexcept
175+
: handle_(std::exchange(rhs.handle_, {})) {}
176+
177+
NonOwningCoFuture &operator=(NonOwningCoFuture &&rhs) noexcept {
178+
if (this != &rhs) {
179+
if (handle_) {
180+
handle_.destroy();
181+
}
182+
handle_ = std::exchange(rhs.handle_, {});
183+
}
184+
return *this;
185+
}
186+
187+
~NonOwningCoFuture() {
188+
if (handle_) {
189+
handle_.destroy();
190+
}
191+
}
192+
193+
bool valid() const noexcept { return static_cast<bool>(handle_); }
194+
195+
bool isReady() const noexcept {
196+
if (!handle_) {
197+
return false;
198+
}
199+
return handle_.promise().control_.ready.load(std::memory_order_acquire);
200+
}
201+
202+
T get() {
203+
Promise *p = promise();
204+
ControlBlock &control = p->control_;
205+
if (!control.ready.load(std::memory_order_acquire)) {
206+
control.mutex.lock();
207+
control.mutex.unlock();
208+
}
209+
210+
if (control.exception) {
211+
std::rethrow_exception(control.exception);
212+
}
213+
if constexpr (!std::is_void_v<T>) {
214+
if (!control.value) {
215+
throw std::runtime_error("Future has no value");
216+
}
217+
return *control.value;
218+
}
219+
}
220+
221+
T Get() { return get(); }
222+
223+
bool await_ready() const noexcept { return isReady(); }
224+
225+
bool await_suspend(std::coroutine_handle<> awaiting) {
226+
ControlBlock &control = promise()->control_;
227+
auto callback = std::make_shared<typename ControlBlock::Callback>(
228+
[awaiting]() mutable {
229+
if (awaiting && !awaiting.done()) {
230+
awaiting.resume();
231+
}
232+
});
233+
234+
if (control.ready.load(std::memory_order_acquire)) {
235+
return false;
236+
}
237+
238+
std::shared_ptr<typename ControlBlock::Callback> expected = nullptr;
239+
if (!control.continuation.compare_exchange_strong(
240+
expected, callback, std::memory_order_acq_rel,
241+
std::memory_order_acquire)) {
242+
throw std::runtime_error("Tried to set the subscriber second time");
243+
}
244+
245+
if (control.ready.load(std::memory_order_acquire)) {
246+
expected = callback;
247+
if (control.continuation.compare_exchange_strong(
248+
expected, nullptr, std::memory_order_acq_rel,
249+
std::memory_order_acquire)) {
250+
return false;
251+
}
252+
}
253+
return true;
254+
}
255+
256+
T await_resume() { return get(); }
257+
258+
template <typename T1, typename U = T,
259+
typename = std::enable_if_t<!std::is_void_v<U>>>
260+
OwningCoFuture<T1> Then(std::function<T1(U)> &&then) {
261+
return OwningCoFuture<T1>(
262+
[this, thenFn = std::move(then)](OwningCoFuture<T1> &next) mutable {
263+
ControlBlock &control = promise()->control_;
264+
auto run = [this, &next, thenFn = std::move(thenFn)]() mutable {
265+
try {
266+
if constexpr (std::is_void_v<T1>) {
267+
thenFn(get());
268+
next.Set();
269+
} else {
270+
next.Set(thenFn(get()));
271+
}
272+
} catch (...) {
273+
next.SetException(std::current_exception());
274+
}
275+
};
276+
detail::RegisterContinuation(control, std::move(run));
277+
});
278+
}
279+
280+
template <typename T1, typename U = T,
281+
typename = std::enable_if_t<std::is_void_v<U>>, typename = void>
282+
OwningCoFuture<T1> Then(std::function<T1()> &&then) {
283+
return OwningCoFuture<T1>(
284+
[this, thenFn = std::move(then)](OwningCoFuture<T1> &next) mutable {
285+
ControlBlock &control = promise()->control_;
286+
auto run = [this, &next, thenFn = std::move(thenFn)]() mutable {
287+
try {
288+
get();
289+
if constexpr (std::is_void_v<T1>) {
290+
thenFn();
291+
next.Set();
292+
} else {
293+
next.Set(thenFn());
294+
}
295+
} catch (...) {
296+
next.SetException(std::current_exception());
297+
}
298+
};
299+
detail::RegisterContinuation(control, std::move(run));
300+
});
301+
}
302+
};
303+
304+
inline NonOwningCoFuture<void>
305+
NonOwningCoFutureCoroutinePromise<void>::get_return_object() {
306+
return NonOwningCoFuture<void>(
307+
std::coroutine_handle<NonOwningCoFutureCoroutinePromise>::from_promise(
308+
*this));
309+
}
310+
311+
} // namespace HTTP
312+
313+
template <typename T, typename... Args>
314+
struct std::coroutine_traits<HTTP::NonOwningCoFuture<T>, Args...> {
315+
using promise_type = HTTP::NonOwningCoFutureCoroutinePromise<T>;
316+
};

0 commit comments

Comments
 (0)