diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4e772bcd..df1887fc 100755 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -7,9 +7,12 @@ on: branches: [ "master" ] jobs: - build: +########################################################################################## +## Tests +########################################################################################## + test: + name: Tests runs-on: ${{ matrix.os }} - strategy: fail-fast: true matrix: @@ -116,3 +119,102 @@ jobs: working-directory: ${{ steps.strings.outputs.test-dir }} run: ./${{ steps.strings.outputs.test-exe }} timeout-minutes: 30 + +########################################################################################## +## Examples +########################################################################################## + + examples: + name: Examples + needs: test + runs-on: ${{ matrix.os }} + strategy: + fail-fast: true + matrix: + os: [ubuntu-latest, windows-latest, macos-latest] + build_type: [Release] + copiler_suite: [msvc, llvm, gnu] + exclude: + - os: windows-latest + copiler_suite: gnu + - os: windows-latest + copiler_suite: llvm + + - os: ubuntu-latest + copiler_suite: msvc + + - os: macos-latest + copiler_suite: msvc + - os: macos-latest + copiler_suite: gnu + + steps: + - uses: actions/checkout@v3 + + - name: Set strings + id: strings + shell: bash + run: | + if [ "$RUNNER_OS" == "Windows" ]; then + echo "exe-subdir=${{ matrix.build_type }}/" >> "$GITHUB_OUTPUT" + echo "exe-suffix=.exe" >> "$GITHUB_OUTPUT" + fi + + if [ "${{ matrix.copiler_suite }}" == "msvc" ]; then + echo "compiler_c=cl" >> "$GITHUB_OUTPUT" + echo "compiler_cxx=cl" >> "$GITHUB_OUTPUT" + elif [ "${{ matrix.copiler_suite }}" == "llvm" ]; then + echo "compiler_c=clang" >> "$GITHUB_OUTPUT" + echo "compiler_cxx=clang++" >> "$GITHUB_OUTPUT" + else + echo "compiler_c=gcc" >> "$GITHUB_OUTPUT" + echo "compiler_cxx=g++" >> "$GITHUB_OUTPUT" + fi + + - name: Configure CMake + working-directory: ${{ github.workspace }}/examples + shell: bash + run: | + export CC="${{ steps.strings.outputs.compiler_c }}" + export CXX="${{ steps.strings.outputs.compiler_cxx }}" + if [ "$RUNNER_OS" == "macOS" ]; then + export CC=$(brew --prefix llvm@15)/bin/$CC + export CXX=$(brew --prefix llvm@15)/bin/$CXX + fi + cmake -B "${{ github.workspace }}/examples/build" \ + -DCMAKE_BUILD_TYPE=${{ matrix.build_type }} \ + -S "${{ github.workspace }}/examples" + + - name: Build + shell: bash + run: | + cmake --build "${{ github.workspace }}/examples/build" \ + --config ${{ matrix.build_type }} + + - name: Run + working-directory: ${{ github.workspace }}/examples/build + timeout-minutes: 5 + shell: bash + run: | + invoke_exe() { + local arg="$1" + if [ "$RUNNER_OS" = "Windows" ]; then + local exe_path="./$arg/${{ matrix.build_type }}/$arg.exe" + else + local exe_path="./$arg/$arg" + fi + if [ -x "$exe_path" ]; then + "$exe_path" + else + echo "Error: Executable not found or not executable at $exe_path" + return 1 + fi + } + invoke_exe scheduler_01_simple_task + invoke_exe scheduler_02_coroutine_task + invoke_exe scheduler_03_multistep_task + invoke_exe scheduler_04_interacting_tasks + invoke_exe iocore_01_tcp_hello + invoke_exe iocore_02_ssl_hello + invoke_exe iocore_03_pipeline + invoke_exe iocore_04_tcp_periodic diff --git a/CMakeLists.txt b/CMakeLists.txt index c53a63c4..1de71971 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -14,7 +14,6 @@ if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang") -Wall -Wextra -Wpedantic -Werror -Wno-unknown-warning-option -Wunused-function -Wno-invalid-offsetof -Wno-unused-value -Wno-deprecated-copy -Wno-gnu-zero-variadic-macro-arguments - -fno-rtti ) set(CMAKE_C_FLAGS ${CMAKE_C_FLAGS} "-pthread") @@ -42,5 +41,9 @@ if (CMAKE_SYSTEM_NAME STREQUAL "Linux") endif () add_subdirectory(src) -add_subdirectory(test) -add_subdirectory(bench) +if (NOT MG_SKIP_TEST) + add_subdirectory(test) +endif() +if (NOT MG_SKIP_BENCHES) + add_subdirectory(bench) +endif() diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt new file mode 100644 index 00000000..b203f3f0 --- /dev/null +++ b/examples/CMakeLists.txt @@ -0,0 +1,70 @@ +cmake_minimum_required (VERSION 3.8) + +project("Examples") + +if (NOT CMAKE_BUILD_TYPE) + set(CMAKE_BUILD_TYPE Release) +endif() +message(STATUS "Build type ${CMAKE_BUILD_TYPE}") + +if (NOT DEFINED CMAKE_CXX_STANDARD) + message(STATUS "Using C++20 standard as default") + set(CMAKE_CXX_STANDARD 20) +else() + message(STATUS "Using C++${CMAKE_CXX_STANDARD} standard as explicitly requested") +endif() + +if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang") + add_compile_options( + -Wall -Wextra -Wpedantic -Werror -Wno-unknown-warning-option -Wunused-function + -Wno-invalid-offsetof -Wno-unused-value -Wno-deprecated-copy + -Wno-gnu-zero-variadic-macro-arguments + ) + + set(CMAKE_C_FLAGS ${CMAKE_C_FLAGS} "-pthread") + set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} "-pthread") +elseif(CMAKE_CXX_COMPILER_ID STREQUAL "MSVC") + add_compile_options( + # This is needed at least for correct macro handling. Default behaviour won't + # expand __VA_ARGS__ correctly. + /Zc:preprocessor + /WX /wd4266 /wd4324 /wd4355 /wd4365 /wd4458 /wd4514 /wd4548 /wd4625 /wd4626 + /wd4668 /wd4710 /wd4820 /wd5026 /wd5027 /wd5039 /wd5045 /wd5105 /wd5219 /wd26439 + /wd26800 + # It ignores 'break' and 'fallthrough' done via a macro which makes it annoying + # and pointless. + /wd5262 + # Info message about a function being inlined. + /wd4711 + ) +endif() + +set(MG_SERVERBOX_BUILD_DIR ${CMAKE_BINARY_DIR}/build_serverbox) +set(MG_SERVERBOX_DIR ${MG_SERVERBOX_BUILD_DIR}/installed) + +add_custom_target(install_serverbox + COMMAND ${CMAKE_COMMAND} + -S ${CMAKE_SOURCE_DIR}/.. + -B ${MG_SERVERBOX_BUILD_DIR} + -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} + -DMG_SKIP_BENCHES=1 + -DMG_SKIP_TEST=1 + -DCMAKE_INSTALL_PREFIX=${MG_SERVERBOX_DIR} + + COMMAND ${CMAKE_COMMAND} + --build ${MG_SERVERBOX_BUILD_DIR} --config ${CMAKE_BUILD_TYPE} -j + + COMMAND ${CMAKE_COMMAND} + --install ${MG_SERVERBOX_BUILD_DIR} --config ${CMAKE_BUILD_TYPE} + + COMMENT "Installing serverbox" +) + +add_subdirectory(iocore_01_tcp_hello) +add_subdirectory(iocore_02_ssl_hello) +add_subdirectory(iocore_03_pipeline) +add_subdirectory(iocore_04_tcp_periodic) +add_subdirectory(scheduler_01_simple_task) +add_subdirectory(scheduler_02_coroutine_task) +add_subdirectory(scheduler_03_multistep_task) +add_subdirectory(scheduler_04_interacting_tasks) diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 00000000..780b26df --- /dev/null +++ b/examples/README.md @@ -0,0 +1,25 @@ +# Examples + +The folder provides series of examples how to use the most interesting parts of Serverbox. + +Each example can be built and run on its own, they only depend on Serverbox itself, and not on each other. + +They can be used as reference points when you want to try and build something of your own. + +The recommended way of reading them is firstly all `scheduler_*`, then all `iocore_*`. The sequence 01, 02, etc is not required, each example is self-sufficient build- and code-wise. But later examples might not explain some simple things already covered in the previous examples. + +## Running + +The recommended way to use them is this (on non-Windows): +```Bash +# Build them all. +mkdir -p build +cd build +cmake .. +make -j + +# Run any of them like this: +./scheduler_01_simple_task/scheduler_01_simple_task +``` + +On Windows platform they also compile and run, but one would have to run them via cmd/PowerShell/VisualStudio. diff --git a/examples/iocore_01_tcp_hello/CMakeLists.txt b/examples/iocore_01_tcp_hello/CMakeLists.txt new file mode 100644 index 00000000..6847e903 --- /dev/null +++ b/examples/iocore_01_tcp_hello/CMakeLists.txt @@ -0,0 +1,21 @@ +add_executable(iocore_01_tcp_hello main.cpp) +add_dependencies(iocore_01_tcp_hello install_serverbox) + +target_include_directories(iocore_01_tcp_hello PUBLIC + ${MG_SERVERBOX_DIR}/include +) +target_link_directories(iocore_01_tcp_hello PUBLIC + ${MG_SERVERBOX_DIR}/lib +) +set(libs + mgaio + mgnet + mgbox + mgboxstub +) +if(WIN32) + set(libs ${libs} ws2_32.lib crypt32.lib) +endif() +target_link_libraries(iocore_01_tcp_hello + ${libs} +) diff --git a/examples/iocore_01_tcp_hello/main.cpp b/examples/iocore_01_tcp_hello/main.cpp new file mode 100644 index 00000000..06eaeb18 --- /dev/null +++ b/examples/iocore_01_tcp_hello/main.cpp @@ -0,0 +1,267 @@ +#include "mg/aio/IOCore.h" +#include "mg/aio/TCPServer.h" +#include "mg/aio/TCPSocket.h" +#include "mg/aio/TCPSocketSubscription.h" +#include "mg/box/Log.h" + +#include + +// +// Example of how to use TCPSocket and TCPServer. The clients connect to the server, send +// a message, wait for any response as a receipt confirmation, and delete themselves. +// + +static const int theClientCount = 5; + +////////////////////////////////////////////////////////////////////////////////////////// + +class MyClient final + // The socket subscription has a number of callbacks notifying us about various TCP + // events like on-connect, on-recv, on-send, etc. All of them are optional, and are + // always executed in a single thread, never in parallel. This allows us not to use + // any mutexes for data used exclusively by those callbacks. + : private mg::aio::TCPSocketSubscription +{ +public: + MyClient( + int aID, + mg::aio::IOCore& aCore, + uint16_t aPort) + : myID(aID) + , mySock(new mg::aio::TCPSocket(aCore)) + { + mySock->Open({}); + mg::aio::TCPSocketConnectParams connParams; + connParams.myEndpoint = mg::net::HostMakeLocalIPV4(aPort).ToString(); + mySock->PostConnect(connParams, this); + } + + ~MyClient() final = default; + +private: + // TCPSocketSubscription event. + void + OnConnect() final + { + MG_LOG_INFO("Client.OnConnect", "%d", myID); + // Using SendRef(), because the string is in the constant data section. It is + // never deleted. Hence no need to copy it. + const char* msg = "hello handshake"; + mySock->SendRef(msg, mg::box::Strlen(msg) + 1); + mySock->Recv(1); + } + + // TCPSocketSubscription event. + void + OnRecv( + mg::net::BufferReadStream& aStream) final + { + MG_LOG_INFO("Client.OnRecv", "%d: got response", myID); + MG_BOX_ASSERT(aStream.GetReadSize() > 0); + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnError( + mg::box::Error*) final + { + // Can happen on Windows even on graceful close. + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Client.OnClose", "%d", myID); + // Try to always have all deletions in a single place. And the only suitable place + // is on-close. This is the only place where you can safely assume that the socket + // won't run again if you delete it right now. + mySock->Delete(); + delete this; + } + + const int myID; + mg::aio::TCPSocket* mySock; +}; + +////////////////////////////////////////////////////////////////////////////////////////// + +class MyPeer final + : private mg::aio::TCPSocketSubscription +{ +public: + MyPeer( + int aID, + mg::aio::IOCore& aCore, + mg::net::Socket aSock) + : myID(aID) + , mySock(new mg::aio::TCPSocket(aCore)) + { + mySock->Open({}); + mySock->PostWrap(aSock, this); + } + + ~MyPeer() final = default; + +private: + // TCPSocketSubscription event. + void + OnConnect() final + { + MG_LOG_INFO("Peer.OnConnect", "%d", myID); + mySock->Recv(128); + } + + // TCPSocketSubscription event. + void + OnRecv( + mg::net::BufferReadStream& aStream) final + { + uint64_t newSize = aStream.GetReadSize(); + MG_BOX_ASSERT(newSize > 0); + uint64_t oldSize = myInput.size(); + myInput.resize(oldSize + newSize); + aStream.ReadData(myInput.data() + oldSize, newSize); + // Zero character (string terminator) is the end of the message. Until it is + // received, the message is incomplete. + if (myInput.back() != 0) + { + mySock->Recv(128); + return; + } + MG_LOG_INFO("Peer.OnRecv", "%d: got '%s'", myID, myInput.data()); + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnError( + mg::box::Error*) final + { + // Can happen on Windows even on graceful close. + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Peer.OnClose", "%d", myID); + mySock->Delete(); + delete this; + } + + const int myID; + mg::aio::TCPSocket* mySock; + std::vector myInput; +}; + +////////////////////////////////////////////////////////////////////////////////////////// + +class MyServer final + // Identical to TCPSocketSubscription, but provides TCP-server events, like on-accept. + : private mg::aio::TCPServerSubscription +{ +public: + MyServer( + mg::aio::IOCore& aCore) + : myAcceptCount(0) + , myServer(mg::aio::TCPServer::NewShared(aCore)) + { + } + + uint16_t + Bind() + { + mg::box::Error::Ptr err; + bool ok = myServer->Bind(mg::net::HostMakeAllIPV4(0), err); + MG_BOX_ASSERT(ok); + return myServer->GetPort(); + } + + void + Start() + { + mg::box::Error::Ptr err; + bool ok = myServer->Listen(mg::net::SocketMaxBacklog(), this, err); + MG_BOX_ASSERT(ok); + } + +private: + // TCPServerSubscription event. + void + OnAccept( + mg::net::Socket aSock, + const mg::net::Host&) final + { + MG_LOG_INFO("Server.OnAccept", "new client"); + new MyPeer(++myAcceptCount, myServer->GetCore(), aSock); + if (myAcceptCount == theClientCount) + { + MG_LOG_INFO("Server.OnAccept", "start closing"); + myServer->PostClose(); + } + } + + // TCPServerSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Server.OnClose", ""); + } + + int myAcceptCount; + mg::aio::TCPServer::Ptr myServer; +}; + +////////////////////////////////////////////////////////////////////////////////////////// + +static void +RunExample() +{ + mg::aio::IOCore core; + core.Start(3 /* threads */); + + MG_LOG_INFO("Main", "start server"); + MyServer server(core); + uint16_t port = server.Bind(); + server.Start(); + + MG_LOG_INFO("Main", "start clients"); + for (int i = 0; i < theClientCount; ++i) + new MyClient(i + 1, core, port); + + MG_LOG_INFO("Main", "wait for end"); + uint32_t remaining = core.WaitEmpty(); + MG_BOX_ASSERT(remaining == 0); + + MG_LOG_INFO("Main", "terminating"); +} + +////////////////////////////////////////////////////////////////////////////////////////// + +int +main() +{ +#if IS_PLATFORM_WIN + // WSAStartup() on Windows has to be done by the user. To be able to customize it as + // they want. Besides, it is a too low-level thing to be able to wrap it into anything + // platform-agnostic. + WSADATA data; + MG_BOX_ASSERT(WSAStartup(MAKEWORD(2, 2), &data) == 0); +#else + // SIGPIPE is pointless. This signal kills the process when trying to write to a + // socket whose connection is dead. The problem is that there is no way to know that + // at the moment of write. + signal(SIGPIPE, SIG_IGN); +#endif + + RunExample(); + +#if IS_PLATFORM_WIN + MG_BOX_ASSERT(WSACleanup() == 0); +#endif + return 0; +} diff --git a/examples/iocore_02_ssl_hello/CMakeLists.txt b/examples/iocore_02_ssl_hello/CMakeLists.txt new file mode 100644 index 00000000..fede3a34 --- /dev/null +++ b/examples/iocore_02_ssl_hello/CMakeLists.txt @@ -0,0 +1,26 @@ +add_executable(iocore_02_ssl_hello main.cpp Certs.cpp) +add_dependencies(iocore_02_ssl_hello install_serverbox) + +find_package(OpenSSL REQUIRED) + +target_include_directories(iocore_02_ssl_hello PUBLIC + ${MG_SERVERBOX_DIR}/include + ${OPENSSL_INCLUDE_DIR} +) +target_link_directories(iocore_02_ssl_hello PUBLIC + ${MG_SERVERBOX_DIR}/lib +) +set(libs + mgaio + mgnet + mgbox + mgboxstub + ${OPENSSL_SSL_LIBRARY} + ${OPENSSL_CRYPTO_LIBRARY} +) +if(WIN32) + set(libs ${libs} ws2_32.lib crypt32.lib) +endif() +target_link_libraries(iocore_02_ssl_hello + ${libs} +) diff --git a/examples/iocore_02_ssl_hello/Certs.cpp b/examples/iocore_02_ssl_hello/Certs.cpp new file mode 100644 index 00000000..51000efb --- /dev/null +++ b/examples/iocore_02_ssl_hello/Certs.cpp @@ -0,0 +1,146 @@ +#include "Certs.h" + +// These certificates are signed with a CA, not by themselves. +// +// Generation commands: +// +// 1) Generate local key of CA. +// +// openssl ecparam -out ca.key -name prime256v1 -genkey +// +// 2) Generate CA public cert. Be sure you specify a unique 'Common Name' field when +// prompted by the command line. +// +// openssl req -x509 -new -nodes -key ca.key -sha256 -days 36500 -out ca.crt +// +// 3) Generate server's own private key: +// +// openssl ecparam -out dom.key -name prime256v1 -genkey +// +// 4) Generate CSRs (request to create a cert for the server). Here 3 are generated to +// give our server 3 public certificates. It is useful for testing their switch. Be +// sure you specify a unique 'Common Name' field for each CSR when prompted by the +// command line. +// +// openssl req -new -key dom.key -out dom.csr +// +// 5) Generate server's public certificate by the CA key and the CSR. +// +// openssl x509 -req -in dom.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out dom.crt -days 36500 -sha256 +// +// 6) Create DER versions of these files: +// +// openssl x509 -inform pem -in dom.crt -outform der -out dom.crt.der1 +// openssl x509 -inform pem -in ca.crt -outform der -out ca.crt.der +// openssl ec -inform pem -in dom.key -outform der -out dom.key.der +// +// 7) Print ready-to-use arrays of bytes. +// +// xxd -i dom.crt.der +// xxd -i dom.key.der +// xxd -i ca.crt.der +// +// Note that here DER is only used as a binary representation. PEM can be used too. +// Serverbox API allows both formats. +// +const uint32_t theTestCertSize = 429; +const uint8_t theTestCert[] = { + 0x30, 0x82, 0x01, 0xa9, 0x30, 0x82, 0x01, 0x4f, 0x02, 0x14, 0x53, 0x53, + 0x89, 0x2d, 0x22, 0x6f, 0xaf, 0xe4, 0x1c, 0x1a, 0x00, 0xb4, 0x08, 0x6c, + 0xf2, 0x24, 0x58, 0xbf, 0x2e, 0x93, 0x30, 0x0a, 0x06, 0x08, 0x2a, 0x86, + 0x48, 0xce, 0x3d, 0x04, 0x03, 0x02, 0x30, 0x53, 0x31, 0x0b, 0x30, 0x09, + 0x06, 0x03, 0x55, 0x04, 0x06, 0x13, 0x02, 0x52, 0x55, 0x31, 0x0f, 0x30, + 0x0d, 0x06, 0x03, 0x55, 0x04, 0x08, 0x0c, 0x06, 0x4d, 0x6f, 0x73, 0x63, + 0x6f, 0x77, 0x31, 0x0f, 0x30, 0x0d, 0x06, 0x03, 0x55, 0x04, 0x07, 0x0c, + 0x06, 0x4d, 0x6f, 0x73, 0x63, 0x6f, 0x77, 0x31, 0x10, 0x30, 0x0e, 0x06, + 0x03, 0x55, 0x04, 0x0a, 0x0c, 0x07, 0x55, 0x62, 0x69, 0x73, 0x6f, 0x66, + 0x74, 0x31, 0x10, 0x30, 0x0e, 0x06, 0x03, 0x55, 0x04, 0x03, 0x0c, 0x07, + 0x55, 0x62, 0x69, 0x73, 0x6f, 0x66, 0x74, 0x30, 0x20, 0x17, 0x0d, 0x32, + 0x32, 0x30, 0x36, 0x32, 0x32, 0x31, 0x34, 0x30, 0x33, 0x30, 0x35, 0x5a, + 0x18, 0x0f, 0x32, 0x31, 0x32, 0x32, 0x30, 0x35, 0x32, 0x39, 0x31, 0x34, + 0x30, 0x33, 0x30, 0x35, 0x5a, 0x30, 0x59, 0x31, 0x0b, 0x30, 0x09, 0x06, + 0x03, 0x55, 0x04, 0x06, 0x13, 0x02, 0x41, 0x55, 0x31, 0x13, 0x30, 0x11, + 0x06, 0x03, 0x55, 0x04, 0x08, 0x0c, 0x0a, 0x53, 0x6f, 0x6d, 0x65, 0x2d, + 0x53, 0x74, 0x61, 0x74, 0x65, 0x31, 0x21, 0x30, 0x1f, 0x06, 0x03, 0x55, + 0x04, 0x0a, 0x0c, 0x18, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, + 0x20, 0x57, 0x69, 0x64, 0x67, 0x69, 0x74, 0x73, 0x20, 0x50, 0x74, 0x79, + 0x20, 0x4c, 0x74, 0x64, 0x31, 0x12, 0x30, 0x10, 0x06, 0x03, 0x55, 0x04, + 0x03, 0x0c, 0x09, 0x55, 0x62, 0x69, 0x73, 0x6f, 0x66, 0x74, 0x20, 0x31, + 0x30, 0x59, 0x30, 0x13, 0x06, 0x07, 0x2a, 0x86, 0x48, 0xce, 0x3d, 0x02, + 0x01, 0x06, 0x08, 0x2a, 0x86, 0x48, 0xce, 0x3d, 0x03, 0x01, 0x07, 0x03, + 0x42, 0x00, 0x04, 0xde, 0x5a, 0x0e, 0x8e, 0x07, 0xdd, 0xe2, 0x85, 0xe5, + 0x04, 0x73, 0xfb, 0x74, 0x77, 0x83, 0xc6, 0xae, 0x9f, 0xf0, 0xa1, 0xf0, + 0xe6, 0xe2, 0xf1, 0xf8, 0x8e, 0x45, 0x19, 0xe5, 0x58, 0x78, 0x90, 0xf5, + 0x16, 0xeb, 0x99, 0x33, 0xe7, 0xff, 0xdf, 0x4e, 0x0f, 0xb7, 0x7b, 0xf8, + 0x25, 0x88, 0x79, 0xa5, 0xc9, 0xe5, 0x82, 0x83, 0x9d, 0xa3, 0x37, 0x56, + 0x59, 0x56, 0xbf, 0xd3, 0x41, 0x2a, 0xe8, 0x30, 0x0a, 0x06, 0x08, 0x2a, + 0x86, 0x48, 0xce, 0x3d, 0x04, 0x03, 0x02, 0x03, 0x48, 0x00, 0x30, 0x45, + 0x02, 0x20, 0x0e, 0xae, 0x29, 0x05, 0x03, 0x80, 0x07, 0x8f, 0x2c, 0x84, + 0x7f, 0x1c, 0xeb, 0xab, 0x5c, 0xd0, 0x74, 0x62, 0xb5, 0x87, 0x57, 0x24, + 0xcc, 0x7b, 0x25, 0xa0, 0x77, 0x09, 0xb9, 0x2f, 0x77, 0x13, 0x02, 0x21, + 0x00, 0xe1, 0x46, 0x88, 0x24, 0xcc, 0x79, 0xed, 0x5a, 0x18, 0x32, 0xac, + 0xe4, 0x55, 0x28, 0xc4, 0x94, 0xc9, 0xd4, 0xa0, 0xcb, 0xb3, 0x01, 0xfa, + 0x18, 0xe0, 0x69, 0xe6, 0x2c, 0x23, 0xbb, 0x10, 0xf4 +}; + +const uint32_t theTestKeySize = 121; +const uint8_t theTestKey[] = { + 0x30, 0x77, 0x02, 0x01, 0x01, 0x04, 0x20, 0x65, 0x01, 0x04, 0xcc, 0x6b, + 0x27, 0xb4, 0x85, 0x3d, 0x38, 0x78, 0xd7, 0x3a, 0x5a, 0x12, 0x16, 0xdd, + 0xe4, 0x8f, 0xa5, 0x82, 0x67, 0xba, 0x34, 0xf0, 0x32, 0x6f, 0xd4, 0x74, + 0x89, 0x83, 0x44, 0xa0, 0x0a, 0x06, 0x08, 0x2a, 0x86, 0x48, 0xce, 0x3d, + 0x03, 0x01, 0x07, 0xa1, 0x44, 0x03, 0x42, 0x00, 0x04, 0xde, 0x5a, 0x0e, + 0x8e, 0x07, 0xdd, 0xe2, 0x85, 0xe5, 0x04, 0x73, 0xfb, 0x74, 0x77, 0x83, + 0xc6, 0xae, 0x9f, 0xf0, 0xa1, 0xf0, 0xe6, 0xe2, 0xf1, 0xf8, 0x8e, 0x45, + 0x19, 0xe5, 0x58, 0x78, 0x90, 0xf5, 0x16, 0xeb, 0x99, 0x33, 0xe7, 0xff, + 0xdf, 0x4e, 0x0f, 0xb7, 0x7b, 0xf8, 0x25, 0x88, 0x79, 0xa5, 0xc9, 0xe5, + 0x82, 0x83, 0x9d, 0xa3, 0x37, 0x56, 0x59, 0x56, 0xbf, 0xd3, 0x41, 0x2a, + 0xe8 +}; + +const uint32_t theTestCACertSize = 513; +const uint8_t theTestCACert[] = { + 0x30, 0x82, 0x01, 0xfd, 0x30, 0x82, 0x01, 0xa3, 0xa0, 0x03, 0x02, 0x01, + 0x02, 0x02, 0x14, 0x33, 0x0d, 0xc7, 0xa8, 0xff, 0xc4, 0x48, 0x65, 0x67, + 0x97, 0x6f, 0x95, 0x98, 0xed, 0xe0, 0xa6, 0xf9, 0x99, 0x21, 0x24, 0x30, + 0x0a, 0x06, 0x08, 0x2a, 0x86, 0x48, 0xce, 0x3d, 0x04, 0x03, 0x02, 0x30, + 0x53, 0x31, 0x0b, 0x30, 0x09, 0x06, 0x03, 0x55, 0x04, 0x06, 0x13, 0x02, + 0x52, 0x55, 0x31, 0x0f, 0x30, 0x0d, 0x06, 0x03, 0x55, 0x04, 0x08, 0x0c, + 0x06, 0x4d, 0x6f, 0x73, 0x63, 0x6f, 0x77, 0x31, 0x0f, 0x30, 0x0d, 0x06, + 0x03, 0x55, 0x04, 0x07, 0x0c, 0x06, 0x4d, 0x6f, 0x73, 0x63, 0x6f, 0x77, + 0x31, 0x10, 0x30, 0x0e, 0x06, 0x03, 0x55, 0x04, 0x0a, 0x0c, 0x07, 0x55, + 0x62, 0x69, 0x73, 0x6f, 0x66, 0x74, 0x31, 0x10, 0x30, 0x0e, 0x06, 0x03, + 0x55, 0x04, 0x03, 0x0c, 0x07, 0x55, 0x62, 0x69, 0x73, 0x6f, 0x66, 0x74, + 0x30, 0x20, 0x17, 0x0d, 0x32, 0x32, 0x30, 0x36, 0x32, 0x32, 0x31, 0x33, + 0x33, 0x39, 0x33, 0x33, 0x5a, 0x18, 0x0f, 0x32, 0x31, 0x32, 0x32, 0x30, + 0x35, 0x32, 0x39, 0x31, 0x33, 0x33, 0x39, 0x33, 0x33, 0x5a, 0x30, 0x53, + 0x31, 0x0b, 0x30, 0x09, 0x06, 0x03, 0x55, 0x04, 0x06, 0x13, 0x02, 0x52, + 0x55, 0x31, 0x0f, 0x30, 0x0d, 0x06, 0x03, 0x55, 0x04, 0x08, 0x0c, 0x06, + 0x4d, 0x6f, 0x73, 0x63, 0x6f, 0x77, 0x31, 0x0f, 0x30, 0x0d, 0x06, 0x03, + 0x55, 0x04, 0x07, 0x0c, 0x06, 0x4d, 0x6f, 0x73, 0x63, 0x6f, 0x77, 0x31, + 0x10, 0x30, 0x0e, 0x06, 0x03, 0x55, 0x04, 0x0a, 0x0c, 0x07, 0x55, 0x62, + 0x69, 0x73, 0x6f, 0x66, 0x74, 0x31, 0x10, 0x30, 0x0e, 0x06, 0x03, 0x55, + 0x04, 0x03, 0x0c, 0x07, 0x55, 0x62, 0x69, 0x73, 0x6f, 0x66, 0x74, 0x30, + 0x59, 0x30, 0x13, 0x06, 0x07, 0x2a, 0x86, 0x48, 0xce, 0x3d, 0x02, 0x01, + 0x06, 0x08, 0x2a, 0x86, 0x48, 0xce, 0x3d, 0x03, 0x01, 0x07, 0x03, 0x42, + 0x00, 0x04, 0x0b, 0x3d, 0x49, 0xbd, 0x61, 0xd9, 0xe9, 0xd6, 0xbd, 0x49, + 0xbb, 0xb5, 0x2c, 0xb7, 0x7e, 0x0a, 0x05, 0xb1, 0xfe, 0x89, 0xb3, 0x1a, + 0x0d, 0xbf, 0xf4, 0x44, 0x8e, 0x48, 0x6a, 0xa5, 0xa5, 0x26, 0xe5, 0x0b, + 0x10, 0xcd, 0x2c, 0x29, 0xde, 0xd8, 0xc6, 0x4b, 0x89, 0xdf, 0x6c, 0xab, + 0x8f, 0xb7, 0x87, 0xc4, 0xf8, 0xba, 0x31, 0x7c, 0xbb, 0x96, 0x42, 0x8c, + 0xf3, 0xb9, 0x62, 0x1c, 0xd4, 0x21, 0xa3, 0x53, 0x30, 0x51, 0x30, 0x1d, + 0x06, 0x03, 0x55, 0x1d, 0x0e, 0x04, 0x16, 0x04, 0x14, 0x64, 0xe3, 0x03, + 0x4e, 0x60, 0xc2, 0xd6, 0x3b, 0xff, 0x8f, 0x01, 0x0e, 0xc3, 0xe8, 0xeb, + 0xc7, 0xba, 0x88, 0x56, 0x66, 0x30, 0x1f, 0x06, 0x03, 0x55, 0x1d, 0x23, + 0x04, 0x18, 0x30, 0x16, 0x80, 0x14, 0x64, 0xe3, 0x03, 0x4e, 0x60, 0xc2, + 0xd6, 0x3b, 0xff, 0x8f, 0x01, 0x0e, 0xc3, 0xe8, 0xeb, 0xc7, 0xba, 0x88, + 0x56, 0x66, 0x30, 0x0f, 0x06, 0x03, 0x55, 0x1d, 0x13, 0x01, 0x01, 0xff, + 0x04, 0x05, 0x30, 0x03, 0x01, 0x01, 0xff, 0x30, 0x0a, 0x06, 0x08, 0x2a, + 0x86, 0x48, 0xce, 0x3d, 0x04, 0x03, 0x02, 0x03, 0x48, 0x00, 0x30, 0x45, + 0x02, 0x20, 0x69, 0xd0, 0x77, 0x50, 0x50, 0x6a, 0x3c, 0x49, 0x5b, 0x7e, + 0x09, 0x6c, 0x40, 0xf3, 0xa3, 0xdf, 0x27, 0x08, 0xbc, 0xa6, 0x2c, 0x14, + 0xa8, 0xca, 0xbb, 0x14, 0x78, 0xb7, 0xe6, 0x2d, 0x73, 0x33, 0x02, 0x21, + 0x00, 0x97, 0x40, 0x67, 0x7c, 0xd3, 0x76, 0x30, 0x3f, 0xa2, 0x05, 0xb7, + 0x7e, 0x72, 0xed, 0xa8, 0x9b, 0x8b, 0x00, 0x32, 0x61, 0x2f, 0xaf, 0x7b, + 0x6c, 0x5b, 0xa7, 0x17, 0x90, 0x69, 0xa9, 0x2a, 0x5c +}; diff --git a/examples/iocore_02_ssl_hello/Certs.h b/examples/iocore_02_ssl_hello/Certs.h new file mode 100644 index 00000000..db11bdd4 --- /dev/null +++ b/examples/iocore_02_ssl_hello/Certs.h @@ -0,0 +1,13 @@ +#include "mg/box/Definitions.h" + +// Public certificate to use on server. +extern const uint32_t theTestCertSize; +extern const uint8_t theTestCert[]; + +// Private key to use on server. +extern const uint32_t theTestKeySize; +extern const uint8_t theTestKey[]; + +// Public certificate used to sign the server. Should be available to the clients. +extern const uint32_t theTestCACertSize; +extern const uint8_t theTestCACert[]; diff --git a/examples/iocore_02_ssl_hello/main.cpp b/examples/iocore_02_ssl_hello/main.cpp new file mode 100644 index 00000000..a774dcb9 --- /dev/null +++ b/examples/iocore_02_ssl_hello/main.cpp @@ -0,0 +1,296 @@ +#include "mg/aio/IOCore.h" +#include "mg/aio/SSLSocket.h" +#include "mg/aio/TCPServer.h" +#include "mg/aio/TCPSocketSubscription.h" +#include "mg/box/Log.h" +#include "mg/net/SSLContext.h" + +#include "Certs.h" + +#include + +// +// It is assumed you have seen the previous examples in this section, and some previously +// explained things don't need another repetition. +// +////////////////////////////////////////////////////////////////////////////////////////// +// +// A simple example how to use SSL in Serverbox. Here a few SSL clients connect to an SSL +// server, send a simple message, gets its receipt confirmation, and close themselves. +// +// The server here has a properly signed certificate, and the clients validate it. +// + +static const int theClientCount = 5; + +////////////////////////////////////////////////////////////////////////////////////////// +// +// The client sends a handshake message, and closes itself when any response is received. +// +class MyClient final + : private mg::aio::TCPSocketSubscription +{ +public: + MyClient( + int aID, + mg::aio::IOCore& aCore, + uint16_t aPort, + mg::net::SSLContext* aSSL) + : myID(aID) + , mySock(new mg::aio::SSLSocket(aCore)) + { + mg::aio::SSLSocketParams sslParams; + sslParams.mySSL = aSSL; + mySock->Open(sslParams); + + mg::aio::TCPSocketConnectParams connParams; + connParams.myEndpoint = mg::net::HostMakeLocalIPV4(aPort).ToString(); + mySock->PostConnect(connParams, this); + } + + ~MyClient() final = default; + +private: + // TCPSocketSubscription event. + void + OnConnect() final + { + MG_LOG_INFO("Client.OnConnect", "%d", myID); + const char* msg = "hello handshake"; + // Using SendRef(), because the string is in the constant data section. It is + // never deleted. Hence no need to copy it. + mySock->SendRef(msg, mg::box::Strlen(msg) + 1); + mySock->Recv(1); + } + + // TCPSocketSubscription event. + void + OnRecv( + mg::net::BufferReadStream& aStream) final + { + MG_LOG_INFO("Client.OnRecv", "%d: got response", myID); + MG_BOX_ASSERT(aStream.GetReadSize() > 0); + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnError( + mg::box::Error*) final + { + // Can happen on Windows even on graceful close. + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Client.OnClose", "%d", myID); + // Try to always have all deletions in a single place. And the only suitable place + // is on-close. This is the only place where you can safely assume that the socket + // won't run again if you delete it right now. + mySock->Delete(); + delete this; + } + + const int myID; + mg::aio::SSLSocket* mySock; +}; + +////////////////////////////////////////////////////////////////////////////////////////// + +class MyPeer final + : private mg::aio::TCPSocketSubscription +{ +public: + MyPeer( + int aID, + mg::aio::IOCore& aCore, + mg::net::Socket aSock, + mg::net::SSLContext* aSSL) + : myID(aID) + , mySock(new mg::aio::SSLSocket(aCore)) + { + mg::aio::SSLSocketParams sslParams; + sslParams.mySSL = aSSL; + mySock->Open(sslParams); + mySock->PostWrap(aSock, this); + } + + ~MyPeer() final = default; + +private: + // TCPSocketSubscription event. + void + OnConnect() final + { + MG_LOG_INFO("Peer.OnConnect", "%d", myID); + mySock->Recv(128); + } + + // TCPSocketSubscription event. + void + OnRecv( + mg::net::BufferReadStream& aStream) final + { + uint64_t newSize = aStream.GetReadSize(); + MG_BOX_ASSERT(newSize > 0); + uint64_t oldSize = myInput.size(); + myInput.resize(oldSize + newSize); + aStream.ReadData(myInput.data() + oldSize, newSize); + // Zero character (string terminator) is the end of the message. Until it is + // received, the message is incomplete. + if (myInput.back() != 0) + { + mySock->Recv(128); + return; + } + MG_LOG_INFO("Peer.OnRecv", "%d: got '%s'", myID, myInput.data()); + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnError( + mg::box::Error*) final + { + // Can happen on Windows even on graceful close. + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Peer.OnClose", "%d", myID); + mySock->Delete(); + delete this; + } + + const int myID; + mg::aio::SSLSocket* mySock; + std::vector myInput; +}; + +////////////////////////////////////////////////////////////////////////////////////////// + +class MyServer final + : private mg::aio::TCPServerSubscription +{ +public: + MyServer( + mg::aio::IOCore& aCore) + : myAcceptCount(0) + , myServer(mg::aio::TCPServer::NewShared(aCore)) + , mySSL(mg::net::SSLContext::NewShared(true /* is server */)) + { + // This would normally be AddLocalCertFile(), because in most cases the SSL data + // is stored on disk. + bool ok = mySSL->AddLocalCert( + theTestCert, theTestCertSize, + theTestKey, theTestKeySize); + MG_BOX_ASSERT(ok); + } + + uint16_t + Bind() + { + mg::box::Error::Ptr err; + bool ok = myServer->Bind(mg::net::HostMakeAllIPV4(0), err); + MG_BOX_ASSERT(ok); + return myServer->GetPort(); + } + + void + Start() + { + mg::box::Error::Ptr err; + bool ok = myServer->Listen(mg::net::SocketMaxBacklog(), this, err); + MG_BOX_ASSERT(ok); + } + +private: + // TCPServerSubscription event. + void + OnAccept( + mg::net::Socket aSock, + const mg::net::Host&) final + { + MG_LOG_INFO("Server.OnAccept", "new client"); + new MyPeer(++myAcceptCount, myServer->GetCore(), aSock, mySSL.GetPointer()); + if (myAcceptCount == theClientCount) + { + MG_LOG_INFO("Server.OnAccept", "start closing"); + myServer->PostClose(); + } + } + + // TCPServerSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Server.OnClose", ""); + } + + int myAcceptCount; + mg::aio::TCPServer::Ptr myServer; + mg::net::SSLContext::Ptr mySSL; +}; + +////////////////////////////////////////////////////////////////////////////////////////// + +static void +RunExample() +{ + mg::aio::IOCore core; + core.Start(3 /* threads */); + + MG_LOG_INFO("Main", "start server"); + MyServer server(core); + uint16_t port = server.Bind(); + server.Start(); + + mg::net::SSLContext::Ptr clientSSL = + mg::net::SSLContext::NewShared(false /* is server */); + // Usually SSL cert is a file. Then you can use AddRemoteCertFile() if it is something + // custom, or AddRemoteCertFromSystem() when need to talk to the open Internet and the + // system's certificates are enough. + clientSSL->AddRemoteCert(theTestCACert, theTestCACertSize); + + MG_LOG_INFO("Main", "start clients"); + for (int i = 0; i < theClientCount; ++i) + new MyClient(i + 1, core, port, clientSSL.GetPointer()); + + MG_LOG_INFO("Main", "wait for end"); + uint32_t remaining = core.WaitEmpty(); + MG_BOX_ASSERT(remaining == 0); + + MG_LOG_INFO("Main", "terminating"); +} + +////////////////////////////////////////////////////////////////////////////////////////// + +int +main() +{ +#if IS_PLATFORM_WIN + // WSAStartup() on Windows has to be done by the user. To be able to customize it as + // they want. Besides, it is a too low-level thing to be able to wrap it into anything + // platform-agnostic. + WSADATA data; + MG_BOX_ASSERT(WSAStartup(MAKEWORD(2, 2), &data) == 0); +#else + // SIGPIPE is pointless. This signal kills the process when trying to write to a + // socket whose connection is dead. The problem is that there is no way to know that + // at the moment of write. + signal(SIGPIPE, SIG_IGN); +#endif + + RunExample(); + +#if IS_PLATFORM_WIN + MG_BOX_ASSERT(WSACleanup() == 0); +#endif + return 0; +} diff --git a/examples/iocore_03_pipeline/CMakeLists.txt b/examples/iocore_03_pipeline/CMakeLists.txt new file mode 100644 index 00000000..6f876584 --- /dev/null +++ b/examples/iocore_03_pipeline/CMakeLists.txt @@ -0,0 +1,22 @@ +add_executable(iocore_03_pipeline main.cpp) +add_dependencies(iocore_03_pipeline install_serverbox) + +target_include_directories(iocore_03_pipeline PUBLIC + ${MG_SERVERBOX_DIR}/include +) +target_link_directories(iocore_03_pipeline PUBLIC + ${MG_SERVERBOX_DIR}/lib +) +set(libs + mgaio + mgnet + mgsch + mgbox + mgboxstub +) +if(WIN32) + set(libs ${libs} ws2_32.lib crypt32.lib) +endif() +target_link_libraries(iocore_03_pipeline + ${libs} +) diff --git a/examples/iocore_03_pipeline/main.cpp b/examples/iocore_03_pipeline/main.cpp new file mode 100644 index 00000000..8e26beb9 --- /dev/null +++ b/examples/iocore_03_pipeline/main.cpp @@ -0,0 +1,460 @@ +#include "mg/aio/IOCore.h" +#include "mg/aio/TCPServer.h" +#include "mg/aio/TCPSocket.h" +#include "mg/aio/TCPSocketSubscription.h" +#include "mg/box/Log.h" +#include "mg/sch/TaskScheduler.h" + +#include + +// +// It is assumed you have seen the previous examples in this section, and some previously +// explained things don't need another repetition. +// +////////////////////////////////////////////////////////////////////////////////////////// +// +// A very real example with a task as the main business logic request, and a networking +// client to do sub-requests of that task. +// +// In this example the business logic is math. Request comes to the app and wants some +// arith operations done. But the main request processor can't do that. Instead, it sends +// those math operations as sub-requests to a "remote backend" (our CalcServer in this +// case). +// +// A more realistic example: imagine a server processing save games of an online game. The +// server is stateless, doesn't store any games itself. Instead, for any request like +// "create a save game", "drop it", "update it" the app downloads the save game from a +// database, does the work, and uploads the result. It would involve 2 sub-requests to the +// database for one main request like "update a save game". +// + +////////////////////////////////////////////////////////////////////////////////////////// +// +// CalcClient asynchronously talks to a "remote" CalcServer in request-response manner. +// Each request on completion fires its completion-callback, where we can wake a task up, +// send it a signal, or do anything else. +// +struct CalcRequest +{ + char myOp; + int64_t myArg1; + int64_t myArg2; + std::function myOnComplete; + // Intrusive list link. + CalcRequest* myNext; +}; + +class CalcClient final + : private mg::aio::TCPSocketSubscription +{ +public: + CalcClient( + mg::aio::IOCore& aCore, + uint16_t aPort) + : mySock(new mg::aio::TCPSocket(aCore)) + { + mySock->Open({}); + mg::aio::TCPSocketConnectParams connParams; + connParams.myEndpoint = mg::net::HostMakeLocalIPV4(aPort).ToString(); + MG_LOG_INFO("Client", "start connecting"); + mySock->PostConnect(connParams, this); + } + + void + Delete() + { + // Don't just 'delete this'. The socket might be being handled in a worker thread + // right now. Better close the socket first. When this is done, we can safely + // delete this client in OnClose(). + mySock->PostClose(); + } + + void + Submit( + char aOp, + int64_t aArg1, + int64_t aArg2, + std::function&& aOnComplete) + { + MG_LOG_INFO("Client.Submit", "new request"); + CalcRequest* req = new CalcRequest(); + req->myOp = aOp; + req->myArg1 = aArg1; + req->myArg2 = aArg2; + req->myOnComplete = std::move(aOnComplete); + // Wakeup the socket only if the request is first in the queue. If it is not, the + // socket was already woken up anyway, and is going to be executed soon. + // Could also wakeup always, but it is just not needed. + if (myFrontQueue.Push(req)) + mySock->PostWakeup(); + } + +private: + ~CalcClient() final + { + // Doesn't matter in which order to delete the requests. Hence can use the + // non-ordered fast pop. + CalcRequest* head = myFrontQueue.PopAllFastReversed(); + while (head != nullptr) + { + CalcRequest* next = head->myNext; + delete head; + head = next; + } + while (!myQueue.IsEmpty()) + delete myQueue.PopFirst(); + } + + // TCPSocketSubscription event. + void + OnConnect() final + { + MG_LOG_INFO("Client.OnConnect", ""); + mySock->Recv(128); + } + + // TCPSocketSubscription event. + void + OnEvent() final + { + CalcRequest* tail; + CalcRequest* head = myFrontQueue.PopAll(tail); + if (head == nullptr) + return; + + // Save the requests for later response handling. + myQueue.Append(head, tail); + // Send requests in bulk. + mg::net::BufferStream data; + while (head != nullptr) + { + MG_LOG_INFO("Client.OnEvent", "got request"); + // It is a bad idea to send integers like that, because the receiver might + // have a different byte order. In general, apps must encode numbers in some + // platform-agnostic way. Like always use Big Endian, or use a protocol like + // MesssagePack. Here it is omitted for simplicity. + data.WriteCopy(&head->myOp, sizeof(head->myOp)); + data.WriteCopy(&head->myArg1, sizeof(head->myArg1)); + data.WriteCopy(&head->myArg2, sizeof(head->myArg2)); + head = head->myNext; + } + mySock->SendRef(data.PopData()); + } + + // TCPSocketSubscription event. + void + OnRecv( + mg::net::BufferReadStream& aStream) final + { + // Responses in this case come in the same order as requests. + int64_t res = 0; + while (aStream.GetReadSize() >= sizeof(res)) + { + aStream.ReadData(&res, sizeof(res)); + CalcRequest* pos = myQueue.PopFirst(); + MG_BOX_ASSERT(pos != nullptr); + pos->myOnComplete(res); + delete pos; + } + mySock->Recv(128); + } + + // TCPSocketSubscription event. + void + OnError( + mg::box::Error*) final + { + // Can happen on Windows even on graceful close. + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Client.OnClose", ""); + // Try to always have all deletions in a single place. And the only suitable place + // is on-close. This is the only place where you can safely assume that the socket + // won't run again if you delete it right now. + mySock->Delete(); + delete this; + } + + mg::aio::TCPSocket* mySock; + // Queue of sent requests waiting for their responses. + mg::box::ForwardList myQueue; + // Queue if requests submitted by other threads. Not yet sent to the network. + mg::box::MultiProducerQueueIntrusive myFrontQueue; +}; + +////////////////////////////////////////////////////////////////////////////////////////// +// +// CalcServer and its peers. Can be running remotely. Serves the math requests. +// +class CalcPeer final + : private mg::aio::TCPSocketSubscription +{ +public: + CalcPeer( + mg::aio::IOCore& aCore, + mg::net::Socket aSock) + : mySock(new mg::aio::TCPSocket(aCore)) + { + mySock->Open({}); + mySock->PostWrap(aSock, this); + } + + ~CalcPeer() final = default; + +private: + // TCPSocketSubscription event. + void + OnConnect() final + { + MG_LOG_INFO("Peer.OnConnect", ""); + mySock->Recv(128); + } + + // TCPSocketSubscription event. + void + OnRecv( + mg::net::BufferReadStream& aStream) final + { + char op; + int64_t arg1, arg2; + constexpr uint32_t msgSize = sizeof(op) + sizeof(arg1) + sizeof(arg2); + mg::net::BufferStream data; + while (aStream.GetReadSize() >= msgSize) + { + aStream.ReadData(&op, sizeof(op)); + aStream.ReadData(&arg1, sizeof(arg1)); + aStream.ReadData(&arg2, sizeof(arg2)); + int64_t res; + switch(op) + { + case '+': res = arg1 + arg2; break; + case '-': res = arg1 - arg2; break; + default: + MG_BOX_ASSERT(!"Unreachable"); + res = 0; + break; + } + data.WriteCopy(&res, sizeof(res)); + } + mySock->SendRef(data.PopData()); + mySock->Recv(128); + } + + // TCPSocketSubscription event. + void + OnError( + mg::box::Error*) final + { + // Can happen on Windows even on graceful close. + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Peer.OnClose", ""); + mySock->Delete(); + delete this; + } + + mg::aio::TCPSocket* mySock; +}; + +////////////////////////////////////////////////////////////////////////////////////////// + +class CalcServer final + : private mg::aio::TCPServerSubscription +{ +public: + CalcServer( + mg::aio::IOCore& aCore) + : myServer(mg::aio::TCPServer::NewShared(aCore)) + { + } + + uint16_t + Bind() + { + mg::box::Error::Ptr err; + bool ok = myServer->Bind(mg::net::HostMakeAllIPV4(0), err); + MG_BOX_ASSERT(ok); + return myServer->GetPort(); + } + + void + Start() + { + mg::box::Error::Ptr err; + bool ok = myServer->Listen(mg::net::SocketMaxBacklog(), this, err); + MG_BOX_ASSERT(ok); + } + + void + Delete() + { + myServer->PostClose(); + } + +private: + ~CalcServer() = default; + + // TCPServerSubscription event. + void + OnAccept( + mg::net::Socket aSock, + const mg::net::Host&) final + { + MG_LOG_INFO("Server.OnAccept", "new client"); + new CalcPeer(myServer->GetCore(), aSock); + } + + // TCPServerSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Server.OnClose", ""); + delete this; + } + + mg::aio::TCPServer::Ptr myServer; +}; + +////////////////////////////////////////////////////////////////////////////////////////// +// +// The business logic request, which is doing sub-requests to the remote CalcServer. +// +class MyRequest +{ +public: + MyRequest( + int aID, + CalcClient& aCalcClient, + mg::sch::TaskScheduler& aSched) + : myID(aID) + , myCalcClient(aCalcClient) + , myTask(Execute(this)) + { + aSched.Post(&myTask); + } + +private: + mg::box::Coro + Execute( + MyRequest* aSelf) + { + MG_LOG_INFO("MyRequest.Execute", "%d: start", myID); + + // Send a first math request. + MG_LOG_INFO("MyRequest.Execute", "%d: send sub-request 1", myID); + int64_t res = 0; + myCalcClient.Submit('+', 10, myID, [aSelf, &res](int64_t aRes) { + res = aRes; + // The completion callback sends a signal to the MyRequest's task which also + // wakes it up. + aSelf->myTask.PostSignal(); + }); + // Wait for the signal in a yielding loop, to handle potential spurious wakeups. + while (!co_await aSelf->myTask.AsyncReceiveSignal()); + // + MG_LOG_INFO("MyRequest.Execute", "%d: got response %lld", myID, (long long)res); + + // Do it all again with other data. + MG_LOG_INFO("MyRequest.Execute", "%d: send sub-request 2", myID); + res = 0; + myCalcClient.Submit('-', 100, myID, [aSelf, &res](int64_t aRes) { + res = aRes; + aSelf->myTask.PostSignal(); + }); + while (!co_await aSelf->myTask.AsyncReceiveSignal()); + MG_LOG_INFO("MyRequest.Execute", "%d: got response %lld", myID, (long long)res); + + // 'delete this' + co_return wouldn't work here. Because deletion of the self + // would destroy the C++ coroutine object. co_return would then fail with + // use-after-free. Also can't use myTask.AsyncExitDelete(), because it would + // delete the myTask member, not the whole MyRequest, and would crash. For this + // case there is a special handler AsyncExitExec() - can put here any termination + // logic. + co_await aSelf->myTask.AsyncExitExec([this](mg::sch::Task*) { + delete this; + }); + MG_BOX_ASSERT(!"Unreachable"); + } + + // Can add here any other data needed by the request. Moreover, things like clients to + // other remote systems (myCalcClient) typically would be either global singletons, or + // would be wrapped into a bigger struct having all such clients. + const int myID; + CalcClient& myCalcClient; + mg::sch::Task myTask; +}; + +static void +ServeRequests( + CalcClient& aClient) +{ + // Typically here we would start a server, listen for incoming connections, receive + // the requests, create MyRequest for each, and submit them to the scheduler. But in + // this case it is simplified to just a couple of hardcoded MyRequests. + mg::sch::TaskScheduler scheduler("tst", + 1, // Thread count. + 5 // Subqueue size. + ); + MG_LOG_INFO("ServeRequests", "got a couple of complex requests"); + new MyRequest(1, aClient, scheduler); + new MyRequest(2, aClient, scheduler); +} + +////////////////////////////////////////////////////////////////////////////////////////// + +static void +RunExample() +{ + mg::aio::IOCore core; + core.Start(1 /* threads */); + + MG_LOG_INFO("Main", "start calc server"); + CalcServer* calcServer = new CalcServer(core); + uint16_t port = calcServer->Bind(); + calcServer->Start(); + + MG_LOG_INFO("Main", "start calc client"); + CalcClient *calcClient = new CalcClient(core, port); + + ServeRequests(*calcClient); + + MG_LOG_INFO("Main", "terminating"); + calcClient->Delete(); + calcServer->Delete(); +} + +////////////////////////////////////////////////////////////////////////////////////////// + +int +main() +{ +#if IS_PLATFORM_WIN + // WSAStartup() on Windows has to be done by the user. To be able to customize it as + // they want. Besides, it is a too low-level thing to be able to wrap it into anything + // platform-agnostic. + WSADATA data; + MG_BOX_ASSERT(WSAStartup(MAKEWORD(2, 2), &data) == 0); +#else + // SIGPIPE is pointless. This signal kills the process when trying to write to a + // socket whose connection is dead. The problem is that there is no way to know that + // at the moment of write. + signal(SIGPIPE, SIG_IGN); +#endif + + RunExample(); + +#if IS_PLATFORM_WIN + MG_BOX_ASSERT(WSACleanup() == 0); +#endif + return 0; +} diff --git a/examples/iocore_04_tcp_periodic/CMakeLists.txt b/examples/iocore_04_tcp_periodic/CMakeLists.txt new file mode 100644 index 00000000..7dfd44be --- /dev/null +++ b/examples/iocore_04_tcp_periodic/CMakeLists.txt @@ -0,0 +1,21 @@ +add_executable(iocore_04_tcp_periodic main.cpp) +add_dependencies(iocore_04_tcp_periodic install_serverbox) + +target_include_directories(iocore_04_tcp_periodic PUBLIC + ${MG_SERVERBOX_DIR}/include +) +target_link_directories(iocore_04_tcp_periodic PUBLIC + ${MG_SERVERBOX_DIR}/lib +) +set(libs + mgaio + mgnet + mgbox + mgboxstub +) +if(WIN32) + set(libs ${libs} ws2_32.lib crypt32.lib) +endif() +target_link_libraries(iocore_04_tcp_periodic + ${libs} +) diff --git a/examples/iocore_04_tcp_periodic/main.cpp b/examples/iocore_04_tcp_periodic/main.cpp new file mode 100644 index 00000000..8a3076f9 --- /dev/null +++ b/examples/iocore_04_tcp_periodic/main.cpp @@ -0,0 +1,344 @@ +#include "mg/aio/IOCore.h" +#include "mg/aio/TCPServer.h" +#include "mg/aio/TCPSocket.h" +#include "mg/aio/TCPSocketSubscription.h" +#include "mg/box/Log.h" + +#include + +// +// It is assumed you have seen the previous examples in this section, and some previously +// explained things don't need another repetition. +// +////////////////////////////////////////////////////////////////////////////////////////// +// +// An untrivial example how to use deadlines with IOCore. I.e. make a socket wakeup on a +// deadline or a delay if nothing is happening with it for a while. +// +// With this it becomes very simple and cheap to implement request timeouts. Normally it +// would look like a client has a bunch of requests in fly, waiting for their responses. +// Each request could have a different deadline (time_now + timeout). Then your code could +// store those requests in a binary heap (mg::box::BinaryHeap) sorted by the deadline. +// +// OnEvent() of your socket you would do socket->SetDeadline(heap.Top()->myDeadline) or +// something like that. Also in OnEvent() you check socket->IsExpired(). If yes, then you +// get the current timestamp, and pop from the heap all the requests whose deadline is +// less than the current time. They are expired, do anything with them. +// + +static const int theClientCount = 5; +static const int theMessageCount = 6; + +static bool +ReadData( + mg::net::BufferReadStream& aStream, + std::vector& aDst) +{ + uint64_t newSize = aStream.GetReadSize(); + MG_BOX_ASSERT(newSize > 0); + uint64_t oldSize = aDst.size(); + aDst.resize(oldSize + newSize); + aStream.ReadData(aDst.data() + oldSize, newSize); + // Terminating zero means that there is a complete message. + return aDst.back() == 0; +} + +////////////////////////////////////////////////////////////////////////////////////////// +// +// Client receives a fixed number of messages and deletes itself. The interesting part is +// that those requests are sent by the server once per second, not all at once. +// + +class MyClient final + // The socket subscription has a number of callbacks notifying us about various TCP + // events like on-connect, on-recv, on-send, etc. All of them are optional, and are + // always executed in a single thread, never in parallel. This allows us not to use + // any mutexes for data used exclusively by those callbacks. + : private mg::aio::TCPSocketSubscription +{ +public: + MyClient( + int aID, + mg::aio::IOCore& aCore, + uint16_t aPort) + : myID(aID) + , myRecvCount(0) + , mySock(new mg::aio::TCPSocket(aCore)) + { + mySock->Open({}); + mg::aio::TCPSocketConnectParams connParams; + connParams.myEndpoint = mg::net::HostMakeLocalIPV4(aPort).ToString(); + mySock->PostConnect(connParams, this); + } + + ~MyClient() final = default; + +private: + // TCPSocketSubscription event. + void + OnConnect() final + { + MG_LOG_INFO("Client.OnConnect", "%d", myID); + mySock->Recv(128); + } + + // TCPSocketSubscription event. + void + OnRecv( + mg::net::BufferReadStream& aStream) final + { + if (!ReadData(aStream, myInput)) + return; + MG_LOG_INFO("Client.OnRecv", "%d: got '%s'", myID, myInput.data()); + if (++myRecvCount == theMessageCount) + { + mySock->PostClose(); + return; + } + myInput.clear(); + mySock->Recv(128); + } + + // TCPSocketSubscription event. + void + OnError( + mg::box::Error*) final + { + // Can happen on Windows even on graceful close. + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Client.OnClose", "%d", myID); + mySock->Delete(); + delete this; + } + + const int myID; + int myRecvCount; + mg::aio::TCPSocket* mySock; + std::vector myInput; +}; + +////////////////////////////////////////////////////////////////////////////////////////// + +class MyPeer final + : private mg::aio::TCPSocketSubscription +{ +public: + MyPeer( + int aID, + mg::aio::IOCore& aCore, + mg::net::Socket aSock) + : myID(aID) + , mySock(new mg::aio::TCPSocket(aCore)) + , mySendDeadline(0) + { + mySock->Open({}); + mySock->PostWrap(aSock, this); + } + + ~MyPeer() final = default; + +private: + // TCPSocketSubscription event. + void + OnConnect() final + { + MG_LOG_INFO("Peer.OnConnect", "%d", myID); + // Receive isn't planned, but lets schedule it to catch a potential socket + // closure. Socket close = read of zero bytes, so we need a pending read to notice + // it. Can be of any > 0 size. + mySock->Recv(1); + } + + // TCPSocketSubscription event. + void + OnEvent() final + { + if (mySentCount >= theMessageCount) + { + // Everything is sent. Time to shutdown. + mySock->PostClose(); + return; + } + if (mySendDeadline > mg::box::GetMilliseconds()) + { + // Not expired yet. A spurious wakeup, or the deadline is set first time. + // Anyway, lets wait until it expires. + mySock->SetDeadline(mySendDeadline); + return; + } + MG_LOG_INFO("Peer.OnEvent", "%d: sending next msg", myID); + MG_BOX_ASSERT(mySendSize == 0); + // Deadline is infinite so a next message is only sent after this one is fully + // flushed. Although in such a simple example it is probably sent completely in + // one go. + mySendDeadline = MG_TIME_INFINITE; + std::string msg = mg::box::StringFormat("message %d", ++mySentCount); + mySendSize = (uint32_t)msg.size() + 1; + // Copy the string, because it is on stack right now. + mySock->SendCopy(msg.c_str(), mySendSize); + } + + // TCPSocketSubscription event. + void + OnRecv( + mg::net::BufferReadStream&) final + { + // The client isn't supposed to send anything. + MG_BOX_ASSERT(!"Unreachable"); + } + + // TCPSocketSubscription event. + void + OnSend( + uint32_t aByteCount) final + { + MG_BOX_ASSERT(aByteCount <= mySendSize); + mySendSize -= aByteCount; + if (mySendSize != 0) + return; + + // Finally all is sent. Send the next one in 1 second. + mySendDeadline = mg::box::GetMilliseconds() + 1000; + // OnEvent() is going to be executed again ASAP, but this is ok. It is simpler + // for us to implement all the logic in one function. It will handle this + // reschedule a spurious wakeup. + mySock->Reschedule(); + } + + // TCPSocketSubscription event. + void + OnError( + mg::box::Error*) final + { + // Can happen on Windows even on graceful close. + mySock->PostClose(); + } + + // TCPSocketSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Peer.OnClose", "%d", myID); + mySock->Delete(); + delete this; + } + + const int myID; + mg::aio::TCPSocket* mySock; + int mySentCount; + uint32_t mySendSize; + uint64_t mySendDeadline; +}; + +////////////////////////////////////////////////////////////////////////////////////////// + +class MyServer final + : private mg::aio::TCPServerSubscription +{ +public: + MyServer( + mg::aio::IOCore& aCore) + : myAcceptCount(0) + , myServer(mg::aio::TCPServer::NewShared(aCore)) + { + } + + uint16_t + Bind() + { + mg::box::Error::Ptr err; + bool ok = myServer->Bind(mg::net::HostMakeAllIPV4(0), err); + MG_BOX_ASSERT(ok); + return myServer->GetPort(); + } + + void + Start() + { + mg::box::Error::Ptr err; + bool ok = myServer->Listen(mg::net::SocketMaxBacklog(), this, err); + MG_BOX_ASSERT(ok); + } + +private: + // TCPServerSubscription event. + void + OnAccept( + mg::net::Socket aSock, + const mg::net::Host&) final + { + MG_LOG_INFO("Server.OnAccept", "new client"); + new MyPeer(++myAcceptCount, myServer->GetCore(), aSock); + if (myAcceptCount == theClientCount) + { + MG_LOG_INFO("Server.OnAccept", "start closing"); + myServer->PostClose(); + } + } + + // TCPServerSubscription event. + void + OnClose() final + { + MG_LOG_INFO("Server.OnClose", ""); + } + + int myAcceptCount; + mg::aio::TCPServer::Ptr myServer; +}; + +////////////////////////////////////////////////////////////////////////////////////////// + +static void +RunExample() +{ + mg::aio::IOCore core; + core.Start(3 /* threads */); + + MG_LOG_INFO("Main", "start server"); + MyServer server(core); + uint16_t port = server.Bind(); + server.Start(); + + MG_LOG_INFO("Main", "start clients"); + for (int i = 0; i < theClientCount; ++i) + new MyClient(i + 1, core, port); + + MG_LOG_INFO("Main", "wait for end"); + uint32_t remaining = core.WaitEmpty(); + MG_BOX_ASSERT(remaining == 0); + + MG_LOG_INFO("Main", "terminating"); +} + +////////////////////////////////////////////////////////////////////////////////////////// + +int +main() +{ +#if IS_PLATFORM_WIN + // WSAStartup() on Windows has to be done by the user. To be able to customize it as + // they want. Besides, it is a too low-level thing to be able to wrap it into anything + // platform-agnostic. + WSADATA data; + MG_BOX_ASSERT(WSAStartup(MAKEWORD(2, 2), &data) == 0); +#else + // SIGPIPE is pointless. This signal kills the process when trying to write to a + // socket whose connection is dead. The problem is that there is no way to know that + // at the moment of write. + signal(SIGPIPE, SIG_IGN); +#endif + + RunExample(); + +#if IS_PLATFORM_WIN + MG_BOX_ASSERT(WSACleanup() == 0); +#endif + return 0; +} diff --git a/examples/scheduler_01_simple_task/CMakeLists.txt b/examples/scheduler_01_simple_task/CMakeLists.txt new file mode 100644 index 00000000..f924b69e --- /dev/null +++ b/examples/scheduler_01_simple_task/CMakeLists.txt @@ -0,0 +1,14 @@ +add_executable(scheduler_01_simple_task main.cpp) +add_dependencies(scheduler_01_simple_task install_serverbox) + +target_include_directories(scheduler_01_simple_task PUBLIC + ${MG_SERVERBOX_DIR}/include +) +target_link_directories(scheduler_01_simple_task PUBLIC + ${MG_SERVERBOX_DIR}/lib +) +target_link_libraries(scheduler_01_simple_task + mgsch + mgboxstub + mgbox +) diff --git a/examples/scheduler_01_simple_task/main.cpp b/examples/scheduler_01_simple_task/main.cpp new file mode 100644 index 00000000..964601fe --- /dev/null +++ b/examples/scheduler_01_simple_task/main.cpp @@ -0,0 +1,22 @@ +#include "mg/sch/TaskScheduler.h" + +#include + +// +// The most trivial example of TaskScheduler usage. A single shot task which just prints +// something and gets deleted. +// + +int +main() +{ + mg::sch::TaskScheduler sched("tst", + 1, // Thread count. + 5 // Subqueue size. + ); + sched.Post(new mg::sch::Task([&](mg::sch::Task *self) { + std::cout << "Executed in scheduler!\n"; + delete self; + })); + return 0; +} diff --git a/examples/scheduler_02_coroutine_task/CMakeLists.txt b/examples/scheduler_02_coroutine_task/CMakeLists.txt new file mode 100644 index 00000000..ad68726c --- /dev/null +++ b/examples/scheduler_02_coroutine_task/CMakeLists.txt @@ -0,0 +1,14 @@ +add_executable(scheduler_02_coroutine_task main.cpp) +add_dependencies(scheduler_02_coroutine_task install_serverbox) + +target_include_directories(scheduler_02_coroutine_task PUBLIC + ${MG_SERVERBOX_DIR}/include +) +target_link_directories(scheduler_02_coroutine_task PUBLIC + ${MG_SERVERBOX_DIR}/lib +) +target_link_libraries(scheduler_02_coroutine_task + mgsch + mgboxstub + mgbox +) diff --git a/examples/scheduler_02_coroutine_task/main.cpp b/examples/scheduler_02_coroutine_task/main.cpp new file mode 100644 index 00000000..5df2a508 --- /dev/null +++ b/examples/scheduler_02_coroutine_task/main.cpp @@ -0,0 +1,56 @@ +#include "mg/sch/TaskScheduler.h" + +#include + +// +// It is assumed you have seen the previous examples in this section, and some previously +// explained things don't need another repetition. +// +////////////////////////////////////////////////////////////////////////////////////////// +// +// A simple example of how to use C++ stack-less coroutines with TaskScheduler. +// + +int +main() +{ + mg::sch::Task task; + + // Do not use lambda capture-params! The thing is that C++ coroutines are not + // functions. They are **results** of functions. When a lambda returns a C++ + // corotuine, it needs to be invoked to return this coroutine. And after invocation + // the lambda is destroyed in this case. Which makes it not possible to use any lambda + // data, like captures. See for further explanation: + // https://stackoverflow.com/questions/60592174/lambda-lifetime-explanation-for-c20-coroutines + // + // Luckily, this problem is trivial to workaround. Just pass your values as arguments. + // Then the coroutine object captures them. + task.SetCallback([]( + mg::sch::Task& aSelf) -> mg::box::Coro { + + // Imagine the task sends a request to some async network client or alike. + std::cout << "Sending request ...\n"; + + // After sending, the task would wait for a response. Here it is simplified, so + // the task just yields and gets continued right away. + // Yield with no deadline or delay set simply re-schedules the task. + co_await aSelf.AsyncYield(); + + std::cout << "Received response!\n"; + co_await aSelf.AsyncYield(); + std::cout << "Finish\n"; + co_return; + }(task)); + + // The scheduler is defined after the task, so the task's destructor is not called + // before the scheduler is terminated. It would cause the task to be destroyed while + // in use. + // Normally one would allocate tasks on the heap and make them delete themselves when + // they are finished. + mg::sch::TaskScheduler scheduler("tst", + 1, // Thread count. + 5 // Subqueue size. + ); + scheduler.Post(&task); + return 0; +} diff --git a/examples/scheduler_03_multistep_task/CMakeLists.txt b/examples/scheduler_03_multistep_task/CMakeLists.txt new file mode 100644 index 00000000..8dc1a6d7 --- /dev/null +++ b/examples/scheduler_03_multistep_task/CMakeLists.txt @@ -0,0 +1,14 @@ +add_executable(scheduler_03_multistep_task main.cpp) +add_dependencies(scheduler_03_multistep_task install_serverbox) + +target_include_directories(scheduler_03_multistep_task PUBLIC + ${MG_SERVERBOX_DIR}/include +) +target_link_directories(scheduler_03_multistep_task PUBLIC + ${MG_SERVERBOX_DIR}/lib +) +target_link_libraries(scheduler_03_multistep_task + mgsch + mgboxstub + mgbox +) diff --git a/examples/scheduler_03_multistep_task/main.cpp b/examples/scheduler_03_multistep_task/main.cpp new file mode 100644 index 00000000..c0e6e80d --- /dev/null +++ b/examples/scheduler_03_multistep_task/main.cpp @@ -0,0 +1,88 @@ +#include "mg/sch/TaskScheduler.h" + +#include + +// +// It is assumed you have seen the previous examples in this section, and some previously +// explained things don't need another repetition. +// +////////////////////////////////////////////////////////////////////////////////////////// +// +// An example how a task can consist of multiple untrivial steps, with yields in between, +// and with context used by those steps. +// + +class MyTask + : public mg::sch::Task +{ +public: + MyTask() + : Task([this](mg::sch::Task* aSelf) { + TaskSendRequest(aSelf); + }) {} + +private: + // Step 1. + void + TaskSendRequest( + mg::sch::Task* aSelf) + { + // The scheduler always gives the "self" as an argument. This is not needed when + // the task is inherited. But quite handy when a task is just a lambda. + MG_BOX_ASSERT(aSelf == this); + + // Imagine that the task sends an HTTP request via some other module, and is woken + // up, when the request ends. To execute the next step. For that the task changes + // its callback which will be executed when the task is woken up next time. + std::cout << "Sending request ...\n"; + aSelf->SetCallback([this](mg::sch::Task* aSelf) { + TaskRecvResponse(aSelf); + }); + mg::sch::TaskScheduler::This().Post(aSelf); + } + + // Step 2. + void + TaskRecvResponse( + mg::sch::Task* aSelf) + { + MG_BOX_ASSERT(aSelf == this); + + // Lets make another step, with a third callback which would be the final one. + std::cout << "Received response!\n"; + aSelf->SetCallback([this](mg::sch::Task *aSelf) { + TaskFinish(aSelf); + }); + mg::sch::TaskScheduler::This().Post(aSelf); + } + + // Step 3. + void + TaskFinish( + mg::sch::Task* aSelf) + { + MG_BOX_ASSERT(aSelf == this); + std::cout << "Finish\n"; + } + + // Here one would normally put various members needed by that task for its context. + // For example, request and user data related to this task. +}; + +int +main() +{ + MyTask task; + + // The scheduler is defined after the task, so the task's destructor is not called + // before the scheduler is terminated. It would cause the task to be destroyed while + // in use. + // Normally one would allocate tasks on the heap and make them delete themselves when + // they are finished. + mg::sch::TaskScheduler scheduler("tst", + 1, // Thread count. + 5 // Subqueue size. + ); + scheduler.Post(&task); + return 0; +} diff --git a/examples/scheduler_04_interacting_tasks/CMakeLists.txt b/examples/scheduler_04_interacting_tasks/CMakeLists.txt new file mode 100644 index 00000000..a973b1bd --- /dev/null +++ b/examples/scheduler_04_interacting_tasks/CMakeLists.txt @@ -0,0 +1,14 @@ +add_executable(scheduler_04_interacting_tasks main.cpp) +add_dependencies(scheduler_04_interacting_tasks install_serverbox) + +target_include_directories(scheduler_04_interacting_tasks PUBLIC + ${MG_SERVERBOX_DIR}/include +) +target_link_directories(scheduler_04_interacting_tasks PUBLIC + ${MG_SERVERBOX_DIR}/lib +) +target_link_libraries(scheduler_04_interacting_tasks + mgsch + mgboxstub + mgbox +) diff --git a/examples/scheduler_04_interacting_tasks/main.cpp b/examples/scheduler_04_interacting_tasks/main.cpp new file mode 100644 index 00000000..ab48ec57 --- /dev/null +++ b/examples/scheduler_04_interacting_tasks/main.cpp @@ -0,0 +1,92 @@ +#include "mg/sch/TaskScheduler.h" + +#include + +// +// It is assumed you have seen the previous examples in this section, and some previously +// explained things don't need another repetition. +// +////////////////////////////////////////////////////////////////////////////////////////// +// +// A realistic example how one task might submit some async work to be executed by another +// task, which in turn would wake the first one up, when the work is done. +// + +static void +TaskSubmitRequest( + mg::sch::Task& aSender) +{ + // A sub-task is created. But normally for such need one would have a pre-running task + // or a thread or some other sort of async executor, which takes requests and executes + // them. + mg::sch::Task* worker = new mg::sch::Task(); + worker->SetCallback([]( + mg::sch::Task& aSelf, + mg::sch::Task& aSender) -> mg::box::Coro { + + std::cout << "Worker: sent request\n"; + // Lets simulate like if the request takes 1 second to get done. For that the + // task would yield for 1 second and then get executed again. + uint64_t t1 = mg::box::GetMilliseconds(); + aSelf.SetDelay(1000); + co_await aSelf.AsyncYield(); + + // This flag means the task was woken up because its deadline was due. + // Technically, it could wake up spuriously, but here we know it can't happen. + // Proper production code should still be ready to that though. + MG_BOX_ASSERT(aSelf.IsExpired()); + + uint64_t t2 = mg::box::GetMilliseconds(); + std::cout << "Worker: received response, took " << t2 - t1 << " msec\n"; + + // Wakeup the original task + let it know the request is actually done. The + // original task would be able to tell that by seeing that it's got a signal, not + // just a spurious wakeup. + aSender.PostSignal(); + + // 'delete self' + co_return wouldn't work here. Because deletion of the self + // would destroy the C++ coroutine object. co_return would then fail with + // use-after-free. For such 'delete and exit' case there is the special helper. + co_await aSelf.AsyncExitDelete(); + }(*worker, aSender)); + mg::sch::TaskScheduler::This().Post(worker); +} + +int +main() +{ + mg::sch::Task task; + + // The scheduler is defined after the task, so the task's destructor is not called + // before the scheduler is terminated. It would cause the task to be destroyed while + // in use. + // Normally one would allocate tasks on the heap and make them delete themselves when + // they are finished. + mg::sch::TaskScheduler scheduler("tst", + 1, // Thread count. + 5 // Subqueue size. + ); + + task.SetCallback([]( + mg::sch::Task& aSelf) -> mg::box::Coro { + + // The task wants something to be done asynchronously. It would then submit the + // work (could be another task, could be an HTTP client, or something alike) and + // wait for a notification when it is done. + std::cout << "Main: submit request\n"; + TaskSubmitRequest(aSelf); + + // The waiting is in a loop to protect the code from spurious wakeups. + do { + // Make sure to signalize that the task wants to wait (infinitely) until a + // signal comes. Without this the task would be just re-scheduled immediately + // and it would be a busy-loop. + aSelf.SetWait(); + } while (!co_await aSelf.AsyncReceiveSignal()); + + std::cout << "Main: finish\n"; + co_return; + }(task)); + scheduler.Post(&task); + return 0; +} diff --git a/src/mg/aio/IOTask.h b/src/mg/aio/IOTask.h index b1ec2d73..ffb2f1ed 100755 --- a/src/mg/aio/IOTask.h +++ b/src/mg/aio/IOTask.h @@ -370,6 +370,7 @@ namespace aio { bool IsExpired() const; mg::net::Socket GetSocket() const; bool HasSocket() const; + IOCore& GetCore(); // Deadline is reset on each wakeup. Setting a new deadline works only if it is // lower than the previous deadline installed during the same task execution. That @@ -819,6 +820,12 @@ namespace aio { return GetSocket() != mg::net::theInvalidSocket; } + inline IOCore& + IOTask::GetCore() + { + return myCore; + } + inline void IOTask::SetDeadline( uint64_t aDeadline) diff --git a/src/mg/aio/TCPServer.h b/src/mg/aio/TCPServer.h index 3ff59e6e..5eef2aa5 100644 --- a/src/mg/aio/TCPServer.h +++ b/src/mg/aio/TCPServer.h @@ -48,6 +48,7 @@ namespace aio { void PostClose(); bool IsClosed() const; + IOCore& GetCore(); private: TCPServer(IOCore& aCore); @@ -64,5 +65,11 @@ namespace aio { IOServerSocket* myBoundSocket; }; + inline IOCore& + TCPServer::GetCore() + { + return myTask.GetCore(); + } + } } diff --git a/src/mg/aio/TCPSocketIFace.h b/src/mg/aio/TCPSocketIFace.h index 1a36ef6a..bbac828b 100644 --- a/src/mg/aio/TCPSocketIFace.h +++ b/src/mg/aio/TCPSocketIFace.h @@ -119,6 +119,7 @@ namespace aio { bool IsClosed() const; bool IsConnected() const; bool IsInWorkerNow() const; + IOCore& GetCore(); // // Not thread-safe functions. Can only be called from an IO worker thread. From @@ -243,6 +244,12 @@ namespace aio { TCPSocketCtl* myCtl; }; + inline IOCore& + TCPSocketIFace::GetCore() + { + return myTask.GetCore(); + } + inline void TCPSocketIFace::SetDeadline( uint64_t aDeadline) diff --git a/src/mg/box/CMakeLists.txt b/src/mg/box/CMakeLists.txt index e0d9ed50..41f7cab3 100644 --- a/src/mg/box/CMakeLists.txt +++ b/src/mg/box/CMakeLists.txt @@ -59,8 +59,10 @@ set(install_headers RefCount.h SharedPtr.h Signal.h + StringFunctions.h Thread.h ThreadLocalPool.h + Time.h TypeTraits.h ) diff --git a/src/mg/box/Coro.h b/src/mg/box/Coro.h index e40a8c50..d7664ff4 100644 --- a/src/mg/box/Coro.h +++ b/src/mg/box/Coro.h @@ -32,15 +32,26 @@ namespace box { ////////////////////////////////////////////////////////////////////////////////////// - // Base class for all awaitable operations. They are supposed to be created and - // destroyed right in the co_await expression. Hence not copyable nor assignable. struct CoroOp { - CoroOp() = default; - CoroOp( - const CoroOp&) = delete; - CoroOp& operator=( - const CoroOp&) = delete; + // Base class for all awaitable operations. A while ago it had copy and move + // constructors deleted. Indeed, they (ops) are supposed to be created and + // destroyed right in the co_await expression. Hence not copyable nor assignable. + // + // But this led to undefined behaviour on MacOS with clang compiler 15.0.7 + // under certain weird random conditions. Debugging has revealed, that when + // another operation inherits this one, adds some members, and fills them in its + // constructor, those members are nullified by the time await_suspend is called. + // This went away, when the child operation got copy/move constructors defined. + // + // Apparently, the compiler was/is copying the operation after its creation into + // the coroutine's context. Completely ignoring if this is even allowed. Moreover, + // if this is banned, then stuff gets nullified and "copied" anyway. + // + // Unfortunately, the only reproducer was in GitHub Actions CI on their + // macos-latest image (14.7.2 23H311). Hence it was decided to simply drop those + // deleted constructors and allow "copying". Perhaps in the future this can be + // brought back. }; struct CoroOpIsNotReady { static constexpr bool await_ready() noexcept { return false; } }; diff --git a/src/mg/net/CMakeLists.txt b/src/mg/net/CMakeLists.txt index 60758d51..5a5ebe1d 100644 --- a/src/mg/net/CMakeLists.txt +++ b/src/mg/net/CMakeLists.txt @@ -51,6 +51,11 @@ set(install_headers Buffer.h DomainToIP.h Host.h + Socket.h + SSL.h + SSLContext.h + SSLContextOpenSSL.h + SSLOpenSSL.h URL.h ) diff --git a/src/mg/sch/CMakeLists.txt b/src/mg/sch/CMakeLists.txt index 56d39cce..d069633e 100644 --- a/src/mg/sch/CMakeLists.txt +++ b/src/mg/sch/CMakeLists.txt @@ -19,4 +19,4 @@ set(install_headers ) install(TARGETS mgsch DESTINATION "${install_lib_root}") -install(FILES ${install_headers} DESTINATION "${install_include_root}/mg/sched/") +install(FILES ${install_headers} DESTINATION "${install_include_root}/mg/sch/") diff --git a/src/mg/sch/TaskScheduler.cpp b/src/mg/sch/TaskScheduler.cpp index 47e51a2d..8eadfe38 100644 --- a/src/mg/sch/TaskScheduler.cpp +++ b/src/mg/sch/TaskScheduler.cpp @@ -68,11 +68,11 @@ namespace sch { mySignalFront.Send(); } - bool + TaskScheduleResult TaskScheduler::PrivSchedule() { if (myIsSchedulerWorking.ExchangeAcqRel(true)) - return false; + return TASK_SCHEDULE_BUSY; // Task status operations can all be relaxed inside the // scheduler. Syncing writes and reads between producers and @@ -87,11 +87,9 @@ namespace sch { uint64_t timestamp = mg::box::GetMilliseconds(); uint32_t batch; uint32_t maxBatch = mySchedBatchSize; + TaskScheduleResult result = TASK_SCHEDULE_DONE; retry: - if (PrivIsStopped()) - goto end; - // ------------------------------------------------------- // Handle waiting tasks. They are older than the ones in // the front queue, so must be handled first. @@ -215,11 +213,7 @@ namespace sch { // sleeping any moment. So the sched can't quit. It // must retry until either a front task appears, or // one of the waiting tasks' deadline is expired. - if (myQueueWaiting.Count() == 0) - { - mySignalFront.ReceiveBlocking(); - } - else + if (myQueueWaiting.Count() > 0) { deadline = myQueueWaiting.GetTop()->myDeadline; timestamp = mg::box::GetMilliseconds(); @@ -229,6 +223,18 @@ namespace sch { mg::box::TimeDuration(deadline - timestamp)); } } + else if (!PrivIsStopped()) + { + mySignalFront.ReceiveBlocking(); + } + else + { + // **Only** report scheduling is fully finished when there is actually + // nothing to do. That is, no tasks anywhere at all. Not in the front + // queue, not in the waiting queue, not in the ready queue. + result = TASK_SCHEDULE_FINISHED; + goto end; + } goto retry; } @@ -263,7 +269,7 @@ namespace sch { // and other workers will sleep on waiting for ready // tasks. PrivSignalReady(); - return true; + return result; } bool @@ -319,11 +325,13 @@ namespace sch { TaskScheduler::ourCurrent = myScheduler; uint64_t maxBatch = myScheduler->myExecBatchSize; uint64_t batch; - while (!myScheduler->PrivIsStopped()) + TaskScheduleResult result = TASK_SCHEDULE_DONE; + while (result != TASK_SCHEDULE_FINISHED) { do { - if (myScheduler->PrivSchedule()) + result = myScheduler->PrivSchedule(); + if (result != TASK_SCHEDULE_BUSY) myScheduleCount.IncrementRelaxed(); batch = 0; while (myScheduler->PrivExecute(myConsumer.Pop()) && ++batch < maxBatch); diff --git a/src/mg/sch/TaskScheduler.h b/src/mg/sch/TaskScheduler.h index a1e602d3..8c7935e0 100644 --- a/src/mg/sch/TaskScheduler.h +++ b/src/mg/sch/TaskScheduler.h @@ -49,6 +49,17 @@ namespace sch { class TaskSchedulerThread; + enum TaskScheduleResult + { + // Scheduling isn't done, another thread is doing it right now. + TASK_SCHEDULE_BUSY, + // Scheduling is done successfully. + TASK_SCHEDULE_DONE, + // Done successfully and was the last one. The scheduler is being stopped right + // now. + TASK_SCHEDULE_FINISHED, + }; + // Scheduler for asynchronous execution of tasks. Can be used // for tons of one-shot short-living tasks, as well as for // long-living periodic tasks with deadlines. @@ -109,7 +120,7 @@ namespace sch { void PrivPost( Task* aTask); - bool PrivSchedule(); + TaskScheduleResult PrivSchedule(); bool PrivExecute( Task* aTask); diff --git a/src/mg/stub/BoxStub.cpp b/src/mg/stub/BoxStub.cpp index c82e9e4f..5da020d5 100644 --- a/src/mg/stub/BoxStub.cpp +++ b/src/mg/stub/BoxStub.cpp @@ -2,6 +2,7 @@ #include "mg/box/Log.h" #include +#include // // The file implements missing functions from mg::box in some trivial way to get the tests @@ -42,6 +43,8 @@ namespace box { const char* aFormat, va_list aParams) { + static std::mutex mutex; + std::unique_lock lock(mutex); printf("%s (%s): ", LogLevelToStringWithPadding(aLevel), aTag); vprintf(aFormat, aParams); printf("\n"); diff --git a/test/aio/UnitTestTCPServer.cpp b/test/aio/UnitTestTCPServer.cpp index 26e3d724..0e158e38 100644 --- a/test/aio/UnitTestTCPServer.cpp +++ b/test/aio/UnitTestTCPServer.cpp @@ -51,6 +51,7 @@ namespace tcpserver { TEST_CHECK(!server->IsClosed()); TEST_CHECK(server->Bind(host, err)); TEST_CHECK(!server->IsClosed()); + TEST_CHECK(&server->GetCore() == &core); server->PostClose(); // Any IPv4. diff --git a/test/aio/UnitTestTCPSocketIFace.cpp b/test/aio/UnitTestTCPSocketIFace.cpp index ae65a9fb..9825e0bd 100755 --- a/test/aio/UnitTestTCPSocketIFace.cpp +++ b/test/aio/UnitTestTCPSocketIFace.cpp @@ -333,6 +333,7 @@ namespace tcpsocketiface { TestMessage* Pop(); uint64_t GetWakeupCount(); + mg::aio::IOCore& GetCore(); mg::box::ErrorCode GetError(); mg::box::ErrorCode GetErrorConnect(); @@ -455,6 +456,7 @@ namespace tcpsocketiface { // Basic test to see if works at all. TestClientSocket client; client.PostConnectBlocking(aPort); + TEST_CHECK(&client.GetCore() == &theContext->myCore); client.SetAutoRecv(); client.Send(new TestMessage()); delete client.PopBlocking(); @@ -2532,6 +2534,12 @@ namespace tcpsocketiface { return myWakeupCount; } + mg::aio::IOCore& + TestClientSocket::GetCore() + { + return mySocket->GetCore(); + } + mg::box::ErrorCode TestClientSocket::GetError() { diff --git a/test/sch/UnitTestTaskScheduler.cpp b/test/sch/UnitTestTaskScheduler.cpp index 690de402..2609f676 100644 --- a/test/sch/UnitTestTaskScheduler.cpp +++ b/test/sch/UnitTestTaskScheduler.cpp @@ -108,6 +108,28 @@ namespace sch { mg::box::Sleep(1); } + static void + UnitTestTaskSchedulerDestroyWithFront() + { + TestCaseGuard guard("Destroy with front"); + + mg::box::AtomicU32 doneCount(0); + mg::sch::Task task2([&](mg::sch::Task*) { + doneCount.IncrementRelaxed(); + }); + mg::sch::Task task1([&](mg::sch::Task*) { + // Task posts another one. To cover the case how does the scheduler behave, + // when a new task was submitted after shutdown is started. + mg::sch::TaskScheduler::This().Post(&task2); + doneCount.IncrementRelaxed(); + }); + { + mg::sch::TaskScheduler sched("tst", 1, 5); + sched.Post(&task1); + } + TEST_CHECK(doneCount.LoadRelaxed() == 2); + } + static void UnitTestTaskSchedulerOrder() { @@ -1507,6 +1529,7 @@ namespace sch { TestSuiteGuard suite("TaskScheduler"); UnitTestTaskSchedulerBasic(); + UnitTestTaskSchedulerDestroyWithFront(); UnitTestTaskSchedulerOrder(); UnitTestTaskSchedulerDomino(); UnitTestTaskSchedulerWakeup();