From 76e5e85977b490eb0eda89a4431b7bbfe53b2957 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 25 Mar 2025 16:40:34 -0600 Subject: [PATCH 01/14] update ignores --- .gitignore | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index dc17cc5..f4fe1e6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ nimcache/ nimblecache/ htmldocs/ -tests/tsmartptrsleak -tests/tchannels_simple +* +!*/ +!*.* +*.dSYM/ \ No newline at end of file From aae76019a19430279d848967d529a6cdbbd28d4a Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 25 Mar 2025 16:54:43 -0600 Subject: [PATCH 02/14] add non-blocking ring test --- tests/tchannels_ring.nim | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 tests/tchannels_ring.nim diff --git a/tests/tchannels_ring.nim b/tests/tchannels_ring.nim new file mode 100644 index 0000000..963e121 --- /dev/null +++ b/tests/tchannels_ring.nim @@ -0,0 +1,27 @@ +import threading/channels +import std/unittest + +const + BufferSize = 3 + +suite "Ring Buffer Channel Tests": + test "Non-blocking ring buffer behavior": + var chan = newChan[int](BufferSize, overwrite = true) + + # Fill the buffer + for i in 0.. Date: Tue, 25 Mar 2025 17:18:45 -0600 Subject: [PATCH 03/14] add more tests --- tests/tchannels_ring.nim | 45 +++++++++++++++++++++++++++++++++++++++- threading/channels.nim | 12 ++++++----- 2 files changed, 51 insertions(+), 6 deletions(-) diff --git a/tests/tchannels_ring.nim b/tests/tchannels_ring.nim index 963e121..3442590 100644 --- a/tests/tchannels_ring.nim +++ b/tests/tchannels_ring.nim @@ -23,5 +23,48 @@ suite "Ring Buffer Channel Tests": values.add(x) # Verify we got the most recent values - check values == @[1, 2, BufferSize] + check values == @[BufferSize, 1, 2] + + test "Non-blocking ring buffer behavior with 2 elements": + var chan = newChan[int](BufferSize, overwrite = true) + + # Fill the buffer + for i in 0.. Date: Tue, 25 Mar 2025 17:29:20 -0600 Subject: [PATCH 04/14] add more tests --- tests/tchannels_ring.nim | 68 ++++++++++++++-------------------------- 1 file changed, 23 insertions(+), 45 deletions(-) diff --git a/tests/tchannels_ring.nim b/tests/tchannels_ring.nim index 3442590..f22a248 100644 --- a/tests/tchannels_ring.nim +++ b/tests/tchannels_ring.nim @@ -9,12 +9,9 @@ suite "Ring Buffer Channel Tests": var chan = newChan[int](BufferSize, overwrite = true) # Fill the buffer - for i in 0.. Date: Tue, 25 Mar 2025 17:51:26 -0600 Subject: [PATCH 05/14] add more tests - fix read indexing --- tests/tchannels_ring.nim | 24 ++++-------------------- threading/channels.nim | 29 ++++++++++++++++++++--------- 2 files changed, 24 insertions(+), 29 deletions(-) diff --git a/tests/tchannels_ring.nim b/tests/tchannels_ring.nim index f22a248..4959d1e 100644 --- a/tests/tchannels_ring.nim +++ b/tests/tchannels_ring.nim @@ -6,25 +6,9 @@ const suite "Ring Buffer Channel Tests": test "Non-blocking ring buffer behavior": - var chan = newChan[int](BufferSize, overwrite = true) - - # Fill the buffer - for i in 0.. Date: Tue, 25 Mar 2025 17:52:34 -0600 Subject: [PATCH 06/14] add more tests - fix read indexing --- tests/tchannels_ring.nim | 5 ++++- threading/channels.nim | 1 - 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/tchannels_ring.nim b/tests/tchannels_ring.nim index 4959d1e..f5173f5 100644 --- a/tests/tchannels_ring.nim +++ b/tests/tchannels_ring.nim @@ -8,7 +8,6 @@ suite "Ring Buffer Channel Tests": test "Non-blocking ring buffer behavior": proc fillBuffer(n: int): seq[int] = - echo "\nfillBuffer ", n var chan = newChan[int](BufferSize, overwrite = true) # Fill the buffer for i in 0.. Date: Tue, 25 Mar 2025 18:11:55 -0600 Subject: [PATCH 07/14] add more tests - change overwrite to use send --- tests/tchannels_ring.nim | 2 +- threading/channels.nim | 26 ++++++++++++++++++-------- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/tests/tchannels_ring.nim b/tests/tchannels_ring.nim index f5173f5..01088fd 100644 --- a/tests/tchannels_ring.nim +++ b/tests/tchannels_ring.nim @@ -11,7 +11,7 @@ suite "Ring Buffer Channel Tests": var chan = newChan[int](BufferSize, overwrite = true) # Fill the buffer for i in 0..= 2 + block example_non_blocking_overwrite: + var chan = newChan[string](elements = 1, overwrite = true) + discard chan.send("Hello") + discard chan.send("World") + var msg = "" + assert chan.tryRecv(msg) + assert msg == "World" + when not (defined(gcArc) or defined(gcOrc) or defined(gcAtomicArc) or defined(nimdoc)): {.error: "This module requires one of --mm:arc / --mm:atomicArc / --mm:orc compilation flags".} @@ -197,19 +208,18 @@ proc channelSend(chan: ChannelRaw, data: pointer, size: int, blocking: static bo # check for when another thread was faster to fill when blocking: - while chan.isFull(): - wait(chan.spaceAvailableCV, chan.lock) - else: if chan.isFull(): if chan.overwrite: incrementReadIndex(chan) else: - release(chan.lock) - return false + while chan.isFull(): + wait(chan.spaceAvailableCV, chan.lock) + else: + if chan.isFull(): + release(chan.lock) + return false - let prevHead = chan.getHead() - let prevTail = chan.getTail() - assert not chan.isFull() or chan.overwrite + assert not chan.isFull() let writeIdx = if chan.getHead() < chan.slots: From f933d65f6bddd5e471ac07a5933fa09c9cf64c03 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 25 Mar 2025 18:13:57 -0600 Subject: [PATCH 08/14] add more tests - change overwrite to use send --- tests/tchannels_ring.nim | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/tchannels_ring.nim b/tests/tchannels_ring.nim index 01088fd..9766cc3 100644 --- a/tests/tchannels_ring.nim +++ b/tests/tchannels_ring.nim @@ -33,3 +33,11 @@ suite "Ring Buffer Channel Tests": check fillBuffer(6) == @[6, 7, 8] check fillBuffer(7) == @[7, 8, 9] check fillBuffer(8) == @[8, 9, 10] + + test "Non-blocking ring buffer behavior with size 1": + var chan = newChan[int](1, overwrite = true) + chan.send(1) + chan.send(2) + var x: int + check chan.tryRecv(x) + check x == 2 From 4b330657b562b4b231e68c9a530a2ec82d1dd701 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 25 Mar 2025 18:20:57 -0600 Subject: [PATCH 09/14] add more tests - change overwrite to use send --- threading/channels.nim | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/threading/channels.nim b/threading/channels.nim index 5b92693..bdbea78 100644 --- a/threading/channels.nim +++ b/threading/channels.nim @@ -101,11 +101,11 @@ runnableExamples("--threads:on --gc:orc"): assert messages.len >= 2 block example_non_blocking_overwrite: - var chan = newChan[string](elements = 1, overwrite = true) - discard chan.send("Hello") - discard chan.send("World") + var chanRingBuffer = newChan[string](elements = 1, overwrite = true) + chanRingBuffer.send("Hello") + chanRingBuffer.send("World") var msg = "" - assert chan.tryRecv(msg) + assert chanRingBuffer.tryRecv(msg) assert msg == "World" when not (defined(gcArc) or defined(gcOrc) or defined(gcAtomicArc) or defined(nimdoc)): From 595865d86e06888f2f33460ef10b3268d901fad9 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 25 Mar 2025 18:33:21 -0600 Subject: [PATCH 10/14] add more tests - change overwrite to use send --- tests/tchannels_ring.nim | 10 +++++----- threading/channels.nim | 38 ++++++++++++++++++++------------------ 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/tests/tchannels_ring.nim b/tests/tchannels_ring.nim index 9766cc3..3eede79 100644 --- a/tests/tchannels_ring.nim +++ b/tests/tchannels_ring.nim @@ -8,10 +8,10 @@ suite "Ring Buffer Channel Tests": test "Non-blocking ring buffer behavior": proc fillBuffer(n: int): seq[int] = - var chan = newChan[int](BufferSize, overwrite = true) + var chan = newChan[int](BufferSize) # Fill the buffer for i in 0..= 2 block example_non_blocking_overwrite: - var chanRingBuffer = newChan[string](elements = 1, overwrite = true) + var chanRingBuffer = newChan[string](elements = 1) chanRingBuffer.send("Hello") - chanRingBuffer.send("World") + chanRingBuffer.send("World", overwrite = true) var msg = "" assert chanRingBuffer.tryRecv(msg) assert msg == "World" @@ -121,7 +121,6 @@ type ChannelObj = object lock: Lock spaceAvailableCV, dataAvailableCV: Cond - overwrite: bool slots: int ## Number of item slots in the buffer head: Atomic[int] ## Write/enqueue/send index tail: Atomic[int] ## Read/dequeue/receive index @@ -192,25 +191,30 @@ proc freeChannel(chan: ChannelRaw) = # MPMC Channels (Multi-Producer Multi-Consumer) # ------------------------------------------------------------------------------ -template incrementReadIndex(chan: ChannelRaw) = +template incrWriteIndex(chan: ChannelRaw) = + atomicInc(chan.head) + if chan.getHead() == 2 * chan.slots: + chan.setHead(0) + +template incrReadIndex(chan: ChannelRaw) = atomicInc(chan.tail) if chan.getTail() == 2 * chan.slots: chan.setTail(0) -proc channelSend(chan: ChannelRaw, data: pointer, size: int, blocking: static bool): bool = +proc channelSend(chan: ChannelRaw, data: pointer, size: int, blocking: static bool, overwrite: bool): bool = assert not chan.isNil assert not data.isNil when not blocking: - if chan.isFull() and not chan.overwrite: return false + if chan.isFull() and not overwrite: return false acquire(chan.lock) # check for when another thread was faster to fill when blocking: if chan.isFull(): - if chan.overwrite: - incrementReadIndex(chan) + if overwrite: + incrReadIndex(chan) else: while chan.isFull(): wait(chan.spaceAvailableCV, chan.lock) @@ -228,9 +232,8 @@ proc channelSend(chan: ChannelRaw, data: pointer, size: int, blocking: static bo chan.getHead() - chan.slots copyMem(chan.buffer[writeIdx * size].addr, data, size) - atomicInc(chan.head) - if chan.getHead() == 2 * chan.slots: - chan.setHead(0) + + incrWriteIndex(chan) signal(chan.dataAvailableCV) release(chan.lock) @@ -264,7 +267,7 @@ proc channelReceive(chan: ChannelRaw, data: pointer, size: int, blocking: static copyMem(data, chan.buffer[readIdx * size].addr, size) - incrementReadIndex(chan) + incrReadIndex(chan) signal(chan.spaceAvailableCV) release(chan.lock) @@ -364,7 +367,7 @@ proc tryRecv*[T](c: Chan[T], dst: var T): bool {.inline.} = ## Returns `false` and does not change `dist` if no message was received. channelReceive(c.d, dst.addr, sizeof(T), false) -proc send*[T](c: Chan[T], src: sink Isolated[T]) {.inline.} = +proc send*[T](c: Chan[T], src: sink Isolated[T], overwrite = false) {.inline.} = ## Sends the message `src` to the channel `c`. ## This blocks the sending thread until `src` was successfully sent. ## @@ -374,13 +377,13 @@ proc send*[T](c: Chan[T], src: sink Isolated[T]) {.inline.} = ## messages from the channel are removed. when defined(gcOrc) and defined(nimSafeOrcSend): GC_runOrc() - discard channelSend(c.d, src.addr, sizeof(T), true) + discard channelSend(c.d, src.addr, sizeof(T), true, overwrite) wasMoved(src) -template send*[T](c: Chan[T]; src: T) = +template send*[T](c: Chan[T]; src: T, overwrite = false) = ## Helper template for `send`. mixin isolate - send(c, isolate(src)) + send(c, isolate(src), overwrite) proc recv*[T](c: Chan[T], dst: var T) {.inline.} = ## Receives a message from the channel `c` and fill `dst` with its value. @@ -405,11 +408,10 @@ proc peek*[T](c: Chan[T]): int {.inline.} = ## Returns an estimation of the current number of messages held by the channel. numItems(c.d) -proc newChan*[T](elements: Positive = 30, overwrite = false): Chan[T] = +proc newChan*[T](elements: Positive = 30): Chan[T] = ## An initialization procedure, necessary for acquiring resources and ## initializing internal state of the channel. ## ## `elements` is the capacity of the channel and thus how many messages it can hold ## before it refuses to accept any further messages. result = Chan[T](d: allocChannel(sizeof(T), elements)) - result.d.overwrite = overwrite From 1d526250c387a3c3ed7eb96567b0867f8c077c32 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 25 Mar 2025 18:34:11 -0600 Subject: [PATCH 11/14] add more tests - change overwrite to use send --- threading/channels.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/threading/channels.nim b/threading/channels.nim index 93befd6..0b2bdd6 100644 --- a/threading/channels.nim +++ b/threading/channels.nim @@ -31,8 +31,8 @@ ## procs. Send operations add messages to the channel, receiving operations ## remove them. ## -## Overwrite enables a ringbuffer mode where `send` overwrites the oldest message -## if the channel is full. +## Normally, the `send` proc will block if the channel is full. If the `overwrite` +## parameter is set to `true`, the oldest message will be overwritten instead of blocking. ## ## See also: ## * [std/isolation](https://nim-lang.org/docs/isolation.html) From da8d50364c3e4230cf937255a8aa963c5ac3320b Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 25 Mar 2025 18:51:25 -0600 Subject: [PATCH 12/14] add more tests - fix compile for trySend --- threading/channels.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/threading/channels.nim b/threading/channels.nim index 0b2bdd6..a004b8f 100644 --- a/threading/channels.nim +++ b/threading/channels.nim @@ -322,7 +322,7 @@ proc trySend*[T](c: Chan[T], src: sink Isolated[T]): bool {.inline.} = ## ## Returns `false` if the message was not sent because the number of pending ## messages in the channel exceeded its capacity. - result = channelSend(c.d, src.addr, sizeof(T), false) + result = channelSend(c.d, src.addr, sizeof(T), false, false) if result: wasMoved(src) @@ -350,7 +350,7 @@ proc tryTake*[T](c: Chan[T], src: var Isolated[T]): bool {.inline.} = ## ## Returns `false` if the message was not sent because the number of pending ## messages in the channel exceeded its capacity. - result = channelSend(c.d, src.addr, sizeof(T), false) + result = channelSend(c.d, src.addr, sizeof(T), false, false) if result: wasMoved(src) From 3b4e858cfcfd84298ed98f08c2b2b836e9aa7738 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 25 Mar 2025 23:26:47 -0600 Subject: [PATCH 13/14] make into new proc --- tests/tchannels_ring.nim | 6 +++--- threading/channels.nim | 37 +++++++++++++++++++++++++++---------- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/tests/tchannels_ring.nim b/tests/tchannels_ring.nim index 3eede79..9842b09 100644 --- a/tests/tchannels_ring.nim +++ b/tests/tchannels_ring.nim @@ -11,7 +11,7 @@ suite "Ring Buffer Channel Tests": var chan = newChan[int](BufferSize) # Fill the buffer for i in 0.. Date: Tue, 25 Mar 2025 23:31:50 -0600 Subject: [PATCH 14/14] make into new proc --- threading/channels.nim | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/threading/channels.nim b/threading/channels.nim index 3272d3a..60f054d 100644 --- a/threading/channels.nim +++ b/threading/channels.nim @@ -385,13 +385,10 @@ template send*[T](c: Chan[T]; src: T) = send(c, isolate(src)) proc push*[T](c: Chan[T], src: sink Isolated[T]) {.inline.} = - ## Sends the message `src` to the channel `c`. - ## This blocks the sending thread until `src` was successfully sent. + ## Pushes the message `src` to the channel `c`. + ## This is a non-blocking operation that overwrites the oldest message if the channel is full. ## ## The memory of `src` is moved, not copied. - ## - ## If the channel is already full with messages this will block the thread until - ## messages from the channel are removed. when defined(gcOrc) and defined(nimSafeOrcSend): GC_runOrc() discard channelSend(c.d, src.addr, sizeof(T), true, overwrite=true)