From 0d5b75b68eaa9417c649824ca8df937913da7ce9 Mon Sep 17 00:00:00 2001 From: ych Date: Thu, 18 Jun 2026 17:52:22 -0400 Subject: [PATCH 1/5] initial cpp zmq server / client that duplicates simple python example (zmq-srv) --- Connectors/Cpp/zmq-client.cpp | 70 +++++++++++++++++++++++++++++++++++ Connectors/Cpp/zmq-srv.cpp | 66 +++++++++++++++++++++++++++++++++ 2 files changed, 136 insertions(+) create mode 100644 Connectors/Cpp/zmq-client.cpp create mode 100644 Connectors/Cpp/zmq-srv.cpp diff --git a/Connectors/Cpp/zmq-client.cpp b/Connectors/Cpp/zmq-client.cpp new file mode 100644 index 0000000..710e4f8 --- /dev/null +++ b/Connectors/Cpp/zmq-client.cpp @@ -0,0 +1,70 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + + + +std::atomic run(true); + +void signal_handler(int signal_num) { + if (signal_num == SIGINT){ + run = false; + } +} + +int main(int argc, char** argv){ + + std::signal(SIGINT, signal_handler); + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_real_distribution distrib(0.0, 5.0); + + std::vector args(argv + 1, argv + argc); + for (const auto& arg: args){ + std::cout << "arg: " << arg << "\n"; + } + + try { + zmq::context_t ctx; + zmq::socket_t socket(ctx, zmq::socket_type::req); + socket.connect("tcp://0.0.0.0:5556"); + std::cout << "Connecting to ZMQ test server running on tcp://0.0.0.0:5556..." << std::endl; + + while(run){ + // 1. Send data + std::ostringstream rsp; + rsp << "{\"Output_Value_1\":" << distrib(gen) << ",\"Output_Value_2\":" << distrib(gen) << "}"; + std::cout << rsp.str() << std::endl; + + zmq::message_t reply{rsp.str().data(), rsp.str().size()}; + socket.send(reply, zmq::send_flags::none); + + // 2. Receive data from DataBroker + zmq::message_t msg; + auto recv_result = socket.recv(msg, zmq::recv_flags::none); + if (recv_result) { + std::string_view msg_str{static_cast(msg.data()), msg.size()}; + std::cout << "Received: " << msg_str << std::endl; + } + + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + + } catch (const zmq::error_t& e) { + std::cout << e.what() << std::endl; + return 1; + + } + + + return 0; + + +} \ No newline at end of file diff --git a/Connectors/Cpp/zmq-srv.cpp b/Connectors/Cpp/zmq-srv.cpp new file mode 100644 index 0000000..6d83e64 --- /dev/null +++ b/Connectors/Cpp/zmq-srv.cpp @@ -0,0 +1,66 @@ +#include +#include +#include +#include +#include +#include +#include + + + +std::atomic run(true); + +void signal_handler(int signal_num) { + if (signal_num == SIGINT){ + run = false; + } +} + +int main(int argc, char** argv){ + + std::signal(SIGINT, signal_handler); + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_real_distribution distrib(0.0, 5.0); + + std::vector args(argv + 1, argv + argc); + for (const auto& arg: args){ + std::cout << "arg: " << arg << "\n"; + } + + try { + zmq::context_t ctx; + zmq::socket_t socket(ctx, zmq::socket_type::rep); + socket.bind("tcp://0.0.0.0:5556"); + std::cout << "ZMQ test server is running on tcp://0.0.0.0:5556..." << std::endl; + + while(run){ + // 1. Receive data from DataBroker + zmq::message_t msg; + auto recv_result = socket.recv(msg, zmq::recv_flags::none); + if (recv_result) { + std::string_view msg_str{static_cast(msg.data()), msg.size()}; + std::cout << "Received: " << msg_str << std::endl; + } + + // 2. Respond with new data + std::ostringstream rsp; + rsp << "{\"Input_Value_1\":" << distrib(gen) << ",\"Input_Value_2\":" << distrib(gen) << "}"; + std::cout << rsp.str() << std::endl; + + zmq::message_t reply{rsp.str().data(), rsp.str().size()}; + socket.send(reply, zmq::send_flags::none); + } + + + } catch (const zmq::error_t& e) { + std::cout << e.what() << std::endl; + return 1; + + } + + + return 0; + + +} \ No newline at end of file From 1cff646deff1e50af8579db0bb27967568d0f18e Mon Sep 17 00:00:00 2001 From: ych Date: Fri, 19 Jun 2026 15:34:36 -0400 Subject: [PATCH 2/5] initial shmem program and basic readme --- Connectors/Cpp/README.md | 127 +++++++++++++++++++++++++++++++++ Connectors/Cpp/common.hpp | 37 ++++++++++ Connectors/Cpp/input.json | 15 ++++ Connectors/Cpp/shmem.cpp | 146 ++++++++++++++++++++++++++++++++++++++ Connectors/Cpp/shmem.hpp | 5 ++ 5 files changed, 330 insertions(+) create mode 100644 Connectors/Cpp/README.md create mode 100644 Connectors/Cpp/common.hpp create mode 100644 Connectors/Cpp/input.json create mode 100644 Connectors/Cpp/shmem.cpp create mode 100644 Connectors/Cpp/shmem.hpp diff --git a/Connectors/Cpp/README.md b/Connectors/Cpp/README.md new file mode 100644 index 0000000..8bfd81a --- /dev/null +++ b/Connectors/Cpp/README.md @@ -0,0 +1,127 @@ +# C++ Module Example + +Modules: + +- zmq-srv - Opens a ZMQ server on port 5556. Can connect to either the ZMQ client, or the DB. The server receives messages and produces 2 random values. +- zmq-client - Opens a ZMQ client that connects to zmq-srv. This is primarily used for testing/debugging +- shmem - Interfaces with the DB using shared memory. Can connect to ZMQ server isntead of Simulink to mimic the simple round-trip example + +To compile: +```shell +g++ zmq-srv.cpp -lzmq -o bin/zmq-srv +g++ zmq-client.cpp -lzmq -o bin/zmq-client +g++ shmem.cpp -o bin/shmem +``` + +In the following examples, all commands are run from the ARCADE/Connectors/Cpp directory and assumes the DB has been compiled and resides in the ARCADE/DataBroker/Linux directory. + +## Simple client / server test +Open a terminal and run: +```shell +./bin/zmq-serv +``` +In a second shell run: +```shell +./bin/zmq-client +``` + +## ZMQ server with DataBroker and Simulink +To run the simple Python example using the zmq-srv instead of the ZMQ python script, ensure input.json is in co-sim mode and using Simulink as the executable. + +```json +{ + "Simulator": [ + { + "executableName": "Simulink", + "hold_for_dante": "false", + "co_sim_enable": "true" + } + ], + "cosim": [ + { + "sync_enable": "true", + "outputs": "Output_Value_1,Output_Value_2" + } + ] +} +``` + +Next, follow the simple example to load the Simulink model. Instead of running the Python program, execute the following from a shell in the Cpp directory: +```shell +./bin/zmq-shell +``` +Finally, start the Simulink simulation. You should see the values update the same way they updated in the Python example. + +## ZMQ server with DataBroker and shmem +To run the simulation using the C++ shmem program instead of Simulink, do the following: + +In the Cpp directory, update input.json to use shmem instead of Simulink as shown below. +```json +{ + "Simulator": [ + { + "executableName": "./bin/shmem", + "hold_for_dante": "false", + "co_sim_enable": "true" + } + ], + "cosim": [ + { + "sync_enable": "true", + "outputs": "Output_Value_1,Output_Value_2" + } + ] +} +``` +Start the data broker using the following command: +```shell +../../DataBroker/Linux/DB +``` +The data broker should start and wait for an external connection. It should have an output similar to the output shown below + +
+ +```shell +Semaphores Initialized +Flag hold_for_dante = false +Flag co_sim_enable = true +Flag sync_enable = true +Flag realtime_timestep not in config! +Endpoint Initialization Complete +Starting Shm_Interface +Co-Simulation Enabled +Executable Name = ./bin/shmem +User Control Initializing +***Enter X to stop simulation*** + +Starting Simulator +Semaphores created by Data_Aggregator +waiting for DA ********************* +DA WAITING ON SHMFlag outputs = Output_Value_1,Output_Value_2 +Done waiting for DA +Wait for Semaphore +Entering loop +Semaphore captured +Semaphore captured +Update Points: 2 +Publish Points: 2 +Timestep Size 0.100000 +DA Semaphore captured +Init data written to shared memory +Received from Shm_Interface: PUB = 2, UP = 2, TimeStep = 0.100000 +Semaphore captured +Output_Value_1 4.266592 0.000000 sec +Output_Value_2 16.707166 0.000000 sec +Input_Value_1 DOUBLE -100000000000000.000000 0.000000 sec +Input_Value_2 DOUBLE -100000000000000.000000 0.000000 sec +Output_Value_1 9999999999999999583119736832.000000 0.000000 sec +Output_Value_2 9999999999999999583119736832.000000 0.000000 sec +``` +
+ +In a second shell, start the zmq-server using the following command: +```shell +./bin/zmq-srv +``` + +You should then see the output of both the zmq-serv and DB windows update with values. \ No newline at end of file diff --git a/Connectors/Cpp/common.hpp b/Connectors/Cpp/common.hpp new file mode 100644 index 0000000..45cda94 --- /dev/null +++ b/Connectors/Cpp/common.hpp @@ -0,0 +1,37 @@ +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#define MSG_SIZE_MULT 256 +#define PUBLISH_POINTS_SHM_SEM "/pp_sem" +#define UPDATE_POINTS_SHM_SEM "/up_sem" +#define STOP_SEM "/stop" + +#define MAX_IO 1000 + +key_t keyp = 10618; +key_t keyu = 10619; +key_t msg_key = 10620; + +typedef struct { + char Name[128]; + char Type[50]; + double Value; + double Time; +} DATA; + +typedef struct { + int PUB; + int UP; + double TimeStep; +} MSG_DATA; + +DATA UP_DATA[MAX_IO]; // Upper limit of IO = 1000 +DATA PUB_DATA[MAX_IO]; \ No newline at end of file diff --git a/Connectors/Cpp/input.json b/Connectors/Cpp/input.json new file mode 100644 index 0000000..a7f982b --- /dev/null +++ b/Connectors/Cpp/input.json @@ -0,0 +1,15 @@ +{ + "Simulator": [ + { + "executableName": "./bin/shmem", + "hold_for_dante": "false", + "co_sim_enable": "true" + } + ], + "cosim": [ + { + "sync_enable": "true", + "outputs": "Output_Value_1,Output_Value_2" + } + ] +} diff --git a/Connectors/Cpp/shmem.cpp b/Connectors/Cpp/shmem.cpp new file mode 100644 index 0000000..42ad516 --- /dev/null +++ b/Connectors/Cpp/shmem.cpp @@ -0,0 +1,146 @@ +#include "shmem.hpp" + +std::atomic run(true); + +void signal_handler(int signal_num) { + if (signal_num == SIGINT){ + run = false; + } +} + +static void initialize(){ + + sem_t *semu; + sem_t *semp; + sem_t *msg_sem; + + int shmidp = shmget(keyp, 2 * sizeof(DATA), 0600|IPC_CREAT); + if (shmidp == -1) { + std::cout << "Failed to create shared memory for publish points." << std::endl; + return; + } + int shmidu = shmget(keyu, 2 * sizeof(DATA), 0600|IPC_CREAT); + if (shmidu == -1) { + std::cout << "Failed to create shared memory for update points." << std::endl; + return; + } + DATA *publishPointsShmAddress = (DATA *) shmat(shmidp,NULL,0); + DATA *updatePointsShmAddress = (DATA *) shmat(shmidu,NULL,0); + + std::snprintf(publishPointsShmAddress[0].Name,128,"Output_Value_1"); + std::snprintf(publishPointsShmAddress[1].Name,128,"Output_Value_2"); + + std::snprintf(updatePointsShmAddress[0].Name,128,"Input_Value_1"); + std::snprintf(updatePointsShmAddress[1].Name,128,"Input_Value_2"); + updatePointsShmAddress[0].Value = -100000000000000.0; + updatePointsShmAddress[1].Value = -100000000000000.0; + updatePointsShmAddress[0].Time = 0.0; + updatePointsShmAddress[1].Time = 0.0; + std::snprintf(updatePointsShmAddress[0].Type,50,"DOUBLE"); + std::snprintf(updatePointsShmAddress[1].Type,50,"DOUBLE"); + + shmdt(publishPointsShmAddress); + shmdt(updatePointsShmAddress); + + /* Send number of inputs and outputs to data broker */ + msg_sem = sem_open("/msg", O_CREAT, 0644, 0); + if (msg_sem == SEM_FAILED) { + std::cout << "Failed to open semaphore for init messaging." << std::endl; + return; + } + + int shmdb = shmget(msg_key, sizeof(MSG_DATA), 0600|IPC_CREAT); + if (shmdb == -1) { + std::cout << "Failed to create shared memory for init messaging." << std::endl; + return; + } + + MSG_DATA *MSG_DB = (MSG_DATA *) shmat(shmdb,NULL,0); + if (MSG_DB == (void *)-1) { + std::cout << "Failed to attach shared memory for init messaging." << std::endl; + return; + } + MSG_DB->UP = 2; + MSG_DB->PUB = 2; + MSG_DB->TimeStep = 0.1; + + shmdt(MSG_DB); + sem_post(msg_sem); +} + +static void process_update() { + + sem_t *semu; + sem_t *semp; + + semu = sem_open(UPDATE_POINTS_SHM_SEM, 0); + if (semu == SEM_FAILED) { + std::cout << "Failed to open semaphore for update points." << std::endl; + return; + } + semp = sem_open(PUBLISH_POINTS_SHM_SEM, 0); + if (semp == SEM_FAILED) { + std::cout<< "Failed to open semaphore for publish points." << std::endl; + return; + } + + /* set up shared memory */ + + int shmidu = shmget(keyu, 2 * sizeof(DATA), 0600|IPC_CREAT); + if (shmidu == -1) { + std::cout << "Failed to create shared memory for update points." << std::endl; + return; + } + DATA *updatePointsShmAddress = (DATA *) shmat(shmidu,NULL,0); + if (updatePointsShmAddress == (void *)-1) { + std::cout << "Failed to attach shared memory for update points." << std::endl; + return; + } + + int shmidp = shmget(keyp, 2 * sizeof(DATA), 0600|IPC_CREAT); + if (shmidp == -1) { + std::cout << "Failed to create shared memory for publish points." << std::endl; + return; + } + DATA *publishPointsShmAddress = (DATA *) shmat(shmidp,NULL,0); + if (publishPointsShmAddress == (void *)-1) { + std::cout << "Failed to attach shared memory for publish points." << std::endl; + return; + } + + sem_wait(semu); + + double val1 = updatePointsShmAddress[0].Value * updatePointsShmAddress[1].Value;; + double val2 = updatePointsShmAddress[0].Value * updatePointsShmAddress[0].Value;; + + publishPointsShmAddress[0].Value = val1; + publishPointsShmAddress[1].Value = val2; + + shmdt(publishPointsShmAddress); + shmdt(updatePointsShmAddress); + + sem_post(semp); +} + +int terminate() { + return 0; +} + +int main(int argc, char** argv){ + + std::signal(SIGINT, signal_handler); + + std::vector args(argv + 1, argv + argc); + for (const auto& arg: args){ + std::cout << "arg: " << arg << "\n"; + } + + initialize(); + while(run){ + process_update(); + } + + return 0; + + +} \ No newline at end of file diff --git a/Connectors/Cpp/shmem.hpp b/Connectors/Cpp/shmem.hpp new file mode 100644 index 0000000..83073d4 --- /dev/null +++ b/Connectors/Cpp/shmem.hpp @@ -0,0 +1,5 @@ +#include "common.hpp" +// #include "Simulation.hpp" + + + From 8d4daef150dfb3be51def8740e1a519daea87362 Mon Sep 17 00:00:00 2001 From: Joshua Hambrick Date: Fri, 19 Jun 2026 15:38:44 -0400 Subject: [PATCH 3/5] Update README.md --- Connectors/Cpp/README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Connectors/Cpp/README.md b/Connectors/Cpp/README.md index 8bfd81a..12b7259 100644 --- a/Connectors/Cpp/README.md +++ b/Connectors/Cpp/README.md @@ -24,6 +24,7 @@ In a second shell run: ```shell ./bin/zmq-client ``` +You should see values updating in both shells. ## ZMQ server with DataBroker and Simulink To run the simple Python example using the zmq-srv instead of the ZMQ python script, ensure input.json is in co-sim mode and using Simulink as the executable. @@ -48,7 +49,7 @@ To run the simple Python example using the zmq-srv instead of the ZMQ python scr Next, follow the simple example to load the Simulink model. Instead of running the Python program, execute the following from a shell in the Cpp directory: ```shell -./bin/zmq-shell +./bin/zmq-srv ``` Finally, start the Simulink simulation. You should see the values update the same way they updated in the Python example. @@ -124,4 +125,4 @@ In a second shell, start the zmq-server using the following command: ./bin/zmq-srv ``` -You should then see the output of both the zmq-serv and DB windows update with values. \ No newline at end of file +You should then see the output of both the zmq-serv and DB windows update with values. From 52bafbe7d59396f39601614fb670cd9683417000 Mon Sep 17 00:00:00 2001 From: Joshua Hambrick Date: Fri, 19 Jun 2026 15:39:12 -0400 Subject: [PATCH 4/5] Update README.md --- Connectors/Cpp/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Connectors/Cpp/README.md b/Connectors/Cpp/README.md index 12b7259..0dc3abe 100644 --- a/Connectors/Cpp/README.md +++ b/Connectors/Cpp/README.md @@ -18,7 +18,7 @@ In the following examples, all commands are run from the ARCADE/Connectors/Cpp d ## Simple client / server test Open a terminal and run: ```shell -./bin/zmq-serv +./bin/zmq-srv ``` In a second shell run: ```shell From 75e4b5a465fb2c8cb182f060a5e6ec0047cf406d Mon Sep 17 00:00:00 2001 From: ych Date: Thu, 25 Jun 2026 14:37:24 -0400 Subject: [PATCH 5/5] updated documentation and assorted fixes --- Connectors/Cpp/README.md | 4 +++- Connectors/Cpp/shmem.cpp | 9 +++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/Connectors/Cpp/README.md b/Connectors/Cpp/README.md index 0dc3abe..725fe17 100644 --- a/Connectors/Cpp/README.md +++ b/Connectors/Cpp/README.md @@ -6,6 +6,8 @@ Modules: - zmq-client - Opens a ZMQ client that connects to zmq-srv. This is primarily used for testing/debugging - shmem - Interfaces with the DB using shared memory. Can connect to ZMQ server isntead of Simulink to mimic the simple round-trip example +Note: zmq-srv and zmq-client require cppzmq (libzmq3-dev on Ubuntu systems). + To compile: ```shell g++ zmq-srv.cpp -lzmq -o bin/zmq-srv @@ -125,4 +127,4 @@ In a second shell, start the zmq-server using the following command: ./bin/zmq-srv ``` -You should then see the output of both the zmq-serv and DB windows update with values. +You should then see the output of both the zmq-srv and DB windows update with values. diff --git a/Connectors/Cpp/shmem.cpp b/Connectors/Cpp/shmem.cpp index 42ad516..26ad05a 100644 --- a/Connectors/Cpp/shmem.cpp +++ b/Connectors/Cpp/shmem.cpp @@ -73,6 +73,7 @@ static void process_update() { sem_t *semu; sem_t *semp; + /* set up semaphores for flow control*/ semu = sem_open(UPDATE_POINTS_SHM_SEM, 0); if (semu == SEM_FAILED) { std::cout << "Failed to open semaphore for update points." << std::endl; @@ -108,17 +109,21 @@ static void process_update() { return; } + /* wait for update semaphore*/ sem_wait(semu); - double val1 = updatePointsShmAddress[0].Value * updatePointsShmAddress[1].Value;; - double val2 = updatePointsShmAddress[0].Value * updatePointsShmAddress[0].Value;; + /* get values from data broker and calc output values */ + double val1 = updatePointsShmAddress[0].Value * updatePointsShmAddress[1].Value; + double val2 = updatePointsShmAddress[0].Value + updatePointsShmAddress[1].Value; + /* publish values*/ publishPointsShmAddress[0].Value = val1; publishPointsShmAddress[1].Value = val2; shmdt(publishPointsShmAddress); shmdt(updatePointsShmAddress); + /* update point semaphore */ sem_post(semp); }