diff --git a/Connectors/Cpp/README.md b/Connectors/Cpp/README.md new file mode 100644 index 0000000..725fe17 --- /dev/null +++ b/Connectors/Cpp/README.md @@ -0,0 +1,130 @@ +# C++ Module Example + +Modules: + +- zmq-srv - Opens a ZMQ server on port 5556. Can connect to either the ZMQ client, or the DB. The server receives messages and produces 2 random values. +- zmq-client - Opens a ZMQ client that connects to zmq-srv. This is primarily used for testing/debugging +- shmem - Interfaces with the DB using shared memory. Can connect to ZMQ server isntead of Simulink to mimic the simple round-trip example + +Note: zmq-srv and zmq-client require cppzmq (libzmq3-dev on Ubuntu systems). + +To compile: +```shell +g++ zmq-srv.cpp -lzmq -o bin/zmq-srv +g++ zmq-client.cpp -lzmq -o bin/zmq-client +g++ shmem.cpp -o bin/shmem +``` + +In the following examples, all commands are run from the ARCADE/Connectors/Cpp directory and assumes the DB has been compiled and resides in the ARCADE/DataBroker/Linux directory. + +## Simple client / server test +Open a terminal and run: +```shell +./bin/zmq-srv +``` +In a second shell run: +```shell +./bin/zmq-client +``` +You should see values updating in both shells. + +## ZMQ server with DataBroker and Simulink +To run the simple Python example using the zmq-srv instead of the ZMQ python script, ensure input.json is in co-sim mode and using Simulink as the executable. + +```json +{ + "Simulator": [ + { + "executableName": "Simulink", + "hold_for_dante": "false", + "co_sim_enable": "true" + } + ], + "cosim": [ + { + "sync_enable": "true", + "outputs": "Output_Value_1,Output_Value_2" + } + ] +} +``` + +Next, follow the simple example to load the Simulink model. Instead of running the Python program, execute the following from a shell in the Cpp directory: +```shell +./bin/zmq-srv +``` +Finally, start the Simulink simulation. You should see the values update the same way they updated in the Python example. + +## ZMQ server with DataBroker and shmem +To run the simulation using the C++ shmem program instead of Simulink, do the following: + +In the Cpp directory, update input.json to use shmem instead of Simulink as shown below. +```json +{ + "Simulator": [ + { + "executableName": "./bin/shmem", + "hold_for_dante": "false", + "co_sim_enable": "true" + } + ], + "cosim": [ + { + "sync_enable": "true", + "outputs": "Output_Value_1,Output_Value_2" + } + ] +} +``` +Start the data broker using the following command: +```shell +../../DataBroker/Linux/DB +``` +The data broker should start and wait for an external connection. It should have an output similar to the output shown below + +
+ +```shell +Semaphores Initialized +Flag hold_for_dante = false +Flag co_sim_enable = true +Flag sync_enable = true +Flag realtime_timestep not in config! +Endpoint Initialization Complete +Starting Shm_Interface +Co-Simulation Enabled +Executable Name = ./bin/shmem +User Control Initializing +***Enter X to stop simulation*** + +Starting Simulator +Semaphores created by Data_Aggregator +waiting for DA ********************* +DA WAITING ON SHMFlag outputs = Output_Value_1,Output_Value_2 +Done waiting for DA +Wait for Semaphore +Entering loop +Semaphore captured +Semaphore captured +Update Points: 2 +Publish Points: 2 +Timestep Size 0.100000 +DA Semaphore captured +Init data written to shared memory +Received from Shm_Interface: PUB = 2, UP = 2, TimeStep = 0.100000 +Semaphore captured +Output_Value_1 4.266592 0.000000 sec +Output_Value_2 16.707166 0.000000 sec +Input_Value_1 DOUBLE -100000000000000.000000 0.000000 sec +Input_Value_2 DOUBLE -100000000000000.000000 0.000000 sec +Output_Value_1 9999999999999999583119736832.000000 0.000000 sec +Output_Value_2 9999999999999999583119736832.000000 0.000000 sec +``` +
+ +In a second shell, start the zmq-server using the following command: +```shell +./bin/zmq-srv +``` + +You should then see the output of both the zmq-srv and DB windows update with values. diff --git a/Connectors/Cpp/common.hpp b/Connectors/Cpp/common.hpp new file mode 100644 index 0000000..45cda94 --- /dev/null +++ b/Connectors/Cpp/common.hpp @@ -0,0 +1,37 @@ +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#define MSG_SIZE_MULT 256 +#define PUBLISH_POINTS_SHM_SEM "/pp_sem" +#define UPDATE_POINTS_SHM_SEM "/up_sem" +#define STOP_SEM "/stop" + +#define MAX_IO 1000 + +key_t keyp = 10618; +key_t keyu = 10619; +key_t msg_key = 10620; + +typedef struct { + char Name[128]; + char Type[50]; + double Value; + double Time; +} DATA; + +typedef struct { + int PUB; + int UP; + double TimeStep; +} MSG_DATA; + +DATA UP_DATA[MAX_IO]; // Upper limit of IO = 1000 +DATA PUB_DATA[MAX_IO]; \ No newline at end of file diff --git a/Connectors/Cpp/input.json b/Connectors/Cpp/input.json new file mode 100644 index 0000000..a7f982b --- /dev/null +++ b/Connectors/Cpp/input.json @@ -0,0 +1,15 @@ +{ + "Simulator": [ + { + "executableName": "./bin/shmem", + "hold_for_dante": "false", + "co_sim_enable": "true" + } + ], + "cosim": [ + { + "sync_enable": "true", + "outputs": "Output_Value_1,Output_Value_2" + } + ] +} diff --git a/Connectors/Cpp/shmem.cpp b/Connectors/Cpp/shmem.cpp new file mode 100644 index 0000000..26ad05a --- /dev/null +++ b/Connectors/Cpp/shmem.cpp @@ -0,0 +1,151 @@ +#include "shmem.hpp" + +std::atomic run(true); + +void signal_handler(int signal_num) { + if (signal_num == SIGINT){ + run = false; + } +} + +static void initialize(){ + + sem_t *semu; + sem_t *semp; + sem_t *msg_sem; + + int shmidp = shmget(keyp, 2 * sizeof(DATA), 0600|IPC_CREAT); + if (shmidp == -1) { + std::cout << "Failed to create shared memory for publish points." << std::endl; + return; + } + int shmidu = shmget(keyu, 2 * sizeof(DATA), 0600|IPC_CREAT); + if (shmidu == -1) { + std::cout << "Failed to create shared memory for update points." << std::endl; + return; + } + DATA *publishPointsShmAddress = (DATA *) shmat(shmidp,NULL,0); + DATA *updatePointsShmAddress = (DATA *) shmat(shmidu,NULL,0); + + std::snprintf(publishPointsShmAddress[0].Name,128,"Output_Value_1"); + std::snprintf(publishPointsShmAddress[1].Name,128,"Output_Value_2"); + + std::snprintf(updatePointsShmAddress[0].Name,128,"Input_Value_1"); + std::snprintf(updatePointsShmAddress[1].Name,128,"Input_Value_2"); + updatePointsShmAddress[0].Value = -100000000000000.0; + updatePointsShmAddress[1].Value = -100000000000000.0; + updatePointsShmAddress[0].Time = 0.0; + updatePointsShmAddress[1].Time = 0.0; + std::snprintf(updatePointsShmAddress[0].Type,50,"DOUBLE"); + std::snprintf(updatePointsShmAddress[1].Type,50,"DOUBLE"); + + shmdt(publishPointsShmAddress); + shmdt(updatePointsShmAddress); + + /* Send number of inputs and outputs to data broker */ + msg_sem = sem_open("/msg", O_CREAT, 0644, 0); + if (msg_sem == SEM_FAILED) { + std::cout << "Failed to open semaphore for init messaging." << std::endl; + return; + } + + int shmdb = shmget(msg_key, sizeof(MSG_DATA), 0600|IPC_CREAT); + if (shmdb == -1) { + std::cout << "Failed to create shared memory for init messaging." << std::endl; + return; + } + + MSG_DATA *MSG_DB = (MSG_DATA *) shmat(shmdb,NULL,0); + if (MSG_DB == (void *)-1) { + std::cout << "Failed to attach shared memory for init messaging." << std::endl; + return; + } + MSG_DB->UP = 2; + MSG_DB->PUB = 2; + MSG_DB->TimeStep = 0.1; + + shmdt(MSG_DB); + sem_post(msg_sem); +} + +static void process_update() { + + sem_t *semu; + sem_t *semp; + + /* set up semaphores for flow control*/ + semu = sem_open(UPDATE_POINTS_SHM_SEM, 0); + if (semu == SEM_FAILED) { + std::cout << "Failed to open semaphore for update points." << std::endl; + return; + } + semp = sem_open(PUBLISH_POINTS_SHM_SEM, 0); + if (semp == SEM_FAILED) { + std::cout<< "Failed to open semaphore for publish points." << std::endl; + return; + } + + /* set up shared memory */ + + int shmidu = shmget(keyu, 2 * sizeof(DATA), 0600|IPC_CREAT); + if (shmidu == -1) { + std::cout << "Failed to create shared memory for update points." << std::endl; + return; + } + DATA *updatePointsShmAddress = (DATA *) shmat(shmidu,NULL,0); + if (updatePointsShmAddress == (void *)-1) { + std::cout << "Failed to attach shared memory for update points." << std::endl; + return; + } + + int shmidp = shmget(keyp, 2 * sizeof(DATA), 0600|IPC_CREAT); + if (shmidp == -1) { + std::cout << "Failed to create shared memory for publish points." << std::endl; + return; + } + DATA *publishPointsShmAddress = (DATA *) shmat(shmidp,NULL,0); + if (publishPointsShmAddress == (void *)-1) { + std::cout << "Failed to attach shared memory for publish points." << std::endl; + return; + } + + /* wait for update semaphore*/ + sem_wait(semu); + + /* get values from data broker and calc output values */ + double val1 = updatePointsShmAddress[0].Value * updatePointsShmAddress[1].Value; + double val2 = updatePointsShmAddress[0].Value + updatePointsShmAddress[1].Value; + + /* publish values*/ + publishPointsShmAddress[0].Value = val1; + publishPointsShmAddress[1].Value = val2; + + shmdt(publishPointsShmAddress); + shmdt(updatePointsShmAddress); + + /* update point semaphore */ + sem_post(semp); +} + +int terminate() { + return 0; +} + +int main(int argc, char** argv){ + + std::signal(SIGINT, signal_handler); + + std::vector args(argv + 1, argv + argc); + for (const auto& arg: args){ + std::cout << "arg: " << arg << "\n"; + } + + initialize(); + while(run){ + process_update(); + } + + return 0; + + +} \ No newline at end of file diff --git a/Connectors/Cpp/shmem.hpp b/Connectors/Cpp/shmem.hpp new file mode 100644 index 0000000..83073d4 --- /dev/null +++ b/Connectors/Cpp/shmem.hpp @@ -0,0 +1,5 @@ +#include "common.hpp" +// #include "Simulation.hpp" + + + diff --git a/Connectors/Cpp/zmq-client.cpp b/Connectors/Cpp/zmq-client.cpp new file mode 100644 index 0000000..710e4f8 --- /dev/null +++ b/Connectors/Cpp/zmq-client.cpp @@ -0,0 +1,70 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + + + +std::atomic run(true); + +void signal_handler(int signal_num) { + if (signal_num == SIGINT){ + run = false; + } +} + +int main(int argc, char** argv){ + + std::signal(SIGINT, signal_handler); + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_real_distribution distrib(0.0, 5.0); + + std::vector args(argv + 1, argv + argc); + for (const auto& arg: args){ + std::cout << "arg: " << arg << "\n"; + } + + try { + zmq::context_t ctx; + zmq::socket_t socket(ctx, zmq::socket_type::req); + socket.connect("tcp://0.0.0.0:5556"); + std::cout << "Connecting to ZMQ test server running on tcp://0.0.0.0:5556..." << std::endl; + + while(run){ + // 1. Send data + std::ostringstream rsp; + rsp << "{\"Output_Value_1\":" << distrib(gen) << ",\"Output_Value_2\":" << distrib(gen) << "}"; + std::cout << rsp.str() << std::endl; + + zmq::message_t reply{rsp.str().data(), rsp.str().size()}; + socket.send(reply, zmq::send_flags::none); + + // 2. Receive data from DataBroker + zmq::message_t msg; + auto recv_result = socket.recv(msg, zmq::recv_flags::none); + if (recv_result) { + std::string_view msg_str{static_cast(msg.data()), msg.size()}; + std::cout << "Received: " << msg_str << std::endl; + } + + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + + } catch (const zmq::error_t& e) { + std::cout << e.what() << std::endl; + return 1; + + } + + + return 0; + + +} \ No newline at end of file diff --git a/Connectors/Cpp/zmq-srv.cpp b/Connectors/Cpp/zmq-srv.cpp new file mode 100644 index 0000000..6d83e64 --- /dev/null +++ b/Connectors/Cpp/zmq-srv.cpp @@ -0,0 +1,66 @@ +#include +#include +#include +#include +#include +#include +#include + + + +std::atomic run(true); + +void signal_handler(int signal_num) { + if (signal_num == SIGINT){ + run = false; + } +} + +int main(int argc, char** argv){ + + std::signal(SIGINT, signal_handler); + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_real_distribution distrib(0.0, 5.0); + + std::vector args(argv + 1, argv + argc); + for (const auto& arg: args){ + std::cout << "arg: " << arg << "\n"; + } + + try { + zmq::context_t ctx; + zmq::socket_t socket(ctx, zmq::socket_type::rep); + socket.bind("tcp://0.0.0.0:5556"); + std::cout << "ZMQ test server is running on tcp://0.0.0.0:5556..." << std::endl; + + while(run){ + // 1. Receive data from DataBroker + zmq::message_t msg; + auto recv_result = socket.recv(msg, zmq::recv_flags::none); + if (recv_result) { + std::string_view msg_str{static_cast(msg.data()), msg.size()}; + std::cout << "Received: " << msg_str << std::endl; + } + + // 2. Respond with new data + std::ostringstream rsp; + rsp << "{\"Input_Value_1\":" << distrib(gen) << ",\"Input_Value_2\":" << distrib(gen) << "}"; + std::cout << rsp.str() << std::endl; + + zmq::message_t reply{rsp.str().data(), rsp.str().size()}; + socket.send(reply, zmq::send_flags::none); + } + + + } catch (const zmq::error_t& e) { + std::cout << e.what() << std::endl; + return 1; + + } + + + return 0; + + +} \ No newline at end of file