-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.cpp
More file actions
436 lines (356 loc) · 14.2 KB
/
main.cpp
File metadata and controls
436 lines (356 loc) · 14.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
// Asynchronous SQLite API's implemented using C++ delegates
// @see https://github.com/endurodave/Async-SQLite
// @see https://github.com/endurodave/DelegateMQ
// David Lafreniere, 2026.
// This application demonstrates various usage patterns for the Async-SQLite wrapper:
// 1. Simple Asynchronous Execution: Basic fire-and-forget SQL commands.
// 2. Internal Thread Scheduling: Manually creating delegates to run on the SQLite thread.
// 3. Multithreaded Blocking: Simulating synchronous load from multiple worker threads.
// 4. Multithreaded Non-Blocking: High-concurrency load using callbacks.
// 5. Future/Async API: Using std::future to interleave main thread work with database operations.
#include "DelegateMQ.h"
#include <stdio.h>
#include <sqlite3.h>
#include <string>
#include <iostream>
#include "async_sqlite3.h"
#include "async_sqlite3_ut.h"
using namespace std;
using namespace dmq;
// Worker thread instances
Thread workerThreads[] = {
{ "WorkerThread1" },
{ "WorkerThread2" }
};
Thread nonBlockingAsyncThread("NonBlockingAsyncThread");
static const int WORKER_THREAD_CNT = sizeof(workerThreads) / sizeof(workerThreads[0]);
static std::mutex mtx; // Mutex to synchronize access to the condition variable
static std::condition_variable cv; // Condition variable to block and notify threads
static bool ready = false; // Shared state to check if the thread should proceed
static std::mutex printMutex;
static sqlite3* db_multithread = nullptr;
static MulticastDelegateSafe<void(int)> completeCallback;
static std::atomic<bool> completeFlag = false;
std::atomic<bool> processTimerExit = false;
static void ProcessTimers()
{
while (!processTimerExit.load())
{
// Process all delegate-based timers
Timer::ProcessTimers();
std::this_thread::sleep_for(std::chrono::microseconds(50));
}
}
// Thread safe printf function that locks the mutex
void printf_safe(const char* format, ...)
{
std::lock_guard<std::mutex> lock(printMutex); // Lock the mutex
// Start variadic arguments processing
va_list args;
va_start(args, format);
vprintf(format, args); // Call vprintf to handle the format string and arguments
va_end(args); // Clean up the variadic arguments
}
// Callback function to display query results (optional).
// When using async SQLite interface, callback is called on the async SQLite
// internal thread of control.
static int callback(void* NotUsed, int argc, char** argv, char** azColName)
{
for (int i = 0; i < argc; i++)
{
printf_safe("%s = %s\n", azColName[i], argv[i] ? argv[i] : "NULL");
}
return 0;
}
// Simple example to create and write to the database asynchronously.
// Use async::sqlite3_<func> series of functions within the async namespace.
int async_sqlite_simple_example()
{
sqlite3* db;
char* errMsg = 0;
int rc;
// Step 1: Open (or create) the SQLite database file
rc = async::sqlite3_open("async_sqlite_simple_example.db", &db);
if (rc)
{
fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(db));
return(0);
}
else
{
printf_safe("Opened database successfully\n");
}
// Step 2: Create a table if it does not exist
const char* createTableSQL = "CREATE TABLE IF NOT EXISTS people ("
"id INTEGER PRIMARY KEY AUTOINCREMENT, "
"first_name TEXT NOT NULL, "
"last_name TEXT NOT NULL);";
rc = async::sqlite3_exec(db, createTableSQL, callback, 0, &errMsg);
if (rc != SQLITE_OK)
{
fprintf(stderr, "SQL error: %s\n", errMsg);
sqlite3_free(errMsg);
return 1;
}
else
{
printf_safe("Table created successfully or already exists\n");
}
// Step 3: Insert a record
const char* insertSQL = "INSERT INTO people (first_name, last_name) "
"VALUES ('John', 'Doe');";
rc = async::sqlite3_exec(db, insertSQL, callback, 0, &errMsg);
if (rc != SQLITE_OK)
{
fprintf(stderr, "SQL error: %s\n", errMsg);
sqlite3_free(errMsg);
}
else
{
printf_safe("Record inserted successfully\n");
}
// Step 4: Verify the insertion by querying the table
const char* selectSQL = "SELECT * FROM people;";
rc = async::sqlite3_exec(db, selectSQL, callback, 0, &errMsg);
if (rc != SQLITE_OK)
{
fprintf(stderr, "SQL error: %s\n", errMsg);
sqlite3_free(errMsg);
}
else
{
printf_safe("Query executed successfully\n");
}
// Step 5: Close the database connection
async::sqlite3_close(db);
return 0;
}
// Async SQLite multithread example. The WriteDatabaseLambda() function is called
// from multiple threads of control. Function returns after all threads are complete.
int async_mutithread_example()
{
char* errMsg = 0;
int rc;
// Step 1: Open (or create) the SQLite database file
rc = async::sqlite3_open("async_multithread_example.db", &db_multithread);
if (rc)
{
fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(db_multithread));
return(0);
}
else
{
printf_safe("Opened database successfully\n");
}
// Step 2: Create a table if it does not exist
const char* createTableSQL = "CREATE TABLE IF NOT EXISTS threads ("
"id INTEGER PRIMARY KEY AUTOINCREMENT, "
"thread_name TEXT NOT NULL, "
"cnt TEXT NOT NULL);";
rc = async::sqlite3_exec(db_multithread, createTableSQL, callback, 0, &errMsg);
if (rc != SQLITE_OK)
{
fprintf(stderr, "SQL error: %s\n", errMsg);
sqlite3_free(errMsg);
return 1;
}
else
{
printf_safe("Table created successfully or already exists\n");
}
// Lambda function to write data to SQLite database
auto WriteDatabaseLambda = +[](std::string thread_name) -> void
{
char* errMsg = 0;
int rc;
static int cnt = 0;
for (int i = 0; i < 100; i++)
{
// Step 3: Insert a record
std::string insertSQL = "INSERT INTO threads (thread_name, cnt) "
"VALUES ('" + thread_name + "', '" + std::to_string(i) + "');";
rc = async::sqlite3_exec(db_multithread, insertSQL.c_str(), callback, 0, &errMsg);
if (rc != SQLITE_OK)
{
fprintf(stderr, "SQL error: %s\n", errMsg);
sqlite3_free(errMsg);
}
else
{
printf_safe("Record inserted successfully\n");
}
}
// Step 4: Verify the insertion by querying the table
const char* selectSQL = "SELECT * FROM threads;";
rc = async::sqlite3_exec(db_multithread, selectSQL, callback, 0, &errMsg);
if (rc != SQLITE_OK)
{
fprintf(stderr, "SQL error: %s\n", errMsg);
sqlite3_free(errMsg);
}
else
{
printf_safe("Query executed successfully\n");
}
// Last thread complete?
if (++cnt >= WORKER_THREAD_CNT)
{
std::lock_guard<std::mutex> lock(mtx); // Lock the mutex to modify shared state
ready = true; // Set the shared condition to true, meaning threads are complete
cv.notify_all(); // Notify waiting threads time to exit
}
}; // End Lambda
// Invoke WriteDatabaseLambda lambda function on worker thread
ready = false;
for (int i = 0; i < WORKER_THREAD_CNT; i++)
{
// Create an async delegate to invoke WriteDatabaseLambda()
auto delegate = MakeDelegate(WriteDatabaseLambda, workerThreads[i]);
// Invoke async target function WriteDatabaseLambda() on workerThread[i] non-blocking
// i.e. invoke the target function and don't wait for the return.
delegate(workerThreads[i].GetThreadName());
}
// Lock the mutex and wait for the signal from WriteDatabaseLambda
std::unique_lock<std::mutex> lock(mtx);
// Wait for all WriteDatabaseLambda worker threads to complete
while (!ready)
cv.wait(lock); // Block the thread until notified
// Step 5: Close the database connection
async::sqlite3_close(db_multithread);
// Invoke delegate callback indicating function is compelete
completeCallback(0);
return 0;
}
// Run simple async example
void example1()
{
async_sqlite_simple_example();
}
// Run simple async example entirely on the internal async SQLite thread. This shows how
// to execute multiple SQL commands uninterrupted.
void example2()
{
// Get the internal SQLite async interface thread
Thread* sqlThread = async::sqlite3_get_thread();
// Create an asynchronous blocking delegate to invoke async_sqlite_simple_example()
auto delegate = MakeDelegate(&async_sqlite_simple_example, *sqlThread, async::MAX_WAIT);
// Invoke async_sqlite_simple_example() on sqlThread and wait for the retVal
auto retVal = delegate.AsyncInvoke();
// If retVal has a value then the asynchronous function call succeeded
if (retVal.has_value())
{
// The async_sqlite_simple_example() return value is stored in retVal.value()
printf_safe("Return Value: %d\n", retVal.value());
}
}
// Run multithreaded example (blocking) and return the execution time
std::chrono::microseconds example3()
{
auto blockingStart = std::chrono::high_resolution_clock::now();
// Call example and wait for completion
int retVal = async_mutithread_example();
auto blockingEnd = std::chrono::high_resolution_clock::now();
auto blockingDuration = std::chrono::duration_cast<std::chrono::microseconds>(blockingEnd - blockingStart);
return blockingDuration;
}
// Run the multithreaded example (non-blocking) on nonBlockingAsyncThread thread and
// return the execution time.
std::chrono::microseconds example4()
{
completeFlag = false;
// Lambda function to receive the complete callback
auto CompleteCallbackLambda = +[](int retVal) -> void
{
completeFlag = true;
};
// Register with delegate to receive callback
completeCallback += MakeDelegate(CompleteCallbackLambda);
auto nonBlockingStart = std::chrono::high_resolution_clock::now();
// Create a delegate to execute on nonBlockingAsyncThread without waiting for completion
auto noWaitDelegate = MakeDelegate(&async_mutithread_example, nonBlockingAsyncThread);
// Call async_mutithread_example() on nonBlockingAsyncThread and don't wait for it to complete
noWaitDelegate.AsyncInvoke();
auto nonBlockingEnd = std::chrono::high_resolution_clock::now();
auto nonBlockingDuration = std::chrono::duration_cast<std::chrono::microseconds>(nonBlockingEnd - nonBlockingStart);
return nonBlockingDuration;
}
// Example future: Using std::future for concurrent database operations
// This demonstrates the "Fire-and-Forget" pattern where the main thread
// remains responsive while a heavy SQL operation runs in the background.
void example_future()
{
printf_safe("\n--- Starting Example 5 (Future/Async) ---\n");
sqlite3* db = nullptr;
// Open DB synchronously to ensure valid handle
async::sqlite3_open("async_future_example.db", &db);
// Create a table synchronously (we need it before inserting)
async::sqlite3_exec(db, "CREATE TABLE IF NOT EXISTS heavy_data (id INT, val TEXT);", nullptr, nullptr, nullptr);
// 1. Launch a heavy insert operation asynchronously
// This returns immediately, giving us a std::future
printf_safe("[Main] Launching heavy async insert...\n");
// Note: The SQL string must remain valid until the future completes.
// Ideally use string literals or manage lifetime carefully.
std::string sql = "INSERT INTO heavy_data VALUES (123, 'Concurrent Data');";
// Pass 5 arguments matching the raw API signature
auto future = async::sqlite3_exec_future(db, sql.c_str(), nullptr, nullptr, nullptr);
// 2. Perform other work on the main thread while DB is busy
// In a real app, this would be UI updates or input processing.
printf_safe("[Main] DB is busy. Performing other tasks on main thread...\n");
for (int i = 0; i < 3; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
printf_safe("[Main] Working... %d%%\n", (i + 1) * 33);
}
// 3. Wait for the database operation to complete and check the result
printf_safe("[Main] Waiting for DB to finish...\n");
// .get() blocks here ONLY if the DB task is still running.
int rc = future.get();
if (rc == SQLITE_OK) {
printf_safe("[Main] Async insert completed successfully!\n");
}
else {
printf_safe("[Main] Async insert failed with code: %d\n", rc);
}
async::sqlite3_close(db);
std::remove("async_future_example.db");
}
//------------------------------------------------------------------------------
// main
//------------------------------------------------------------------------------
int main(void)
{
// Start the thread that will run ProcessTimers
std::thread timerThread(ProcessTimers);
std::remove("async_multithread_example.db");
std::remove("async_sqlite_simple_example.db");
// Create all worker threads
nonBlockingAsyncThread.CreateThread();
for (int i=0; i<WORKER_THREAD_CNT; i++)
workerThreads[i].CreateThread();
// Initialize async sqlite3 interface
async::sqlite3_init_async();
// Run all examples
example1();
example2();
auto blockingDuration = example3();
auto nonBlockingDuration = example4();
example_future();
// Wait for example4() to complete on nonBlockingAsyncThread
while (!completeFlag)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
// Exit all worker threads
nonBlockingAsyncThread.ExitThread();
for (int i = 0; i < WORKER_THREAD_CNT; i++)
workerThreads[i].ExitThread();
// Compare blocking and non-blocking execution times
std::cout << "Blocking Time: " << blockingDuration.count() << " microseconds." << std::endl;
std::cout << "Non-Blocking Time: " << nonBlockingDuration.count() << " microseconds." << std::endl;
// Run unit tests
RunUnitTests();
// Ensure the timer thread completes before main exits
processTimerExit.store(true);
if (timerThread.joinable())
timerThread.join();
return 0;
}