From 652859c7872e99e96583eb7adc5729eea7359d36 Mon Sep 17 00:00:00 2001 From: Vladislav Shpilevoy Date: Sun, 19 Jan 2025 19:57:24 +0100 Subject: [PATCH 1/7] sch: allow TaskScheduler destroy when having tasks Previously if a task was in the front queue, the scheduler's destruction would fail. Also it would fail if a currently executing task would submit a new one during scheduler destruction. It was quite unhandy really. Lets make the scheduler destructor wait until the scheduler gets empty. --- src/mg/sch/TaskScheduler.cpp | 34 ++++++++++++++++++------------ src/mg/sch/TaskScheduler.h | 13 +++++++++++- test/sch/UnitTestTaskScheduler.cpp | 23 ++++++++++++++++++++ 3 files changed, 56 insertions(+), 14 deletions(-) diff --git a/src/mg/sch/TaskScheduler.cpp b/src/mg/sch/TaskScheduler.cpp index 47e51a2d..8eadfe38 100644 --- a/src/mg/sch/TaskScheduler.cpp +++ b/src/mg/sch/TaskScheduler.cpp @@ -68,11 +68,11 @@ namespace sch { mySignalFront.Send(); } - bool + TaskScheduleResult TaskScheduler::PrivSchedule() { if (myIsSchedulerWorking.ExchangeAcqRel(true)) - return false; + return TASK_SCHEDULE_BUSY; // Task status operations can all be relaxed inside the // scheduler. Syncing writes and reads between producers and @@ -87,11 +87,9 @@ namespace sch { uint64_t timestamp = mg::box::GetMilliseconds(); uint32_t batch; uint32_t maxBatch = mySchedBatchSize; + TaskScheduleResult result = TASK_SCHEDULE_DONE; retry: - if (PrivIsStopped()) - goto end; - // ------------------------------------------------------- // Handle waiting tasks. They are older than the ones in // the front queue, so must be handled first. @@ -215,11 +213,7 @@ namespace sch { // sleeping any moment. So the sched can't quit. It // must retry until either a front task appears, or // one of the waiting tasks' deadline is expired. - if (myQueueWaiting.Count() == 0) - { - mySignalFront.ReceiveBlocking(); - } - else + if (myQueueWaiting.Count() > 0) { deadline = myQueueWaiting.GetTop()->myDeadline; timestamp = mg::box::GetMilliseconds(); @@ -229,6 +223,18 @@ namespace sch { mg::box::TimeDuration(deadline - timestamp)); } } + else if (!PrivIsStopped()) + { + mySignalFront.ReceiveBlocking(); + } + else + { + // **Only** report scheduling is fully finished when there is actually + // nothing to do. That is, no tasks anywhere at all. Not in the front + // queue, not in the waiting queue, not in the ready queue. + result = TASK_SCHEDULE_FINISHED; + goto end; + } goto retry; } @@ -263,7 +269,7 @@ namespace sch { // and other workers will sleep on waiting for ready // tasks. PrivSignalReady(); - return true; + return result; } bool @@ -319,11 +325,13 @@ namespace sch { TaskScheduler::ourCurrent = myScheduler; uint64_t maxBatch = myScheduler->myExecBatchSize; uint64_t batch; - while (!myScheduler->PrivIsStopped()) + TaskScheduleResult result = TASK_SCHEDULE_DONE; + while (result != TASK_SCHEDULE_FINISHED) { do { - if (myScheduler->PrivSchedule()) + result = myScheduler->PrivSchedule(); + if (result != TASK_SCHEDULE_BUSY) myScheduleCount.IncrementRelaxed(); batch = 0; while (myScheduler->PrivExecute(myConsumer.Pop()) && ++batch < maxBatch); diff --git a/src/mg/sch/TaskScheduler.h b/src/mg/sch/TaskScheduler.h index a1e602d3..8c7935e0 100644 --- a/src/mg/sch/TaskScheduler.h +++ b/src/mg/sch/TaskScheduler.h @@ -49,6 +49,17 @@ namespace sch { class TaskSchedulerThread; + enum TaskScheduleResult + { + // Scheduling isn't done, another thread is doing it right now. + TASK_SCHEDULE_BUSY, + // Scheduling is done successfully. + TASK_SCHEDULE_DONE, + // Done successfully and was the last one. The scheduler is being stopped right + // now. + TASK_SCHEDULE_FINISHED, + }; + // Scheduler for asynchronous execution of tasks. Can be used // for tons of one-shot short-living tasks, as well as for // long-living periodic tasks with deadlines. @@ -109,7 +120,7 @@ namespace sch { void PrivPost( Task* aTask); - bool PrivSchedule(); + TaskScheduleResult PrivSchedule(); bool PrivExecute( Task* aTask); diff --git a/test/sch/UnitTestTaskScheduler.cpp b/test/sch/UnitTestTaskScheduler.cpp index 690de402..2609f676 100644 --- a/test/sch/UnitTestTaskScheduler.cpp +++ b/test/sch/UnitTestTaskScheduler.cpp @@ -108,6 +108,28 @@ namespace sch { mg::box::Sleep(1); } + static void + UnitTestTaskSchedulerDestroyWithFront() + { + TestCaseGuard guard("Destroy with front"); + + mg::box::AtomicU32 doneCount(0); + mg::sch::Task task2([&](mg::sch::Task*) { + doneCount.IncrementRelaxed(); + }); + mg::sch::Task task1([&](mg::sch::Task*) { + // Task posts another one. To cover the case how does the scheduler behave, + // when a new task was submitted after shutdown is started. + mg::sch::TaskScheduler::This().Post(&task2); + doneCount.IncrementRelaxed(); + }); + { + mg::sch::TaskScheduler sched("tst", 1, 5); + sched.Post(&task1); + } + TEST_CHECK(doneCount.LoadRelaxed() == 2); + } + static void UnitTestTaskSchedulerOrder() { @@ -1507,6 +1529,7 @@ namespace sch { TestSuiteGuard suite("TaskScheduler"); UnitTestTaskSchedulerBasic(); + UnitTestTaskSchedulerDestroyWithFront(); UnitTestTaskSchedulerOrder(); UnitTestTaskSchedulerDomino(); UnitTestTaskSchedulerWakeup(); From 43f019448152fbd4db6a4582048f611ab9aeae60 Mon Sep 17 00:00:00 2001 From: Vladislav Shpilevoy Date: Tue, 21 Jan 2025 00:18:52 +0100 Subject: [PATCH 2/7] aio: expose task's IOCore via a getter It is quite handy when one task needs to spawn another one in the same IOCore. Similar to TaskScheduler::This(). Especially for TCPServer which usually would spawn new TCPSocket/SSLSocket on each accept, inside the same IOCore. --- src/mg/aio/IOTask.h | 7 +++++++ src/mg/aio/TCPServer.h | 7 +++++++ src/mg/aio/TCPSocketIFace.h | 7 +++++++ test/aio/UnitTestTCPServer.cpp | 1 + test/aio/UnitTestTCPSocketIFace.cpp | 8 ++++++++ 5 files changed, 30 insertions(+) diff --git a/src/mg/aio/IOTask.h b/src/mg/aio/IOTask.h index b1ec2d73..ffb2f1ed 100755 --- a/src/mg/aio/IOTask.h +++ b/src/mg/aio/IOTask.h @@ -370,6 +370,7 @@ namespace aio { bool IsExpired() const; mg::net::Socket GetSocket() const; bool HasSocket() const; + IOCore& GetCore(); // Deadline is reset on each wakeup. Setting a new deadline works only if it is // lower than the previous deadline installed during the same task execution. That @@ -819,6 +820,12 @@ namespace aio { return GetSocket() != mg::net::theInvalidSocket; } + inline IOCore& + IOTask::GetCore() + { + return myCore; + } + inline void IOTask::SetDeadline( uint64_t aDeadline) diff --git a/src/mg/aio/TCPServer.h b/src/mg/aio/TCPServer.h index 3ff59e6e..5eef2aa5 100644 --- a/src/mg/aio/TCPServer.h +++ b/src/mg/aio/TCPServer.h @@ -48,6 +48,7 @@ namespace aio { void PostClose(); bool IsClosed() const; + IOCore& GetCore(); private: TCPServer(IOCore& aCore); @@ -64,5 +65,11 @@ namespace aio { IOServerSocket* myBoundSocket; }; + inline IOCore& + TCPServer::GetCore() + { + return myTask.GetCore(); + } + } } diff --git a/src/mg/aio/TCPSocketIFace.h b/src/mg/aio/TCPSocketIFace.h index 1a36ef6a..bbac828b 100644 --- a/src/mg/aio/TCPSocketIFace.h +++ b/src/mg/aio/TCPSocketIFace.h @@ -119,6 +119,7 @@ namespace aio { bool IsClosed() const; bool IsConnected() const; bool IsInWorkerNow() const; + IOCore& GetCore(); // // Not thread-safe functions. Can only be called from an IO worker thread. From @@ -243,6 +244,12 @@ namespace aio { TCPSocketCtl* myCtl; }; + inline IOCore& + TCPSocketIFace::GetCore() + { + return myTask.GetCore(); + } + inline void TCPSocketIFace::SetDeadline( uint64_t aDeadline) diff --git a/test/aio/UnitTestTCPServer.cpp b/test/aio/UnitTestTCPServer.cpp index 26e3d724..0e158e38 100644 --- a/test/aio/UnitTestTCPServer.cpp +++ b/test/aio/UnitTestTCPServer.cpp @@ -51,6 +51,7 @@ namespace tcpserver { TEST_CHECK(!server->IsClosed()); TEST_CHECK(server->Bind(host, err)); TEST_CHECK(!server->IsClosed()); + TEST_CHECK(&server->GetCore() == &core); server->PostClose(); // Any IPv4. diff --git a/test/aio/UnitTestTCPSocketIFace.cpp b/test/aio/UnitTestTCPSocketIFace.cpp index ae65a9fb..9825e0bd 100755 --- a/test/aio/UnitTestTCPSocketIFace.cpp +++ b/test/aio/UnitTestTCPSocketIFace.cpp @@ -333,6 +333,7 @@ namespace tcpsocketiface { TestMessage* Pop(); uint64_t GetWakeupCount(); + mg::aio::IOCore& GetCore(); mg::box::ErrorCode GetError(); mg::box::ErrorCode GetErrorConnect(); @@ -455,6 +456,7 @@ namespace tcpsocketiface { // Basic test to see if works at all. TestClientSocket client; client.PostConnectBlocking(aPort); + TEST_CHECK(&client.GetCore() == &theContext->myCore); client.SetAutoRecv(); client.Send(new TestMessage()); delete client.PopBlocking(); @@ -2532,6 +2534,12 @@ namespace tcpsocketiface { return myWakeupCount; } + mg::aio::IOCore& + TestClientSocket::GetCore() + { + return mySocket->GetCore(); + } + mg::box::ErrorCode TestClientSocket::GetError() { From 09a3585ffc4ef1fd18f6b33bffaaa71f111d1fa4 Mon Sep 17 00:00:00 2001 From: Vladislav Shpilevoy Date: Tue, 21 Jan 2025 00:23:13 +0100 Subject: [PATCH 3/7] build: fix installation cmake commands Some installation commands were outdated, like missing certain files or even having typos in the paths. Lets fix them before next commits start using the installation for examples and testing. Part of #21 --- src/mg/box/CMakeLists.txt | 2 ++ src/mg/net/CMakeLists.txt | 5 +++++ src/mg/sch/CMakeLists.txt | 2 +- 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/mg/box/CMakeLists.txt b/src/mg/box/CMakeLists.txt index e0d9ed50..41f7cab3 100644 --- a/src/mg/box/CMakeLists.txt +++ b/src/mg/box/CMakeLists.txt @@ -59,8 +59,10 @@ set(install_headers RefCount.h SharedPtr.h Signal.h + StringFunctions.h Thread.h ThreadLocalPool.h + Time.h TypeTraits.h ) diff --git a/src/mg/net/CMakeLists.txt b/src/mg/net/CMakeLists.txt index 60758d51..5a5ebe1d 100644 --- a/src/mg/net/CMakeLists.txt +++ b/src/mg/net/CMakeLists.txt @@ -51,6 +51,11 @@ set(install_headers Buffer.h DomainToIP.h Host.h + Socket.h + SSL.h + SSLContext.h + SSLContextOpenSSL.h + SSLOpenSSL.h URL.h ) diff --git a/src/mg/sch/CMakeLists.txt b/src/mg/sch/CMakeLists.txt index 56d39cce..d069633e 100644 --- a/src/mg/sch/CMakeLists.txt +++ b/src/mg/sch/CMakeLists.txt @@ -19,4 +19,4 @@ set(install_headers ) install(TARGETS mgsch DESTINATION "${install_lib_root}") -install(FILES ${install_headers} DESTINATION "${install_include_root}/mg/sched/") +install(FILES ${install_headers} DESTINATION "${install_include_root}/mg/sch/") From 50a4b702d6f35f999375b0a572f647949c7217b1 Mon Sep 17 00:00:00 2001 From: Vladislav Shpilevoy Date: Thu, 23 Jan 2025 00:46:54 +0100 Subject: [PATCH 4/7] box: fix coroutines on Mac clang There was a problem that if a coroutine operation inherited mg::box::CoroOp and had its own members filled in its constructor, then those members would be nullified despite there was no any explicit change in the code to those members. Strangely, it was only happening in GitHub CI macos-latest, and only in one job (the other job in the same workflow was passing ok even more tricky tests). Eventually, debugging with prints right in CI has revealed that apparently the compiler in `co_await SomeOperation()` copies this operation. And if the operation has its copy/move constructors deleted, then the members are not copied. In most cases simply nullified. That looks like a compiler bug, because the copying was explicitly forbidden, and yet it happened. It was confirmed via prints - operation's address (`this`) was one in constructor and another in await_suspend(). Anyway, the fix seems to be just enabling the constructors. Unfortunately, it is not really testable. All the coro tests passed in macos CI and failed only in the future patch introducing examples testing. --- src/mg/box/Coro.h | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/src/mg/box/Coro.h b/src/mg/box/Coro.h index e40a8c50..d7664ff4 100644 --- a/src/mg/box/Coro.h +++ b/src/mg/box/Coro.h @@ -32,15 +32,26 @@ namespace box { ////////////////////////////////////////////////////////////////////////////////////// - // Base class for all awaitable operations. They are supposed to be created and - // destroyed right in the co_await expression. Hence not copyable nor assignable. struct CoroOp { - CoroOp() = default; - CoroOp( - const CoroOp&) = delete; - CoroOp& operator=( - const CoroOp&) = delete; + // Base class for all awaitable operations. A while ago it had copy and move + // constructors deleted. Indeed, they (ops) are supposed to be created and + // destroyed right in the co_await expression. Hence not copyable nor assignable. + // + // But this led to undefined behaviour on MacOS with clang compiler 15.0.7 + // under certain weird random conditions. Debugging has revealed, that when + // another operation inherits this one, adds some members, and fills them in its + // constructor, those members are nullified by the time await_suspend is called. + // This went away, when the child operation got copy/move constructors defined. + // + // Apparently, the compiler was/is copying the operation after its creation into + // the coroutine's context. Completely ignoring if this is even allowed. Moreover, + // if this is banned, then stuff gets nullified and "copied" anyway. + // + // Unfortunately, the only reproducer was in GitHub Actions CI on their + // macos-latest image (14.7.2 23H311). Hence it was decided to simply drop those + // deleted constructors and allow "copying". Perhaps in the future this can be + // brought back. }; struct CoroOpIsNotReady { static constexpr bool await_ready() noexcept { return false; } }; From bad1fe7ed612eabe6631df0bfa2f01595cf69c4c Mon Sep 17 00:00:00 2001 From: Vladislav Shpilevoy Date: Thu, 23 Jan 2025 00:46:41 +0100 Subject: [PATCH 5/7] ci: rename test job build->test 'build' job name is not right when it is actually about tests. --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4e772bcd..dfe6d622 100755 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -7,7 +7,7 @@ on: branches: [ "master" ] jobs: - build: + test: runs-on: ${{ matrix.os }} strategy: From 7153c5ae4ca5bae65d6dfdc5974ba8fc5c6af023 Mon Sep 17 00:00:00 2001 From: Vladislav Shpilevoy Date: Fri, 10 Jan 2025 23:14:06 +0100 Subject: [PATCH 6/7] example: introduce examples The folder shows some usage examples of Serverbox. All of them use only the public Serverbox API. Serverbox for this is properly built and installed the same way as users would do. Closes #21 --- CMakeLists.txt | 9 +- examples/CMakeLists.txt | 70 +++ examples/README.md | 25 + examples/iocore_01_tcp_hello/CMakeLists.txt | 21 + examples/iocore_01_tcp_hello/main.cpp | 267 ++++++++++ examples/iocore_02_ssl_hello/CMakeLists.txt | 26 + examples/iocore_02_ssl_hello/Certs.cpp | 146 ++++++ examples/iocore_02_ssl_hello/Certs.h | 13 + examples/iocore_02_ssl_hello/main.cpp | 296 +++++++++++ examples/iocore_03_pipeline/CMakeLists.txt | 22 + examples/iocore_03_pipeline/main.cpp | 460 ++++++++++++++++++ .../iocore_04_tcp_periodic/CMakeLists.txt | 21 + examples/iocore_04_tcp_periodic/main.cpp | 344 +++++++++++++ .../scheduler_01_simple_task/CMakeLists.txt | 14 + examples/scheduler_01_simple_task/main.cpp | 22 + .../CMakeLists.txt | 14 + examples/scheduler_02_coroutine_task/main.cpp | 56 +++ .../CMakeLists.txt | 14 + examples/scheduler_03_multistep_task/main.cpp | 88 ++++ .../CMakeLists.txt | 14 + .../scheduler_04_interacting_tasks/main.cpp | 92 ++++ src/mg/stub/BoxStub.cpp | 3 + 22 files changed, 2034 insertions(+), 3 deletions(-) create mode 100644 examples/CMakeLists.txt create mode 100644 examples/README.md create mode 100644 examples/iocore_01_tcp_hello/CMakeLists.txt create mode 100644 examples/iocore_01_tcp_hello/main.cpp create mode 100644 examples/iocore_02_ssl_hello/CMakeLists.txt create mode 100644 examples/iocore_02_ssl_hello/Certs.cpp create mode 100644 examples/iocore_02_ssl_hello/Certs.h create mode 100644 examples/iocore_02_ssl_hello/main.cpp create mode 100644 examples/iocore_03_pipeline/CMakeLists.txt create mode 100644 examples/iocore_03_pipeline/main.cpp create mode 100644 examples/iocore_04_tcp_periodic/CMakeLists.txt create mode 100644 examples/iocore_04_tcp_periodic/main.cpp create mode 100644 examples/scheduler_01_simple_task/CMakeLists.txt create mode 100644 examples/scheduler_01_simple_task/main.cpp create mode 100644 examples/scheduler_02_coroutine_task/CMakeLists.txt create mode 100644 examples/scheduler_02_coroutine_task/main.cpp create mode 100644 examples/scheduler_03_multistep_task/CMakeLists.txt create mode 100644 examples/scheduler_03_multistep_task/main.cpp create mode 100644 examples/scheduler_04_interacting_tasks/CMakeLists.txt create mode 100644 examples/scheduler_04_interacting_tasks/main.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index c53a63c4..1de71971 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -14,7 +14,6 @@ if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang") -Wall -Wextra -Wpedantic -Werror -Wno-unknown-warning-option -Wunused-function -Wno-invalid-offsetof -Wno-unused-value -Wno-deprecated-copy -Wno-gnu-zero-variadic-macro-arguments - -fno-rtti ) set(CMAKE_C_FLAGS ${CMAKE_C_FLAGS} "-pthread") @@ -42,5 +41,9 @@ if (CMAKE_SYSTEM_NAME STREQUAL "Linux") endif () add_subdirectory(src) -add_subdirectory(test) -add_subdirectory(bench) +if (NOT MG_SKIP_TEST) + add_subdirectory(test) +endif() +if (NOT MG_SKIP_BENCHES) + add_subdirectory(bench) +endif() diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt new file mode 100644 index 00000000..b203f3f0 --- /dev/null +++ b/examples/CMakeLists.txt @@ -0,0 +1,70 @@ +cmake_minimum_required (VERSION 3.8) + +project("Examples") + +if (NOT CMAKE_BUILD_TYPE) + set(CMAKE_BUILD_TYPE Release) +endif() +message(STATUS "Build type ${CMAKE_BUILD_TYPE}") + +if (NOT DEFINED CMAKE_CXX_STANDARD) + message(STATUS "Using C++20 standard as default") + set(CMAKE_CXX_STANDARD 20) +else() + message(STATUS "Using C++${CMAKE_CXX_STANDARD} standard as explicitly requested") +endif() + +if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang") + add_compile_options( + -Wall -Wextra -Wpedantic -Werror -Wno-unknown-warning-option -Wunused-function + -Wno-invalid-offsetof -Wno-unused-value -Wno-deprecated-copy + -Wno-gnu-zero-variadic-macro-arguments + ) + + set(CMAKE_C_FLAGS ${CMAKE_C_FLAGS} "-pthread") + set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} "-pthread") +elseif(CMAKE_CXX_COMPILER_ID STREQUAL "MSVC") + add_compile_options( + # This is needed at least for correct macro handling. Default behaviour won't + # expand __VA_ARGS__ correctly. + /Zc:preprocessor + /WX /wd4266 /wd4324 /wd4355 /wd4365 /wd4458 /wd4514 /wd4548 /wd4625 /wd4626 + /wd4668 /wd4710 /wd4820 /wd5026 /wd5027 /wd5039 /wd5045 /wd5105 /wd5219 /wd26439 + /wd26800 + # It ignores 'break' and 'fallthrough' done via a macro which makes it annoying + # and pointless. + /wd5262 + # Info message about a function being inlined. + /wd4711 + ) +endif() + +set(MG_SERVERBOX_BUILD_DIR ${CMAKE_BINARY_DIR}/build_serverbox) +set(MG_SERVERBOX_DIR ${MG_SERVERBOX_BUILD_DIR}/installed) + +add_custom_target(install_serverbox + COMMAND ${CMAKE_COMMAND} + -S ${CMAKE_SOURCE_DIR}/.. + -B ${MG_SERVERBOX_BUILD_DIR} + -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} + -DMG_SKIP_BENCHES=1 + -DMG_SKIP_TEST=1 + -DCMAKE_INSTALL_PREFIX=${MG_SERVERBOX_DIR} + + COMMAND ${CMAKE_COMMAND} + --build ${MG_SERVERBOX_BUILD_DIR} --config ${CMAKE_BUILD_TYPE} -j + + COMMAND ${CMAKE_COMMAND} + --install ${MG_SERVERBOX_BUILD_DIR} --config ${CMAKE_BUILD_TYPE} + + COMMENT "Installing serverbox" +) + +add_subdirectory(iocore_01_tcp_hello) +add_subdirectory(iocore_02_ssl_hello) +add_subdirectory(iocore_03_pipeline) +add_subdirectory(iocore_04_tcp_periodic) +add_subdirectory(scheduler_01_simple_task) +add_subdirectory(scheduler_02_coroutine_task) +add_subdirectory(scheduler_03_multistep_task) +add_subdirectory(scheduler_04_interacting_tasks) diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 00000000..780b26df --- /dev/null +++ b/examples/README.md @@ -0,0 +1,25 @@ +# Examples + +The folder provides series of examples how to use the most interesting parts of Serverbox. + +Each example can be built and run on its own, they only depend on Serverbox itself, and not on each other. + +They can be used as reference points when you want to try and build something of your own. + +The recommended way of reading them is firstly all `scheduler_*`, then all `iocore_*`. The sequence 01, 02, etc is not required, each example is self-sufficient build- and code-wise. But later examples might not explain some simple things already covered in the previous examples. + +## Running + +The recommended way to use them is this (on non-Windows): +```Bash +# Build them all. +mkdir -p build +cd build +cmake .. +make -j + +# Run any of them like this: +./scheduler_01_simple_task/scheduler_01_simple_task +``` + +On Windows platform they also compile and run, but one would have to run them via cmd/PowerShell/VisualStudio. diff --git a/examples/iocore_01_tcp_hello/CMakeLists.txt b/examples/iocore_01_tcp_hello/CMakeLists.txt new file mode 100644 index 00000000..6847e903 --- /dev/null +++ b/examples/iocore_01_tcp_hello/CMakeLists.txt @@ -0,0 +1,21 @@ +add_executable(iocore_01_tcp_hello main.cpp) +add_dependencies(iocore_01_tcp_hello install_serverbox) + +target_include_directories(iocore_01_tcp_hello PUBLIC + ${MG_SERVERBOX_DIR}/include +) +target_link_directories(iocore_01_tcp_hello PUBLIC + ${MG_SERVERBOX_DIR}/lib +) +set(libs + mgaio + mgnet + mgbox + mgboxstub +) +if(WIN32) + set(libs ${libs} ws2_32.lib crypt32.lib) +endif() +target_link_libraries(iocore_01_tcp_hello + ${libs} +) diff --git a/examples/iocore_01_tcp_hello/main.cpp b/examples/iocore_01_tcp_hello/main.cpp new file mode 100644 index 00000000..06eaeb18 --- /dev/null +++ b/examples/iocore_01_tcp_hello/main.cpp @@ -0,0 +1,267 @@ +#include "mg/aio/IOCore.h" +#include "mg/aio/TCPServer.h" +#include "mg/aio/TCPSocket.h" +#include "mg/aio/TCPSocketSubscription.h" +#include "mg/box/Log.h" + +#include + +// +// Example of how to use TCPSocket and TCPServer. The clients connect to the server, send +// a message, wait for any response as a receipt confirmation, and delete themselves. +// + +static const int theClientCount = 5; + +////////////////////////////////////////////////////////////////////////////////////////// + +class MyClient final + // The socket subscription has a number of callbacks notifying us about various TCP + // events like on-connect, on-recv, on-send, etc. All of them are optional, and are + // always executed in a single thread, never in parallel. This allows us not to use + // any mutexes for data used exclusively by those callbacks. + : private mg::aio::TCPSocketSubscription +{ +public: + MyClient( + int aID, + mg::aio::IOCore& aCore, + uint16_t aPort) + : myID(aID) + , mySock(new mg::aio::TCPSocket(aCore)) + { + mySock->Open({}); + mg::aio::TCPSocketConnectParams connParams; + connParams.myEndpoint = mg::net::HostMakeLocalIPV4(aPort).ToString(); + mySock->PostConnect(connParams, this); + } + + ~MyClient() final = default; + +private: + // TCPSocketSubscription event. + void + OnConnect() final + { + MG_LOG_INFO("Client.OnConnect", "%d", myID); + // Using SendRef(), because the string is in the constant data section. It is + // never deleted. Hence no need to copy it. + const char* msg = "hello handshake"; + mySock->SendRef(msg, mg::box::Strlen(msg) + 1); + mySock->Recv(1); + } + + // TCPSocketSubscription event. + void + OnRecv( + mg::net::BufferReadStream& aStream) final + { + MG_LOG_INFO("Client.OnRecv", "%d: got response", myID); + MG_BOX_ASSERT(aStream.GetReadSize() > 0); + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnError( + mg::box::Error*) final + { + // Can happen on Windows even on graceful close. + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Client.OnClose", "%d", myID); + // Try to always have all deletions in a single place. And the only suitable place + // is on-close. This is the only place where you can safely assume that the socket + // won't run again if you delete it right now. + mySock->Delete(); + delete this; + } + + const int myID; + mg::aio::TCPSocket* mySock; +}; + +////////////////////////////////////////////////////////////////////////////////////////// + +class MyPeer final + : private mg::aio::TCPSocketSubscription +{ +public: + MyPeer( + int aID, + mg::aio::IOCore& aCore, + mg::net::Socket aSock) + : myID(aID) + , mySock(new mg::aio::TCPSocket(aCore)) + { + mySock->Open({}); + mySock->PostWrap(aSock, this); + } + + ~MyPeer() final = default; + +private: + // TCPSocketSubscription event. + void + OnConnect() final + { + MG_LOG_INFO("Peer.OnConnect", "%d", myID); + mySock->Recv(128); + } + + // TCPSocketSubscription event. + void + OnRecv( + mg::net::BufferReadStream& aStream) final + { + uint64_t newSize = aStream.GetReadSize(); + MG_BOX_ASSERT(newSize > 0); + uint64_t oldSize = myInput.size(); + myInput.resize(oldSize + newSize); + aStream.ReadData(myInput.data() + oldSize, newSize); + // Zero character (string terminator) is the end of the message. Until it is + // received, the message is incomplete. + if (myInput.back() != 0) + { + mySock->Recv(128); + return; + } + MG_LOG_INFO("Peer.OnRecv", "%d: got '%s'", myID, myInput.data()); + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnError( + mg::box::Error*) final + { + // Can happen on Windows even on graceful close. + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Peer.OnClose", "%d", myID); + mySock->Delete(); + delete this; + } + + const int myID; + mg::aio::TCPSocket* mySock; + std::vector myInput; +}; + +////////////////////////////////////////////////////////////////////////////////////////// + +class MyServer final + // Identical to TCPSocketSubscription, but provides TCP-server events, like on-accept. + : private mg::aio::TCPServerSubscription +{ +public: + MyServer( + mg::aio::IOCore& aCore) + : myAcceptCount(0) + , myServer(mg::aio::TCPServer::NewShared(aCore)) + { + } + + uint16_t + Bind() + { + mg::box::Error::Ptr err; + bool ok = myServer->Bind(mg::net::HostMakeAllIPV4(0), err); + MG_BOX_ASSERT(ok); + return myServer->GetPort(); + } + + void + Start() + { + mg::box::Error::Ptr err; + bool ok = myServer->Listen(mg::net::SocketMaxBacklog(), this, err); + MG_BOX_ASSERT(ok); + } + +private: + // TCPServerSubscription event. + void + OnAccept( + mg::net::Socket aSock, + const mg::net::Host&) final + { + MG_LOG_INFO("Server.OnAccept", "new client"); + new MyPeer(++myAcceptCount, myServer->GetCore(), aSock); + if (myAcceptCount == theClientCount) + { + MG_LOG_INFO("Server.OnAccept", "start closing"); + myServer->PostClose(); + } + } + + // TCPServerSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Server.OnClose", ""); + } + + int myAcceptCount; + mg::aio::TCPServer::Ptr myServer; +}; + +////////////////////////////////////////////////////////////////////////////////////////// + +static void +RunExample() +{ + mg::aio::IOCore core; + core.Start(3 /* threads */); + + MG_LOG_INFO("Main", "start server"); + MyServer server(core); + uint16_t port = server.Bind(); + server.Start(); + + MG_LOG_INFO("Main", "start clients"); + for (int i = 0; i < theClientCount; ++i) + new MyClient(i + 1, core, port); + + MG_LOG_INFO("Main", "wait for end"); + uint32_t remaining = core.WaitEmpty(); + MG_BOX_ASSERT(remaining == 0); + + MG_LOG_INFO("Main", "terminating"); +} + +////////////////////////////////////////////////////////////////////////////////////////// + +int +main() +{ +#if IS_PLATFORM_WIN + // WSAStartup() on Windows has to be done by the user. To be able to customize it as + // they want. Besides, it is a too low-level thing to be able to wrap it into anything + // platform-agnostic. + WSADATA data; + MG_BOX_ASSERT(WSAStartup(MAKEWORD(2, 2), &data) == 0); +#else + // SIGPIPE is pointless. This signal kills the process when trying to write to a + // socket whose connection is dead. The problem is that there is no way to know that + // at the moment of write. + signal(SIGPIPE, SIG_IGN); +#endif + + RunExample(); + +#if IS_PLATFORM_WIN + MG_BOX_ASSERT(WSACleanup() == 0); +#endif + return 0; +} diff --git a/examples/iocore_02_ssl_hello/CMakeLists.txt b/examples/iocore_02_ssl_hello/CMakeLists.txt new file mode 100644 index 00000000..fede3a34 --- /dev/null +++ b/examples/iocore_02_ssl_hello/CMakeLists.txt @@ -0,0 +1,26 @@ +add_executable(iocore_02_ssl_hello main.cpp Certs.cpp) +add_dependencies(iocore_02_ssl_hello install_serverbox) + +find_package(OpenSSL REQUIRED) + +target_include_directories(iocore_02_ssl_hello PUBLIC + ${MG_SERVERBOX_DIR}/include + ${OPENSSL_INCLUDE_DIR} +) +target_link_directories(iocore_02_ssl_hello PUBLIC + ${MG_SERVERBOX_DIR}/lib +) +set(libs + mgaio + mgnet + mgbox + mgboxstub + ${OPENSSL_SSL_LIBRARY} + ${OPENSSL_CRYPTO_LIBRARY} +) +if(WIN32) + set(libs ${libs} ws2_32.lib crypt32.lib) +endif() +target_link_libraries(iocore_02_ssl_hello + ${libs} +) diff --git a/examples/iocore_02_ssl_hello/Certs.cpp b/examples/iocore_02_ssl_hello/Certs.cpp new file mode 100644 index 00000000..51000efb --- /dev/null +++ b/examples/iocore_02_ssl_hello/Certs.cpp @@ -0,0 +1,146 @@ +#include "Certs.h" + +// These certificates are signed with a CA, not by themselves. +// +// Generation commands: +// +// 1) Generate local key of CA. +// +// openssl ecparam -out ca.key -name prime256v1 -genkey +// +// 2) Generate CA public cert. Be sure you specify a unique 'Common Name' field when +// prompted by the command line. +// +// openssl req -x509 -new -nodes -key ca.key -sha256 -days 36500 -out ca.crt +// +// 3) Generate server's own private key: +// +// openssl ecparam -out dom.key -name prime256v1 -genkey +// +// 4) Generate CSRs (request to create a cert for the server). Here 3 are generated to +// give our server 3 public certificates. It is useful for testing their switch. Be +// sure you specify a unique 'Common Name' field for each CSR when prompted by the +// command line. +// +// openssl req -new -key dom.key -out dom.csr +// +// 5) Generate server's public certificate by the CA key and the CSR. +// +// openssl x509 -req -in dom.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out dom.crt -days 36500 -sha256 +// +// 6) Create DER versions of these files: +// +// openssl x509 -inform pem -in dom.crt -outform der -out dom.crt.der1 +// openssl x509 -inform pem -in ca.crt -outform der -out ca.crt.der +// openssl ec -inform pem -in dom.key -outform der -out dom.key.der +// +// 7) Print ready-to-use arrays of bytes. +// +// xxd -i dom.crt.der +// xxd -i dom.key.der +// xxd -i ca.crt.der +// +// Note that here DER is only used as a binary representation. PEM can be used too. +// Serverbox API allows both formats. +// +const uint32_t theTestCertSize = 429; +const uint8_t theTestCert[] = { + 0x30, 0x82, 0x01, 0xa9, 0x30, 0x82, 0x01, 0x4f, 0x02, 0x14, 0x53, 0x53, + 0x89, 0x2d, 0x22, 0x6f, 0xaf, 0xe4, 0x1c, 0x1a, 0x00, 0xb4, 0x08, 0x6c, + 0xf2, 0x24, 0x58, 0xbf, 0x2e, 0x93, 0x30, 0x0a, 0x06, 0x08, 0x2a, 0x86, + 0x48, 0xce, 0x3d, 0x04, 0x03, 0x02, 0x30, 0x53, 0x31, 0x0b, 0x30, 0x09, + 0x06, 0x03, 0x55, 0x04, 0x06, 0x13, 0x02, 0x52, 0x55, 0x31, 0x0f, 0x30, + 0x0d, 0x06, 0x03, 0x55, 0x04, 0x08, 0x0c, 0x06, 0x4d, 0x6f, 0x73, 0x63, + 0x6f, 0x77, 0x31, 0x0f, 0x30, 0x0d, 0x06, 0x03, 0x55, 0x04, 0x07, 0x0c, + 0x06, 0x4d, 0x6f, 0x73, 0x63, 0x6f, 0x77, 0x31, 0x10, 0x30, 0x0e, 0x06, + 0x03, 0x55, 0x04, 0x0a, 0x0c, 0x07, 0x55, 0x62, 0x69, 0x73, 0x6f, 0x66, + 0x74, 0x31, 0x10, 0x30, 0x0e, 0x06, 0x03, 0x55, 0x04, 0x03, 0x0c, 0x07, + 0x55, 0x62, 0x69, 0x73, 0x6f, 0x66, 0x74, 0x30, 0x20, 0x17, 0x0d, 0x32, + 0x32, 0x30, 0x36, 0x32, 0x32, 0x31, 0x34, 0x30, 0x33, 0x30, 0x35, 0x5a, + 0x18, 0x0f, 0x32, 0x31, 0x32, 0x32, 0x30, 0x35, 0x32, 0x39, 0x31, 0x34, + 0x30, 0x33, 0x30, 0x35, 0x5a, 0x30, 0x59, 0x31, 0x0b, 0x30, 0x09, 0x06, + 0x03, 0x55, 0x04, 0x06, 0x13, 0x02, 0x41, 0x55, 0x31, 0x13, 0x30, 0x11, + 0x06, 0x03, 0x55, 0x04, 0x08, 0x0c, 0x0a, 0x53, 0x6f, 0x6d, 0x65, 0x2d, + 0x53, 0x74, 0x61, 0x74, 0x65, 0x31, 0x21, 0x30, 0x1f, 0x06, 0x03, 0x55, + 0x04, 0x0a, 0x0c, 0x18, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, + 0x20, 0x57, 0x69, 0x64, 0x67, 0x69, 0x74, 0x73, 0x20, 0x50, 0x74, 0x79, + 0x20, 0x4c, 0x74, 0x64, 0x31, 0x12, 0x30, 0x10, 0x06, 0x03, 0x55, 0x04, + 0x03, 0x0c, 0x09, 0x55, 0x62, 0x69, 0x73, 0x6f, 0x66, 0x74, 0x20, 0x31, + 0x30, 0x59, 0x30, 0x13, 0x06, 0x07, 0x2a, 0x86, 0x48, 0xce, 0x3d, 0x02, + 0x01, 0x06, 0x08, 0x2a, 0x86, 0x48, 0xce, 0x3d, 0x03, 0x01, 0x07, 0x03, + 0x42, 0x00, 0x04, 0xde, 0x5a, 0x0e, 0x8e, 0x07, 0xdd, 0xe2, 0x85, 0xe5, + 0x04, 0x73, 0xfb, 0x74, 0x77, 0x83, 0xc6, 0xae, 0x9f, 0xf0, 0xa1, 0xf0, + 0xe6, 0xe2, 0xf1, 0xf8, 0x8e, 0x45, 0x19, 0xe5, 0x58, 0x78, 0x90, 0xf5, + 0x16, 0xeb, 0x99, 0x33, 0xe7, 0xff, 0xdf, 0x4e, 0x0f, 0xb7, 0x7b, 0xf8, + 0x25, 0x88, 0x79, 0xa5, 0xc9, 0xe5, 0x82, 0x83, 0x9d, 0xa3, 0x37, 0x56, + 0x59, 0x56, 0xbf, 0xd3, 0x41, 0x2a, 0xe8, 0x30, 0x0a, 0x06, 0x08, 0x2a, + 0x86, 0x48, 0xce, 0x3d, 0x04, 0x03, 0x02, 0x03, 0x48, 0x00, 0x30, 0x45, + 0x02, 0x20, 0x0e, 0xae, 0x29, 0x05, 0x03, 0x80, 0x07, 0x8f, 0x2c, 0x84, + 0x7f, 0x1c, 0xeb, 0xab, 0x5c, 0xd0, 0x74, 0x62, 0xb5, 0x87, 0x57, 0x24, + 0xcc, 0x7b, 0x25, 0xa0, 0x77, 0x09, 0xb9, 0x2f, 0x77, 0x13, 0x02, 0x21, + 0x00, 0xe1, 0x46, 0x88, 0x24, 0xcc, 0x79, 0xed, 0x5a, 0x18, 0x32, 0xac, + 0xe4, 0x55, 0x28, 0xc4, 0x94, 0xc9, 0xd4, 0xa0, 0xcb, 0xb3, 0x01, 0xfa, + 0x18, 0xe0, 0x69, 0xe6, 0x2c, 0x23, 0xbb, 0x10, 0xf4 +}; + +const uint32_t theTestKeySize = 121; +const uint8_t theTestKey[] = { + 0x30, 0x77, 0x02, 0x01, 0x01, 0x04, 0x20, 0x65, 0x01, 0x04, 0xcc, 0x6b, + 0x27, 0xb4, 0x85, 0x3d, 0x38, 0x78, 0xd7, 0x3a, 0x5a, 0x12, 0x16, 0xdd, + 0xe4, 0x8f, 0xa5, 0x82, 0x67, 0xba, 0x34, 0xf0, 0x32, 0x6f, 0xd4, 0x74, + 0x89, 0x83, 0x44, 0xa0, 0x0a, 0x06, 0x08, 0x2a, 0x86, 0x48, 0xce, 0x3d, + 0x03, 0x01, 0x07, 0xa1, 0x44, 0x03, 0x42, 0x00, 0x04, 0xde, 0x5a, 0x0e, + 0x8e, 0x07, 0xdd, 0xe2, 0x85, 0xe5, 0x04, 0x73, 0xfb, 0x74, 0x77, 0x83, + 0xc6, 0xae, 0x9f, 0xf0, 0xa1, 0xf0, 0xe6, 0xe2, 0xf1, 0xf8, 0x8e, 0x45, + 0x19, 0xe5, 0x58, 0x78, 0x90, 0xf5, 0x16, 0xeb, 0x99, 0x33, 0xe7, 0xff, + 0xdf, 0x4e, 0x0f, 0xb7, 0x7b, 0xf8, 0x25, 0x88, 0x79, 0xa5, 0xc9, 0xe5, + 0x82, 0x83, 0x9d, 0xa3, 0x37, 0x56, 0x59, 0x56, 0xbf, 0xd3, 0x41, 0x2a, + 0xe8 +}; + +const uint32_t theTestCACertSize = 513; +const uint8_t theTestCACert[] = { + 0x30, 0x82, 0x01, 0xfd, 0x30, 0x82, 0x01, 0xa3, 0xa0, 0x03, 0x02, 0x01, + 0x02, 0x02, 0x14, 0x33, 0x0d, 0xc7, 0xa8, 0xff, 0xc4, 0x48, 0x65, 0x67, + 0x97, 0x6f, 0x95, 0x98, 0xed, 0xe0, 0xa6, 0xf9, 0x99, 0x21, 0x24, 0x30, + 0x0a, 0x06, 0x08, 0x2a, 0x86, 0x48, 0xce, 0x3d, 0x04, 0x03, 0x02, 0x30, + 0x53, 0x31, 0x0b, 0x30, 0x09, 0x06, 0x03, 0x55, 0x04, 0x06, 0x13, 0x02, + 0x52, 0x55, 0x31, 0x0f, 0x30, 0x0d, 0x06, 0x03, 0x55, 0x04, 0x08, 0x0c, + 0x06, 0x4d, 0x6f, 0x73, 0x63, 0x6f, 0x77, 0x31, 0x0f, 0x30, 0x0d, 0x06, + 0x03, 0x55, 0x04, 0x07, 0x0c, 0x06, 0x4d, 0x6f, 0x73, 0x63, 0x6f, 0x77, + 0x31, 0x10, 0x30, 0x0e, 0x06, 0x03, 0x55, 0x04, 0x0a, 0x0c, 0x07, 0x55, + 0x62, 0x69, 0x73, 0x6f, 0x66, 0x74, 0x31, 0x10, 0x30, 0x0e, 0x06, 0x03, + 0x55, 0x04, 0x03, 0x0c, 0x07, 0x55, 0x62, 0x69, 0x73, 0x6f, 0x66, 0x74, + 0x30, 0x20, 0x17, 0x0d, 0x32, 0x32, 0x30, 0x36, 0x32, 0x32, 0x31, 0x33, + 0x33, 0x39, 0x33, 0x33, 0x5a, 0x18, 0x0f, 0x32, 0x31, 0x32, 0x32, 0x30, + 0x35, 0x32, 0x39, 0x31, 0x33, 0x33, 0x39, 0x33, 0x33, 0x5a, 0x30, 0x53, + 0x31, 0x0b, 0x30, 0x09, 0x06, 0x03, 0x55, 0x04, 0x06, 0x13, 0x02, 0x52, + 0x55, 0x31, 0x0f, 0x30, 0x0d, 0x06, 0x03, 0x55, 0x04, 0x08, 0x0c, 0x06, + 0x4d, 0x6f, 0x73, 0x63, 0x6f, 0x77, 0x31, 0x0f, 0x30, 0x0d, 0x06, 0x03, + 0x55, 0x04, 0x07, 0x0c, 0x06, 0x4d, 0x6f, 0x73, 0x63, 0x6f, 0x77, 0x31, + 0x10, 0x30, 0x0e, 0x06, 0x03, 0x55, 0x04, 0x0a, 0x0c, 0x07, 0x55, 0x62, + 0x69, 0x73, 0x6f, 0x66, 0x74, 0x31, 0x10, 0x30, 0x0e, 0x06, 0x03, 0x55, + 0x04, 0x03, 0x0c, 0x07, 0x55, 0x62, 0x69, 0x73, 0x6f, 0x66, 0x74, 0x30, + 0x59, 0x30, 0x13, 0x06, 0x07, 0x2a, 0x86, 0x48, 0xce, 0x3d, 0x02, 0x01, + 0x06, 0x08, 0x2a, 0x86, 0x48, 0xce, 0x3d, 0x03, 0x01, 0x07, 0x03, 0x42, + 0x00, 0x04, 0x0b, 0x3d, 0x49, 0xbd, 0x61, 0xd9, 0xe9, 0xd6, 0xbd, 0x49, + 0xbb, 0xb5, 0x2c, 0xb7, 0x7e, 0x0a, 0x05, 0xb1, 0xfe, 0x89, 0xb3, 0x1a, + 0x0d, 0xbf, 0xf4, 0x44, 0x8e, 0x48, 0x6a, 0xa5, 0xa5, 0x26, 0xe5, 0x0b, + 0x10, 0xcd, 0x2c, 0x29, 0xde, 0xd8, 0xc6, 0x4b, 0x89, 0xdf, 0x6c, 0xab, + 0x8f, 0xb7, 0x87, 0xc4, 0xf8, 0xba, 0x31, 0x7c, 0xbb, 0x96, 0x42, 0x8c, + 0xf3, 0xb9, 0x62, 0x1c, 0xd4, 0x21, 0xa3, 0x53, 0x30, 0x51, 0x30, 0x1d, + 0x06, 0x03, 0x55, 0x1d, 0x0e, 0x04, 0x16, 0x04, 0x14, 0x64, 0xe3, 0x03, + 0x4e, 0x60, 0xc2, 0xd6, 0x3b, 0xff, 0x8f, 0x01, 0x0e, 0xc3, 0xe8, 0xeb, + 0xc7, 0xba, 0x88, 0x56, 0x66, 0x30, 0x1f, 0x06, 0x03, 0x55, 0x1d, 0x23, + 0x04, 0x18, 0x30, 0x16, 0x80, 0x14, 0x64, 0xe3, 0x03, 0x4e, 0x60, 0xc2, + 0xd6, 0x3b, 0xff, 0x8f, 0x01, 0x0e, 0xc3, 0xe8, 0xeb, 0xc7, 0xba, 0x88, + 0x56, 0x66, 0x30, 0x0f, 0x06, 0x03, 0x55, 0x1d, 0x13, 0x01, 0x01, 0xff, + 0x04, 0x05, 0x30, 0x03, 0x01, 0x01, 0xff, 0x30, 0x0a, 0x06, 0x08, 0x2a, + 0x86, 0x48, 0xce, 0x3d, 0x04, 0x03, 0x02, 0x03, 0x48, 0x00, 0x30, 0x45, + 0x02, 0x20, 0x69, 0xd0, 0x77, 0x50, 0x50, 0x6a, 0x3c, 0x49, 0x5b, 0x7e, + 0x09, 0x6c, 0x40, 0xf3, 0xa3, 0xdf, 0x27, 0x08, 0xbc, 0xa6, 0x2c, 0x14, + 0xa8, 0xca, 0xbb, 0x14, 0x78, 0xb7, 0xe6, 0x2d, 0x73, 0x33, 0x02, 0x21, + 0x00, 0x97, 0x40, 0x67, 0x7c, 0xd3, 0x76, 0x30, 0x3f, 0xa2, 0x05, 0xb7, + 0x7e, 0x72, 0xed, 0xa8, 0x9b, 0x8b, 0x00, 0x32, 0x61, 0x2f, 0xaf, 0x7b, + 0x6c, 0x5b, 0xa7, 0x17, 0x90, 0x69, 0xa9, 0x2a, 0x5c +}; diff --git a/examples/iocore_02_ssl_hello/Certs.h b/examples/iocore_02_ssl_hello/Certs.h new file mode 100644 index 00000000..db11bdd4 --- /dev/null +++ b/examples/iocore_02_ssl_hello/Certs.h @@ -0,0 +1,13 @@ +#include "mg/box/Definitions.h" + +// Public certificate to use on server. +extern const uint32_t theTestCertSize; +extern const uint8_t theTestCert[]; + +// Private key to use on server. +extern const uint32_t theTestKeySize; +extern const uint8_t theTestKey[]; + +// Public certificate used to sign the server. Should be available to the clients. +extern const uint32_t theTestCACertSize; +extern const uint8_t theTestCACert[]; diff --git a/examples/iocore_02_ssl_hello/main.cpp b/examples/iocore_02_ssl_hello/main.cpp new file mode 100644 index 00000000..a774dcb9 --- /dev/null +++ b/examples/iocore_02_ssl_hello/main.cpp @@ -0,0 +1,296 @@ +#include "mg/aio/IOCore.h" +#include "mg/aio/SSLSocket.h" +#include "mg/aio/TCPServer.h" +#include "mg/aio/TCPSocketSubscription.h" +#include "mg/box/Log.h" +#include "mg/net/SSLContext.h" + +#include "Certs.h" + +#include + +// +// It is assumed you have seen the previous examples in this section, and some previously +// explained things don't need another repetition. +// +////////////////////////////////////////////////////////////////////////////////////////// +// +// A simple example how to use SSL in Serverbox. Here a few SSL clients connect to an SSL +// server, send a simple message, gets its receipt confirmation, and close themselves. +// +// The server here has a properly signed certificate, and the clients validate it. +// + +static const int theClientCount = 5; + +////////////////////////////////////////////////////////////////////////////////////////// +// +// The client sends a handshake message, and closes itself when any response is received. +// +class MyClient final + : private mg::aio::TCPSocketSubscription +{ +public: + MyClient( + int aID, + mg::aio::IOCore& aCore, + uint16_t aPort, + mg::net::SSLContext* aSSL) + : myID(aID) + , mySock(new mg::aio::SSLSocket(aCore)) + { + mg::aio::SSLSocketParams sslParams; + sslParams.mySSL = aSSL; + mySock->Open(sslParams); + + mg::aio::TCPSocketConnectParams connParams; + connParams.myEndpoint = mg::net::HostMakeLocalIPV4(aPort).ToString(); + mySock->PostConnect(connParams, this); + } + + ~MyClient() final = default; + +private: + // TCPSocketSubscription event. + void + OnConnect() final + { + MG_LOG_INFO("Client.OnConnect", "%d", myID); + const char* msg = "hello handshake"; + // Using SendRef(), because the string is in the constant data section. It is + // never deleted. Hence no need to copy it. + mySock->SendRef(msg, mg::box::Strlen(msg) + 1); + mySock->Recv(1); + } + + // TCPSocketSubscription event. + void + OnRecv( + mg::net::BufferReadStream& aStream) final + { + MG_LOG_INFO("Client.OnRecv", "%d: got response", myID); + MG_BOX_ASSERT(aStream.GetReadSize() > 0); + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnError( + mg::box::Error*) final + { + // Can happen on Windows even on graceful close. + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Client.OnClose", "%d", myID); + // Try to always have all deletions in a single place. And the only suitable place + // is on-close. This is the only place where you can safely assume that the socket + // won't run again if you delete it right now. + mySock->Delete(); + delete this; + } + + const int myID; + mg::aio::SSLSocket* mySock; +}; + +////////////////////////////////////////////////////////////////////////////////////////// + +class MyPeer final + : private mg::aio::TCPSocketSubscription +{ +public: + MyPeer( + int aID, + mg::aio::IOCore& aCore, + mg::net::Socket aSock, + mg::net::SSLContext* aSSL) + : myID(aID) + , mySock(new mg::aio::SSLSocket(aCore)) + { + mg::aio::SSLSocketParams sslParams; + sslParams.mySSL = aSSL; + mySock->Open(sslParams); + mySock->PostWrap(aSock, this); + } + + ~MyPeer() final = default; + +private: + // TCPSocketSubscription event. + void + OnConnect() final + { + MG_LOG_INFO("Peer.OnConnect", "%d", myID); + mySock->Recv(128); + } + + // TCPSocketSubscription event. + void + OnRecv( + mg::net::BufferReadStream& aStream) final + { + uint64_t newSize = aStream.GetReadSize(); + MG_BOX_ASSERT(newSize > 0); + uint64_t oldSize = myInput.size(); + myInput.resize(oldSize + newSize); + aStream.ReadData(myInput.data() + oldSize, newSize); + // Zero character (string terminator) is the end of the message. Until it is + // received, the message is incomplete. + if (myInput.back() != 0) + { + mySock->Recv(128); + return; + } + MG_LOG_INFO("Peer.OnRecv", "%d: got '%s'", myID, myInput.data()); + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnError( + mg::box::Error*) final + { + // Can happen on Windows even on graceful close. + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Peer.OnClose", "%d", myID); + mySock->Delete(); + delete this; + } + + const int myID; + mg::aio::SSLSocket* mySock; + std::vector myInput; +}; + +////////////////////////////////////////////////////////////////////////////////////////// + +class MyServer final + : private mg::aio::TCPServerSubscription +{ +public: + MyServer( + mg::aio::IOCore& aCore) + : myAcceptCount(0) + , myServer(mg::aio::TCPServer::NewShared(aCore)) + , mySSL(mg::net::SSLContext::NewShared(true /* is server */)) + { + // This would normally be AddLocalCertFile(), because in most cases the SSL data + // is stored on disk. + bool ok = mySSL->AddLocalCert( + theTestCert, theTestCertSize, + theTestKey, theTestKeySize); + MG_BOX_ASSERT(ok); + } + + uint16_t + Bind() + { + mg::box::Error::Ptr err; + bool ok = myServer->Bind(mg::net::HostMakeAllIPV4(0), err); + MG_BOX_ASSERT(ok); + return myServer->GetPort(); + } + + void + Start() + { + mg::box::Error::Ptr err; + bool ok = myServer->Listen(mg::net::SocketMaxBacklog(), this, err); + MG_BOX_ASSERT(ok); + } + +private: + // TCPServerSubscription event. + void + OnAccept( + mg::net::Socket aSock, + const mg::net::Host&) final + { + MG_LOG_INFO("Server.OnAccept", "new client"); + new MyPeer(++myAcceptCount, myServer->GetCore(), aSock, mySSL.GetPointer()); + if (myAcceptCount == theClientCount) + { + MG_LOG_INFO("Server.OnAccept", "start closing"); + myServer->PostClose(); + } + } + + // TCPServerSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Server.OnClose", ""); + } + + int myAcceptCount; + mg::aio::TCPServer::Ptr myServer; + mg::net::SSLContext::Ptr mySSL; +}; + +////////////////////////////////////////////////////////////////////////////////////////// + +static void +RunExample() +{ + mg::aio::IOCore core; + core.Start(3 /* threads */); + + MG_LOG_INFO("Main", "start server"); + MyServer server(core); + uint16_t port = server.Bind(); + server.Start(); + + mg::net::SSLContext::Ptr clientSSL = + mg::net::SSLContext::NewShared(false /* is server */); + // Usually SSL cert is a file. Then you can use AddRemoteCertFile() if it is something + // custom, or AddRemoteCertFromSystem() when need to talk to the open Internet and the + // system's certificates are enough. + clientSSL->AddRemoteCert(theTestCACert, theTestCACertSize); + + MG_LOG_INFO("Main", "start clients"); + for (int i = 0; i < theClientCount; ++i) + new MyClient(i + 1, core, port, clientSSL.GetPointer()); + + MG_LOG_INFO("Main", "wait for end"); + uint32_t remaining = core.WaitEmpty(); + MG_BOX_ASSERT(remaining == 0); + + MG_LOG_INFO("Main", "terminating"); +} + +////////////////////////////////////////////////////////////////////////////////////////// + +int +main() +{ +#if IS_PLATFORM_WIN + // WSAStartup() on Windows has to be done by the user. To be able to customize it as + // they want. Besides, it is a too low-level thing to be able to wrap it into anything + // platform-agnostic. + WSADATA data; + MG_BOX_ASSERT(WSAStartup(MAKEWORD(2, 2), &data) == 0); +#else + // SIGPIPE is pointless. This signal kills the process when trying to write to a + // socket whose connection is dead. The problem is that there is no way to know that + // at the moment of write. + signal(SIGPIPE, SIG_IGN); +#endif + + RunExample(); + +#if IS_PLATFORM_WIN + MG_BOX_ASSERT(WSACleanup() == 0); +#endif + return 0; +} diff --git a/examples/iocore_03_pipeline/CMakeLists.txt b/examples/iocore_03_pipeline/CMakeLists.txt new file mode 100644 index 00000000..6f876584 --- /dev/null +++ b/examples/iocore_03_pipeline/CMakeLists.txt @@ -0,0 +1,22 @@ +add_executable(iocore_03_pipeline main.cpp) +add_dependencies(iocore_03_pipeline install_serverbox) + +target_include_directories(iocore_03_pipeline PUBLIC + ${MG_SERVERBOX_DIR}/include +) +target_link_directories(iocore_03_pipeline PUBLIC + ${MG_SERVERBOX_DIR}/lib +) +set(libs + mgaio + mgnet + mgsch + mgbox + mgboxstub +) +if(WIN32) + set(libs ${libs} ws2_32.lib crypt32.lib) +endif() +target_link_libraries(iocore_03_pipeline + ${libs} +) diff --git a/examples/iocore_03_pipeline/main.cpp b/examples/iocore_03_pipeline/main.cpp new file mode 100644 index 00000000..8e26beb9 --- /dev/null +++ b/examples/iocore_03_pipeline/main.cpp @@ -0,0 +1,460 @@ +#include "mg/aio/IOCore.h" +#include "mg/aio/TCPServer.h" +#include "mg/aio/TCPSocket.h" +#include "mg/aio/TCPSocketSubscription.h" +#include "mg/box/Log.h" +#include "mg/sch/TaskScheduler.h" + +#include + +// +// It is assumed you have seen the previous examples in this section, and some previously +// explained things don't need another repetition. +// +////////////////////////////////////////////////////////////////////////////////////////// +// +// A very real example with a task as the main business logic request, and a networking +// client to do sub-requests of that task. +// +// In this example the business logic is math. Request comes to the app and wants some +// arith operations done. But the main request processor can't do that. Instead, it sends +// those math operations as sub-requests to a "remote backend" (our CalcServer in this +// case). +// +// A more realistic example: imagine a server processing save games of an online game. The +// server is stateless, doesn't store any games itself. Instead, for any request like +// "create a save game", "drop it", "update it" the app downloads the save game from a +// database, does the work, and uploads the result. It would involve 2 sub-requests to the +// database for one main request like "update a save game". +// + +////////////////////////////////////////////////////////////////////////////////////////// +// +// CalcClient asynchronously talks to a "remote" CalcServer in request-response manner. +// Each request on completion fires its completion-callback, where we can wake a task up, +// send it a signal, or do anything else. +// +struct CalcRequest +{ + char myOp; + int64_t myArg1; + int64_t myArg2; + std::function myOnComplete; + // Intrusive list link. + CalcRequest* myNext; +}; + +class CalcClient final + : private mg::aio::TCPSocketSubscription +{ +public: + CalcClient( + mg::aio::IOCore& aCore, + uint16_t aPort) + : mySock(new mg::aio::TCPSocket(aCore)) + { + mySock->Open({}); + mg::aio::TCPSocketConnectParams connParams; + connParams.myEndpoint = mg::net::HostMakeLocalIPV4(aPort).ToString(); + MG_LOG_INFO("Client", "start connecting"); + mySock->PostConnect(connParams, this); + } + + void + Delete() + { + // Don't just 'delete this'. The socket might be being handled in a worker thread + // right now. Better close the socket first. When this is done, we can safely + // delete this client in OnClose(). + mySock->PostClose(); + } + + void + Submit( + char aOp, + int64_t aArg1, + int64_t aArg2, + std::function&& aOnComplete) + { + MG_LOG_INFO("Client.Submit", "new request"); + CalcRequest* req = new CalcRequest(); + req->myOp = aOp; + req->myArg1 = aArg1; + req->myArg2 = aArg2; + req->myOnComplete = std::move(aOnComplete); + // Wakeup the socket only if the request is first in the queue. If it is not, the + // socket was already woken up anyway, and is going to be executed soon. + // Could also wakeup always, but it is just not needed. + if (myFrontQueue.Push(req)) + mySock->PostWakeup(); + } + +private: + ~CalcClient() final + { + // Doesn't matter in which order to delete the requests. Hence can use the + // non-ordered fast pop. + CalcRequest* head = myFrontQueue.PopAllFastReversed(); + while (head != nullptr) + { + CalcRequest* next = head->myNext; + delete head; + head = next; + } + while (!myQueue.IsEmpty()) + delete myQueue.PopFirst(); + } + + // TCPSocketSubscription event. + void + OnConnect() final + { + MG_LOG_INFO("Client.OnConnect", ""); + mySock->Recv(128); + } + + // TCPSocketSubscription event. + void + OnEvent() final + { + CalcRequest* tail; + CalcRequest* head = myFrontQueue.PopAll(tail); + if (head == nullptr) + return; + + // Save the requests for later response handling. + myQueue.Append(head, tail); + // Send requests in bulk. + mg::net::BufferStream data; + while (head != nullptr) + { + MG_LOG_INFO("Client.OnEvent", "got request"); + // It is a bad idea to send integers like that, because the receiver might + // have a different byte order. In general, apps must encode numbers in some + // platform-agnostic way. Like always use Big Endian, or use a protocol like + // MesssagePack. Here it is omitted for simplicity. + data.WriteCopy(&head->myOp, sizeof(head->myOp)); + data.WriteCopy(&head->myArg1, sizeof(head->myArg1)); + data.WriteCopy(&head->myArg2, sizeof(head->myArg2)); + head = head->myNext; + } + mySock->SendRef(data.PopData()); + } + + // TCPSocketSubscription event. + void + OnRecv( + mg::net::BufferReadStream& aStream) final + { + // Responses in this case come in the same order as requests. + int64_t res = 0; + while (aStream.GetReadSize() >= sizeof(res)) + { + aStream.ReadData(&res, sizeof(res)); + CalcRequest* pos = myQueue.PopFirst(); + MG_BOX_ASSERT(pos != nullptr); + pos->myOnComplete(res); + delete pos; + } + mySock->Recv(128); + } + + // TCPSocketSubscription event. + void + OnError( + mg::box::Error*) final + { + // Can happen on Windows even on graceful close. + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Client.OnClose", ""); + // Try to always have all deletions in a single place. And the only suitable place + // is on-close. This is the only place where you can safely assume that the socket + // won't run again if you delete it right now. + mySock->Delete(); + delete this; + } + + mg::aio::TCPSocket* mySock; + // Queue of sent requests waiting for their responses. + mg::box::ForwardList myQueue; + // Queue if requests submitted by other threads. Not yet sent to the network. + mg::box::MultiProducerQueueIntrusive myFrontQueue; +}; + +////////////////////////////////////////////////////////////////////////////////////////// +// +// CalcServer and its peers. Can be running remotely. Serves the math requests. +// +class CalcPeer final + : private mg::aio::TCPSocketSubscription +{ +public: + CalcPeer( + mg::aio::IOCore& aCore, + mg::net::Socket aSock) + : mySock(new mg::aio::TCPSocket(aCore)) + { + mySock->Open({}); + mySock->PostWrap(aSock, this); + } + + ~CalcPeer() final = default; + +private: + // TCPSocketSubscription event. + void + OnConnect() final + { + MG_LOG_INFO("Peer.OnConnect", ""); + mySock->Recv(128); + } + + // TCPSocketSubscription event. + void + OnRecv( + mg::net::BufferReadStream& aStream) final + { + char op; + int64_t arg1, arg2; + constexpr uint32_t msgSize = sizeof(op) + sizeof(arg1) + sizeof(arg2); + mg::net::BufferStream data; + while (aStream.GetReadSize() >= msgSize) + { + aStream.ReadData(&op, sizeof(op)); + aStream.ReadData(&arg1, sizeof(arg1)); + aStream.ReadData(&arg2, sizeof(arg2)); + int64_t res; + switch(op) + { + case '+': res = arg1 + arg2; break; + case '-': res = arg1 - arg2; break; + default: + MG_BOX_ASSERT(!"Unreachable"); + res = 0; + break; + } + data.WriteCopy(&res, sizeof(res)); + } + mySock->SendRef(data.PopData()); + mySock->Recv(128); + } + + // TCPSocketSubscription event. + void + OnError( + mg::box::Error*) final + { + // Can happen on Windows even on graceful close. + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Peer.OnClose", ""); + mySock->Delete(); + delete this; + } + + mg::aio::TCPSocket* mySock; +}; + +////////////////////////////////////////////////////////////////////////////////////////// + +class CalcServer final + : private mg::aio::TCPServerSubscription +{ +public: + CalcServer( + mg::aio::IOCore& aCore) + : myServer(mg::aio::TCPServer::NewShared(aCore)) + { + } + + uint16_t + Bind() + { + mg::box::Error::Ptr err; + bool ok = myServer->Bind(mg::net::HostMakeAllIPV4(0), err); + MG_BOX_ASSERT(ok); + return myServer->GetPort(); + } + + void + Start() + { + mg::box::Error::Ptr err; + bool ok = myServer->Listen(mg::net::SocketMaxBacklog(), this, err); + MG_BOX_ASSERT(ok); + } + + void + Delete() + { + myServer->PostClose(); + } + +private: + ~CalcServer() = default; + + // TCPServerSubscription event. + void + OnAccept( + mg::net::Socket aSock, + const mg::net::Host&) final + { + MG_LOG_INFO("Server.OnAccept", "new client"); + new CalcPeer(myServer->GetCore(), aSock); + } + + // TCPServerSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Server.OnClose", ""); + delete this; + } + + mg::aio::TCPServer::Ptr myServer; +}; + +////////////////////////////////////////////////////////////////////////////////////////// +// +// The business logic request, which is doing sub-requests to the remote CalcServer. +// +class MyRequest +{ +public: + MyRequest( + int aID, + CalcClient& aCalcClient, + mg::sch::TaskScheduler& aSched) + : myID(aID) + , myCalcClient(aCalcClient) + , myTask(Execute(this)) + { + aSched.Post(&myTask); + } + +private: + mg::box::Coro + Execute( + MyRequest* aSelf) + { + MG_LOG_INFO("MyRequest.Execute", "%d: start", myID); + + // Send a first math request. + MG_LOG_INFO("MyRequest.Execute", "%d: send sub-request 1", myID); + int64_t res = 0; + myCalcClient.Submit('+', 10, myID, [aSelf, &res](int64_t aRes) { + res = aRes; + // The completion callback sends a signal to the MyRequest's task which also + // wakes it up. + aSelf->myTask.PostSignal(); + }); + // Wait for the signal in a yielding loop, to handle potential spurious wakeups. + while (!co_await aSelf->myTask.AsyncReceiveSignal()); + // + MG_LOG_INFO("MyRequest.Execute", "%d: got response %lld", myID, (long long)res); + + // Do it all again with other data. + MG_LOG_INFO("MyRequest.Execute", "%d: send sub-request 2", myID); + res = 0; + myCalcClient.Submit('-', 100, myID, [aSelf, &res](int64_t aRes) { + res = aRes; + aSelf->myTask.PostSignal(); + }); + while (!co_await aSelf->myTask.AsyncReceiveSignal()); + MG_LOG_INFO("MyRequest.Execute", "%d: got response %lld", myID, (long long)res); + + // 'delete this' + co_return wouldn't work here. Because deletion of the self + // would destroy the C++ coroutine object. co_return would then fail with + // use-after-free. Also can't use myTask.AsyncExitDelete(), because it would + // delete the myTask member, not the whole MyRequest, and would crash. For this + // case there is a special handler AsyncExitExec() - can put here any termination + // logic. + co_await aSelf->myTask.AsyncExitExec([this](mg::sch::Task*) { + delete this; + }); + MG_BOX_ASSERT(!"Unreachable"); + } + + // Can add here any other data needed by the request. Moreover, things like clients to + // other remote systems (myCalcClient) typically would be either global singletons, or + // would be wrapped into a bigger struct having all such clients. + const int myID; + CalcClient& myCalcClient; + mg::sch::Task myTask; +}; + +static void +ServeRequests( + CalcClient& aClient) +{ + // Typically here we would start a server, listen for incoming connections, receive + // the requests, create MyRequest for each, and submit them to the scheduler. But in + // this case it is simplified to just a couple of hardcoded MyRequests. + mg::sch::TaskScheduler scheduler("tst", + 1, // Thread count. + 5 // Subqueue size. + ); + MG_LOG_INFO("ServeRequests", "got a couple of complex requests"); + new MyRequest(1, aClient, scheduler); + new MyRequest(2, aClient, scheduler); +} + +////////////////////////////////////////////////////////////////////////////////////////// + +static void +RunExample() +{ + mg::aio::IOCore core; + core.Start(1 /* threads */); + + MG_LOG_INFO("Main", "start calc server"); + CalcServer* calcServer = new CalcServer(core); + uint16_t port = calcServer->Bind(); + calcServer->Start(); + + MG_LOG_INFO("Main", "start calc client"); + CalcClient *calcClient = new CalcClient(core, port); + + ServeRequests(*calcClient); + + MG_LOG_INFO("Main", "terminating"); + calcClient->Delete(); + calcServer->Delete(); +} + +////////////////////////////////////////////////////////////////////////////////////////// + +int +main() +{ +#if IS_PLATFORM_WIN + // WSAStartup() on Windows has to be done by the user. To be able to customize it as + // they want. Besides, it is a too low-level thing to be able to wrap it into anything + // platform-agnostic. + WSADATA data; + MG_BOX_ASSERT(WSAStartup(MAKEWORD(2, 2), &data) == 0); +#else + // SIGPIPE is pointless. This signal kills the process when trying to write to a + // socket whose connection is dead. The problem is that there is no way to know that + // at the moment of write. + signal(SIGPIPE, SIG_IGN); +#endif + + RunExample(); + +#if IS_PLATFORM_WIN + MG_BOX_ASSERT(WSACleanup() == 0); +#endif + return 0; +} diff --git a/examples/iocore_04_tcp_periodic/CMakeLists.txt b/examples/iocore_04_tcp_periodic/CMakeLists.txt new file mode 100644 index 00000000..7dfd44be --- /dev/null +++ b/examples/iocore_04_tcp_periodic/CMakeLists.txt @@ -0,0 +1,21 @@ +add_executable(iocore_04_tcp_periodic main.cpp) +add_dependencies(iocore_04_tcp_periodic install_serverbox) + +target_include_directories(iocore_04_tcp_periodic PUBLIC + ${MG_SERVERBOX_DIR}/include +) +target_link_directories(iocore_04_tcp_periodic PUBLIC + ${MG_SERVERBOX_DIR}/lib +) +set(libs + mgaio + mgnet + mgbox + mgboxstub +) +if(WIN32) + set(libs ${libs} ws2_32.lib crypt32.lib) +endif() +target_link_libraries(iocore_04_tcp_periodic + ${libs} +) diff --git a/examples/iocore_04_tcp_periodic/main.cpp b/examples/iocore_04_tcp_periodic/main.cpp new file mode 100644 index 00000000..8a3076f9 --- /dev/null +++ b/examples/iocore_04_tcp_periodic/main.cpp @@ -0,0 +1,344 @@ +#include "mg/aio/IOCore.h" +#include "mg/aio/TCPServer.h" +#include "mg/aio/TCPSocket.h" +#include "mg/aio/TCPSocketSubscription.h" +#include "mg/box/Log.h" + +#include + +// +// It is assumed you have seen the previous examples in this section, and some previously +// explained things don't need another repetition. +// +////////////////////////////////////////////////////////////////////////////////////////// +// +// An untrivial example how to use deadlines with IOCore. I.e. make a socket wakeup on a +// deadline or a delay if nothing is happening with it for a while. +// +// With this it becomes very simple and cheap to implement request timeouts. Normally it +// would look like a client has a bunch of requests in fly, waiting for their responses. +// Each request could have a different deadline (time_now + timeout). Then your code could +// store those requests in a binary heap (mg::box::BinaryHeap) sorted by the deadline. +// +// OnEvent() of your socket you would do socket->SetDeadline(heap.Top()->myDeadline) or +// something like that. Also in OnEvent() you check socket->IsExpired(). If yes, then you +// get the current timestamp, and pop from the heap all the requests whose deadline is +// less than the current time. They are expired, do anything with them. +// + +static const int theClientCount = 5; +static const int theMessageCount = 6; + +static bool +ReadData( + mg::net::BufferReadStream& aStream, + std::vector& aDst) +{ + uint64_t newSize = aStream.GetReadSize(); + MG_BOX_ASSERT(newSize > 0); + uint64_t oldSize = aDst.size(); + aDst.resize(oldSize + newSize); + aStream.ReadData(aDst.data() + oldSize, newSize); + // Terminating zero means that there is a complete message. + return aDst.back() == 0; +} + +////////////////////////////////////////////////////////////////////////////////////////// +// +// Client receives a fixed number of messages and deletes itself. The interesting part is +// that those requests are sent by the server once per second, not all at once. +// + +class MyClient final + // The socket subscription has a number of callbacks notifying us about various TCP + // events like on-connect, on-recv, on-send, etc. All of them are optional, and are + // always executed in a single thread, never in parallel. This allows us not to use + // any mutexes for data used exclusively by those callbacks. + : private mg::aio::TCPSocketSubscription +{ +public: + MyClient( + int aID, + mg::aio::IOCore& aCore, + uint16_t aPort) + : myID(aID) + , myRecvCount(0) + , mySock(new mg::aio::TCPSocket(aCore)) + { + mySock->Open({}); + mg::aio::TCPSocketConnectParams connParams; + connParams.myEndpoint = mg::net::HostMakeLocalIPV4(aPort).ToString(); + mySock->PostConnect(connParams, this); + } + + ~MyClient() final = default; + +private: + // TCPSocketSubscription event. + void + OnConnect() final + { + MG_LOG_INFO("Client.OnConnect", "%d", myID); + mySock->Recv(128); + } + + // TCPSocketSubscription event. + void + OnRecv( + mg::net::BufferReadStream& aStream) final + { + if (!ReadData(aStream, myInput)) + return; + MG_LOG_INFO("Client.OnRecv", "%d: got '%s'", myID, myInput.data()); + if (++myRecvCount == theMessageCount) + { + mySock->PostClose(); + return; + } + myInput.clear(); + mySock->Recv(128); + } + + // TCPSocketSubscription event. + void + OnError( + mg::box::Error*) final + { + // Can happen on Windows even on graceful close. + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Client.OnClose", "%d", myID); + mySock->Delete(); + delete this; + } + + const int myID; + int myRecvCount; + mg::aio::TCPSocket* mySock; + std::vector myInput; +}; + +////////////////////////////////////////////////////////////////////////////////////////// + +class MyPeer final + : private mg::aio::TCPSocketSubscription +{ +public: + MyPeer( + int aID, + mg::aio::IOCore& aCore, + mg::net::Socket aSock) + : myID(aID) + , mySock(new mg::aio::TCPSocket(aCore)) + , mySendDeadline(0) + { + mySock->Open({}); + mySock->PostWrap(aSock, this); + } + + ~MyPeer() final = default; + +private: + // TCPSocketSubscription event. + void + OnConnect() final + { + MG_LOG_INFO("Peer.OnConnect", "%d", myID); + // Receive isn't planned, but lets schedule it to catch a potential socket + // closure. Socket close = read of zero bytes, so we need a pending read to notice + // it. Can be of any > 0 size. + mySock->Recv(1); + } + + // TCPSocketSubscription event. + void + OnEvent() final + { + if (mySentCount >= theMessageCount) + { + // Everything is sent. Time to shutdown. + mySock->PostClose(); + return; + } + if (mySendDeadline > mg::box::GetMilliseconds()) + { + // Not expired yet. A spurious wakeup, or the deadline is set first time. + // Anyway, lets wait until it expires. + mySock->SetDeadline(mySendDeadline); + return; + } + MG_LOG_INFO("Peer.OnEvent", "%d: sending next msg", myID); + MG_BOX_ASSERT(mySendSize == 0); + // Deadline is infinite so a next message is only sent after this one is fully + // flushed. Although in such a simple example it is probably sent completely in + // one go. + mySendDeadline = MG_TIME_INFINITE; + std::string msg = mg::box::StringFormat("message %d", ++mySentCount); + mySendSize = (uint32_t)msg.size() + 1; + // Copy the string, because it is on stack right now. + mySock->SendCopy(msg.c_str(), mySendSize); + } + + // TCPSocketSubscription event. + void + OnRecv( + mg::net::BufferReadStream&) final + { + // The client isn't supposed to send anything. + MG_BOX_ASSERT(!"Unreachable"); + } + + // TCPSocketSubscription event. + void + OnSend( + uint32_t aByteCount) final + { + MG_BOX_ASSERT(aByteCount <= mySendSize); + mySendSize -= aByteCount; + if (mySendSize != 0) + return; + + // Finally all is sent. Send the next one in 1 second. + mySendDeadline = mg::box::GetMilliseconds() + 1000; + // OnEvent() is going to be executed again ASAP, but this is ok. It is simpler + // for us to implement all the logic in one function. It will handle this + // reschedule a spurious wakeup. + mySock->Reschedule(); + } + + // TCPSocketSubscription event. + void + OnError( + mg::box::Error*) final + { + // Can happen on Windows even on graceful close. + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Peer.OnClose", "%d", myID); + mySock->Delete(); + delete this; + } + + const int myID; + mg::aio::TCPSocket* mySock; + int mySentCount; + uint32_t mySendSize; + uint64_t mySendDeadline; +}; + +////////////////////////////////////////////////////////////////////////////////////////// + +class MyServer final + : private mg::aio::TCPServerSubscription +{ +public: + MyServer( + mg::aio::IOCore& aCore) + : myAcceptCount(0) + , myServer(mg::aio::TCPServer::NewShared(aCore)) + { + } + + uint16_t + Bind() + { + mg::box::Error::Ptr err; + bool ok = myServer->Bind(mg::net::HostMakeAllIPV4(0), err); + MG_BOX_ASSERT(ok); + return myServer->GetPort(); + } + + void + Start() + { + mg::box::Error::Ptr err; + bool ok = myServer->Listen(mg::net::SocketMaxBacklog(), this, err); + MG_BOX_ASSERT(ok); + } + +private: + // TCPServerSubscription event. + void + OnAccept( + mg::net::Socket aSock, + const mg::net::Host&) final + { + MG_LOG_INFO("Server.OnAccept", "new client"); + new MyPeer(++myAcceptCount, myServer->GetCore(), aSock); + if (myAcceptCount == theClientCount) + { + MG_LOG_INFO("Server.OnAccept", "start closing"); + myServer->PostClose(); + } + } + + // TCPServerSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Server.OnClose", ""); + } + + int myAcceptCount; + mg::aio::TCPServer::Ptr myServer; +}; + +////////////////////////////////////////////////////////////////////////////////////////// + +static void +RunExample() +{ + mg::aio::IOCore core; + core.Start(3 /* threads */); + + MG_LOG_INFO("Main", "start server"); + MyServer server(core); + uint16_t port = server.Bind(); + server.Start(); + + MG_LOG_INFO("Main", "start clients"); + for (int i = 0; i < theClientCount; ++i) + new MyClient(i + 1, core, port); + + MG_LOG_INFO("Main", "wait for end"); + uint32_t remaining = core.WaitEmpty(); + MG_BOX_ASSERT(remaining == 0); + + MG_LOG_INFO("Main", "terminating"); +} + +////////////////////////////////////////////////////////////////////////////////////////// + +int +main() +{ +#if IS_PLATFORM_WIN + // WSAStartup() on Windows has to be done by the user. To be able to customize it as + // they want. Besides, it is a too low-level thing to be able to wrap it into anything + // platform-agnostic. + WSADATA data; + MG_BOX_ASSERT(WSAStartup(MAKEWORD(2, 2), &data) == 0); +#else + // SIGPIPE is pointless. This signal kills the process when trying to write to a + // socket whose connection is dead. The problem is that there is no way to know that + // at the moment of write. + signal(SIGPIPE, SIG_IGN); +#endif + + RunExample(); + +#if IS_PLATFORM_WIN + MG_BOX_ASSERT(WSACleanup() == 0); +#endif + return 0; +} diff --git a/examples/scheduler_01_simple_task/CMakeLists.txt b/examples/scheduler_01_simple_task/CMakeLists.txt new file mode 100644 index 00000000..f924b69e --- /dev/null +++ b/examples/scheduler_01_simple_task/CMakeLists.txt @@ -0,0 +1,14 @@ +add_executable(scheduler_01_simple_task main.cpp) +add_dependencies(scheduler_01_simple_task install_serverbox) + +target_include_directories(scheduler_01_simple_task PUBLIC + ${MG_SERVERBOX_DIR}/include +) +target_link_directories(scheduler_01_simple_task PUBLIC + ${MG_SERVERBOX_DIR}/lib +) +target_link_libraries(scheduler_01_simple_task + mgsch + mgboxstub + mgbox +) diff --git a/examples/scheduler_01_simple_task/main.cpp b/examples/scheduler_01_simple_task/main.cpp new file mode 100644 index 00000000..964601fe --- /dev/null +++ b/examples/scheduler_01_simple_task/main.cpp @@ -0,0 +1,22 @@ +#include "mg/sch/TaskScheduler.h" + +#include + +// +// The most trivial example of TaskScheduler usage. A single shot task which just prints +// something and gets deleted. +// + +int +main() +{ + mg::sch::TaskScheduler sched("tst", + 1, // Thread count. + 5 // Subqueue size. + ); + sched.Post(new mg::sch::Task([&](mg::sch::Task *self) { + std::cout << "Executed in scheduler!\n"; + delete self; + })); + return 0; +} diff --git a/examples/scheduler_02_coroutine_task/CMakeLists.txt b/examples/scheduler_02_coroutine_task/CMakeLists.txt new file mode 100644 index 00000000..ad68726c --- /dev/null +++ b/examples/scheduler_02_coroutine_task/CMakeLists.txt @@ -0,0 +1,14 @@ +add_executable(scheduler_02_coroutine_task main.cpp) +add_dependencies(scheduler_02_coroutine_task install_serverbox) + +target_include_directories(scheduler_02_coroutine_task PUBLIC + ${MG_SERVERBOX_DIR}/include +) +target_link_directories(scheduler_02_coroutine_task PUBLIC + ${MG_SERVERBOX_DIR}/lib +) +target_link_libraries(scheduler_02_coroutine_task + mgsch + mgboxstub + mgbox +) diff --git a/examples/scheduler_02_coroutine_task/main.cpp b/examples/scheduler_02_coroutine_task/main.cpp new file mode 100644 index 00000000..5df2a508 --- /dev/null +++ b/examples/scheduler_02_coroutine_task/main.cpp @@ -0,0 +1,56 @@ +#include "mg/sch/TaskScheduler.h" + +#include + +// +// It is assumed you have seen the previous examples in this section, and some previously +// explained things don't need another repetition. +// +////////////////////////////////////////////////////////////////////////////////////////// +// +// A simple example of how to use C++ stack-less coroutines with TaskScheduler. +// + +int +main() +{ + mg::sch::Task task; + + // Do not use lambda capture-params! The thing is that C++ coroutines are not + // functions. They are **results** of functions. When a lambda returns a C++ + // corotuine, it needs to be invoked to return this coroutine. And after invocation + // the lambda is destroyed in this case. Which makes it not possible to use any lambda + // data, like captures. See for further explanation: + // https://stackoverflow.com/questions/60592174/lambda-lifetime-explanation-for-c20-coroutines + // + // Luckily, this problem is trivial to workaround. Just pass your values as arguments. + // Then the coroutine object captures them. + task.SetCallback([]( + mg::sch::Task& aSelf) -> mg::box::Coro { + + // Imagine the task sends a request to some async network client or alike. + std::cout << "Sending request ...\n"; + + // After sending, the task would wait for a response. Here it is simplified, so + // the task just yields and gets continued right away. + // Yield with no deadline or delay set simply re-schedules the task. + co_await aSelf.AsyncYield(); + + std::cout << "Received response!\n"; + co_await aSelf.AsyncYield(); + std::cout << "Finish\n"; + co_return; + }(task)); + + // The scheduler is defined after the task, so the task's destructor is not called + // before the scheduler is terminated. It would cause the task to be destroyed while + // in use. + // Normally one would allocate tasks on the heap and make them delete themselves when + // they are finished. + mg::sch::TaskScheduler scheduler("tst", + 1, // Thread count. + 5 // Subqueue size. + ); + scheduler.Post(&task); + return 0; +} diff --git a/examples/scheduler_03_multistep_task/CMakeLists.txt b/examples/scheduler_03_multistep_task/CMakeLists.txt new file mode 100644 index 00000000..8dc1a6d7 --- /dev/null +++ b/examples/scheduler_03_multistep_task/CMakeLists.txt @@ -0,0 +1,14 @@ +add_executable(scheduler_03_multistep_task main.cpp) +add_dependencies(scheduler_03_multistep_task install_serverbox) + +target_include_directories(scheduler_03_multistep_task PUBLIC + ${MG_SERVERBOX_DIR}/include +) +target_link_directories(scheduler_03_multistep_task PUBLIC + ${MG_SERVERBOX_DIR}/lib +) +target_link_libraries(scheduler_03_multistep_task + mgsch + mgboxstub + mgbox +) diff --git a/examples/scheduler_03_multistep_task/main.cpp b/examples/scheduler_03_multistep_task/main.cpp new file mode 100644 index 00000000..c0e6e80d --- /dev/null +++ b/examples/scheduler_03_multistep_task/main.cpp @@ -0,0 +1,88 @@ +#include "mg/sch/TaskScheduler.h" + +#include + +// +// It is assumed you have seen the previous examples in this section, and some previously +// explained things don't need another repetition. +// +////////////////////////////////////////////////////////////////////////////////////////// +// +// An example how a task can consist of multiple untrivial steps, with yields in between, +// and with context used by those steps. +// + +class MyTask + : public mg::sch::Task +{ +public: + MyTask() + : Task([this](mg::sch::Task* aSelf) { + TaskSendRequest(aSelf); + }) {} + +private: + // Step 1. + void + TaskSendRequest( + mg::sch::Task* aSelf) + { + // The scheduler always gives the "self" as an argument. This is not needed when + // the task is inherited. But quite handy when a task is just a lambda. + MG_BOX_ASSERT(aSelf == this); + + // Imagine that the task sends an HTTP request via some other module, and is woken + // up, when the request ends. To execute the next step. For that the task changes + // its callback which will be executed when the task is woken up next time. + std::cout << "Sending request ...\n"; + aSelf->SetCallback([this](mg::sch::Task* aSelf) { + TaskRecvResponse(aSelf); + }); + mg::sch::TaskScheduler::This().Post(aSelf); + } + + // Step 2. + void + TaskRecvResponse( + mg::sch::Task* aSelf) + { + MG_BOX_ASSERT(aSelf == this); + + // Lets make another step, with a third callback which would be the final one. + std::cout << "Received response!\n"; + aSelf->SetCallback([this](mg::sch::Task *aSelf) { + TaskFinish(aSelf); + }); + mg::sch::TaskScheduler::This().Post(aSelf); + } + + // Step 3. + void + TaskFinish( + mg::sch::Task* aSelf) + { + MG_BOX_ASSERT(aSelf == this); + std::cout << "Finish\n"; + } + + // Here one would normally put various members needed by that task for its context. + // For example, request and user data related to this task. +}; + +int +main() +{ + MyTask task; + + // The scheduler is defined after the task, so the task's destructor is not called + // before the scheduler is terminated. It would cause the task to be destroyed while + // in use. + // Normally one would allocate tasks on the heap and make them delete themselves when + // they are finished. + mg::sch::TaskScheduler scheduler("tst", + 1, // Thread count. + 5 // Subqueue size. + ); + scheduler.Post(&task); + return 0; +} diff --git a/examples/scheduler_04_interacting_tasks/CMakeLists.txt b/examples/scheduler_04_interacting_tasks/CMakeLists.txt new file mode 100644 index 00000000..a973b1bd --- /dev/null +++ b/examples/scheduler_04_interacting_tasks/CMakeLists.txt @@ -0,0 +1,14 @@ +add_executable(scheduler_04_interacting_tasks main.cpp) +add_dependencies(scheduler_04_interacting_tasks install_serverbox) + +target_include_directories(scheduler_04_interacting_tasks PUBLIC + ${MG_SERVERBOX_DIR}/include +) +target_link_directories(scheduler_04_interacting_tasks PUBLIC + ${MG_SERVERBOX_DIR}/lib +) +target_link_libraries(scheduler_04_interacting_tasks + mgsch + mgboxstub + mgbox +) diff --git a/examples/scheduler_04_interacting_tasks/main.cpp b/examples/scheduler_04_interacting_tasks/main.cpp new file mode 100644 index 00000000..ab48ec57 --- /dev/null +++ b/examples/scheduler_04_interacting_tasks/main.cpp @@ -0,0 +1,92 @@ +#include "mg/sch/TaskScheduler.h" + +#include + +// +// It is assumed you have seen the previous examples in this section, and some previously +// explained things don't need another repetition. +// +////////////////////////////////////////////////////////////////////////////////////////// +// +// A realistic example how one task might submit some async work to be executed by another +// task, which in turn would wake the first one up, when the work is done. +// + +static void +TaskSubmitRequest( + mg::sch::Task& aSender) +{ + // A sub-task is created. But normally for such need one would have a pre-running task + // or a thread or some other sort of async executor, which takes requests and executes + // them. + mg::sch::Task* worker = new mg::sch::Task(); + worker->SetCallback([]( + mg::sch::Task& aSelf, + mg::sch::Task& aSender) -> mg::box::Coro { + + std::cout << "Worker: sent request\n"; + // Lets simulate like if the request takes 1 second to get done. For that the + // task would yield for 1 second and then get executed again. + uint64_t t1 = mg::box::GetMilliseconds(); + aSelf.SetDelay(1000); + co_await aSelf.AsyncYield(); + + // This flag means the task was woken up because its deadline was due. + // Technically, it could wake up spuriously, but here we know it can't happen. + // Proper production code should still be ready to that though. + MG_BOX_ASSERT(aSelf.IsExpired()); + + uint64_t t2 = mg::box::GetMilliseconds(); + std::cout << "Worker: received response, took " << t2 - t1 << " msec\n"; + + // Wakeup the original task + let it know the request is actually done. The + // original task would be able to tell that by seeing that it's got a signal, not + // just a spurious wakeup. + aSender.PostSignal(); + + // 'delete self' + co_return wouldn't work here. Because deletion of the self + // would destroy the C++ coroutine object. co_return would then fail with + // use-after-free. For such 'delete and exit' case there is the special helper. + co_await aSelf.AsyncExitDelete(); + }(*worker, aSender)); + mg::sch::TaskScheduler::This().Post(worker); +} + +int +main() +{ + mg::sch::Task task; + + // The scheduler is defined after the task, so the task's destructor is not called + // before the scheduler is terminated. It would cause the task to be destroyed while + // in use. + // Normally one would allocate tasks on the heap and make them delete themselves when + // they are finished. + mg::sch::TaskScheduler scheduler("tst", + 1, // Thread count. + 5 // Subqueue size. + ); + + task.SetCallback([]( + mg::sch::Task& aSelf) -> mg::box::Coro { + + // The task wants something to be done asynchronously. It would then submit the + // work (could be another task, could be an HTTP client, or something alike) and + // wait for a notification when it is done. + std::cout << "Main: submit request\n"; + TaskSubmitRequest(aSelf); + + // The waiting is in a loop to protect the code from spurious wakeups. + do { + // Make sure to signalize that the task wants to wait (infinitely) until a + // signal comes. Without this the task would be just re-scheduled immediately + // and it would be a busy-loop. + aSelf.SetWait(); + } while (!co_await aSelf.AsyncReceiveSignal()); + + std::cout << "Main: finish\n"; + co_return; + }(task)); + scheduler.Post(&task); + return 0; +} diff --git a/src/mg/stub/BoxStub.cpp b/src/mg/stub/BoxStub.cpp index c82e9e4f..5da020d5 100644 --- a/src/mg/stub/BoxStub.cpp +++ b/src/mg/stub/BoxStub.cpp @@ -2,6 +2,7 @@ #include "mg/box/Log.h" #include +#include // // The file implements missing functions from mg::box in some trivial way to get the tests @@ -42,6 +43,8 @@ namespace box { const char* aFormat, va_list aParams) { + static std::mutex mutex; + std::unique_lock lock(mutex); printf("%s (%s): ", LogLevelToStringWithPadding(aLevel), aTag); vprintf(aFormat, aParams); printf("\n"); From 3a47cb6ea4c90ca86336d2bd1e20b1ef4721e741 Mon Sep 17 00:00:00 2001 From: Vladislav Shpilevoy Date: Thu, 23 Jan 2025 20:42:42 +0100 Subject: [PATCH 7/7] ci: introduce examples testing Lets make sure they compile and run on all supported platforms the same way as users would use Serverbox. With proper installation steps and all that. Closes #20 --- .github/workflows/test.yml | 104 ++++++++++++++++++++++++++++++++++++- 1 file changed, 103 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index dfe6d622..df1887fc 100755 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -7,9 +7,12 @@ on: branches: [ "master" ] jobs: +########################################################################################## +## Tests +########################################################################################## test: + name: Tests runs-on: ${{ matrix.os }} - strategy: fail-fast: true matrix: @@ -116,3 +119,102 @@ jobs: working-directory: ${{ steps.strings.outputs.test-dir }} run: ./${{ steps.strings.outputs.test-exe }} timeout-minutes: 30 + +########################################################################################## +## Examples +########################################################################################## + + examples: + name: Examples + needs: test + runs-on: ${{ matrix.os }} + strategy: + fail-fast: true + matrix: + os: [ubuntu-latest, windows-latest, macos-latest] + build_type: [Release] + copiler_suite: [msvc, llvm, gnu] + exclude: + - os: windows-latest + copiler_suite: gnu + - os: windows-latest + copiler_suite: llvm + + - os: ubuntu-latest + copiler_suite: msvc + + - os: macos-latest + copiler_suite: msvc + - os: macos-latest + copiler_suite: gnu + + steps: + - uses: actions/checkout@v3 + + - name: Set strings + id: strings + shell: bash + run: | + if [ "$RUNNER_OS" == "Windows" ]; then + echo "exe-subdir=${{ matrix.build_type }}/" >> "$GITHUB_OUTPUT" + echo "exe-suffix=.exe" >> "$GITHUB_OUTPUT" + fi + + if [ "${{ matrix.copiler_suite }}" == "msvc" ]; then + echo "compiler_c=cl" >> "$GITHUB_OUTPUT" + echo "compiler_cxx=cl" >> "$GITHUB_OUTPUT" + elif [ "${{ matrix.copiler_suite }}" == "llvm" ]; then + echo "compiler_c=clang" >> "$GITHUB_OUTPUT" + echo "compiler_cxx=clang++" >> "$GITHUB_OUTPUT" + else + echo "compiler_c=gcc" >> "$GITHUB_OUTPUT" + echo "compiler_cxx=g++" >> "$GITHUB_OUTPUT" + fi + + - name: Configure CMake + working-directory: ${{ github.workspace }}/examples + shell: bash + run: | + export CC="${{ steps.strings.outputs.compiler_c }}" + export CXX="${{ steps.strings.outputs.compiler_cxx }}" + if [ "$RUNNER_OS" == "macOS" ]; then + export CC=$(brew --prefix llvm@15)/bin/$CC + export CXX=$(brew --prefix llvm@15)/bin/$CXX + fi + cmake -B "${{ github.workspace }}/examples/build" \ + -DCMAKE_BUILD_TYPE=${{ matrix.build_type }} \ + -S "${{ github.workspace }}/examples" + + - name: Build + shell: bash + run: | + cmake --build "${{ github.workspace }}/examples/build" \ + --config ${{ matrix.build_type }} + + - name: Run + working-directory: ${{ github.workspace }}/examples/build + timeout-minutes: 5 + shell: bash + run: | + invoke_exe() { + local arg="$1" + if [ "$RUNNER_OS" = "Windows" ]; then + local exe_path="./$arg/${{ matrix.build_type }}/$arg.exe" + else + local exe_path="./$arg/$arg" + fi + if [ -x "$exe_path" ]; then + "$exe_path" + else + echo "Error: Executable not found or not executable at $exe_path" + return 1 + fi + } + invoke_exe scheduler_01_simple_task + invoke_exe scheduler_02_coroutine_task + invoke_exe scheduler_03_multistep_task + invoke_exe scheduler_04_interacting_tasks + invoke_exe iocore_01_tcp_hello + invoke_exe iocore_02_ssl_hello + invoke_exe iocore_03_pipeline + invoke_exe iocore_04_tcp_periodic