Stream.asyncPush hangs when using bounded dropping/sliding buffers with synchronous emission #6232

@ryanleecode

What version of Effect is running?

3.21.2 (vendored at repos/effect)

What steps can reproduce the bug?

import { Effect, Stream } from 'effect'
import { Observable } from 'rxjs'

// Synchronous observable that emits 1 value then completes
const obs = new Observable<number>((subscriber) => {
  subscriber.next(42)
  subscriber.complete()
})

// Wraps the observable via asyncPush with a bounded dropping buffer
const stream = Stream.asyncPush<number, Error>((emit) =>
  Effect.acquireRelease(
    Effect.sync(() =>
      obs.subscribe({
        next: (value) => emit.single(value),
        error: (error) => emit.fail(new Error(String(error))),
        complete: () => emit.end(),
      })
    ),
    (sub) => Effect.sync(() => sub.unsubscribe()),
  ), { bufferSize: 1, strategy: 'dropping' })

await Effect.runPromise(Stream.runCollect(stream))
// HANGS FOREVER — never resolves

The same happens with { bufferSize: 1, strategy: "sliding" }.

When values are emitted asynchronously (e.g. via setTimeout), or when bufferSize is large enough to hold both the flushed values and the termination signal (bufferSize: 2 in this reproduction), the stream completes normally.

What is the expected behavior?

Stream.runCollect(stream) should return [42] — the stream should emit the synchronously emitted value and then complete normally when the observable calls complete().

What do you see instead?

The Effect hangs forever. The consumer channel is permanently blocked on Queue.take(queue), waiting for a termination signal (Exit.void) that was silently dropped.

Additional information

Root cause

The bug is in makePush (packages/effect/src/internal/stream/emit.ts:51-123). The end() and done() methods follow the same pattern:

// emit.ts lines 104-108
end() {
  if (finished) return
  finished = true
  flush()                          // (A) offers accumulated values as 1 queue item
  queue.unsafeOffer(Exit.void)     // (B) offers the termination signal
}

flush() (lines 73-78) offers the accumulated buffer to the queue via queue.unsafeOffer(buffer), consuming one queue slot. end() then offers Exit.void, which needs a second slot.

The queue is created by queueFromBufferOptionsPush (stream.ts:595-610):

const queueFromBufferOptionsPush = <A, E>(options?: ...) => {
  // ... unbounded case ...
  switch (options?.strategy) {
    case "sliding":
      return Queue.sliding(options.bufferSize ?? 16)   // capacity = bufferSize
    default:
      return Queue.dropping(options?.bufferSize ?? 16)  // capacity = bufferSize
  }
}

The bounded queue has capacity equal to bufferSize. When bufferSize === 1:

  1. flush() at step (A) offers an array of values → queue fills the only slot (capacity 1, now full)
  2. queue.unsafeOffer(Exit.void) at step (B) tries to offer the termination signal → the backing MutableQueue.bounded(1) is full, offer returns false → the exit signal is silently dropped
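
The two steps above can be reproduced without Effect in a small model. This is only a sketch under stated assumptions: makeBoundedQueue, makePushModel and simulate are hypothetical stand-ins written for this illustration, not the library's code. The only behaviors modeled are that a full bounded queue rejects an offer and that end() flushes the buffered values as one item before offering the termination marker.

```typescript
type End = { readonly _tag: "End" }
type Item<A> = ReadonlyArray<A> | End

// Hypothetical stand-in for the bounded queue: offer() returns false when full.
function makeBoundedQueue<T>(capacity: number) {
  const items: Array<T> = []
  return {
    offer(item: T): boolean {
      if (items.length >= capacity) return false // rejected, caller is not told why
      items.push(item)
      return true
    },
    size(): number {
      return items.length
    }
  }
}

// Hypothetical, stripped-down emitter: values accumulate in a buffer and
// end() flushes them as ONE queue item, then offers the termination marker.
function makePushModel<A>(queue: { offer(item: Item<A>): boolean }) {
  let finished = false
  let buffer: Array<A> = []
  function flush(): void {
    if (buffer.length > 0) {
      queue.offer(buffer)
      buffer = []
    }
  }
  return {
    single(value: A): boolean {
      if (finished) return false
      buffer.push(value) // in the synchronous case the scheduled flush has not run yet
      return true
    },
    // Returns whether the termination marker was accepted (for illustration only).
    end(): boolean {
      if (finished) return false
      finished = true
      flush() // step (A)
      return queue.offer({ _tag: "End" }) // step (B)
    }
  }
}

function simulate(capacity: number) {
  const queue = makeBoundedQueue<Item<number>>(capacity)
  const emit = makePushModel<number>(queue)
  emit.single(42)
  const endAccepted = emit.end()
  return { endAccepted, queued: queue.size() }
}

const oneSlot = simulate(1) // endAccepted: false, queued: 1
const twoSlots = simulate(2) // endAccepted: true, queued: 2
console.log(oneSlot, twoSlots)
```

With capacity 1 the marker is rejected and only the value array is queued, which matches the hang described above.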

The consumer loop in asyncPush (stream.ts:630-633) then blocks forever:

// stream.ts lines 630-633
const loop = core.flatMap(Queue.take(queue), (item) =>
  Exit.isExit(item)
    ? Exit.isSuccess(item) ? core.void : core.failCause(item.cause)
    : channel.zipRight(core.write(Chunk.unsafeFromArray(item)), loop))

It takes the value array from the queue, writes it to the channel, loops, and calls Queue.take(queue) again — but the queue is empty and no Exit.void will ever arrive because it was dropped.
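
To make the consumer side concrete, here is a rough model of that loop over a fixed snapshot of queue contents. drain is a hypothetical helper for this illustration only; "blocked" marks the point where the real loop would be waiting on Queue.take(queue).

```typescript
type End = { readonly _tag: "End" }
type Item = ReadonlyArray<number> | End

// Hypothetical model of the consumer loop: write value arrays, stop on the
// termination marker. If the items run out first, the real loop would wait.
function drain(items: ReadonlyArray<Item>) {
  const written: Array<number> = []
  for (const item of items) {
    if ("_tag" in item) return { written, state: "terminated" as const }
    written.push(...item)
  }
  return { written, state: "blocked" as const }
}

// Queue contents when the termination marker was dropped (capacity 1).
const dropped = drain([[42]])
// Queue contents when the marker fit (capacity 2).
const delivered = drain([[42], { _tag: "End" }])

console.log(dropped.state, delivered.state)
```

In both cases the value 42 is written downstream; only the second case ever leaves the loop.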

Why a larger bufferSize works

With bufferSize: 2, the queue has capacity 2. flush() takes slot 1 with the single array of accumulated values, and unsafeOffer(Exit.void) takes slot 2, so both fit and the stream terminates.

Why async emission works

When values are emitted asynchronously (e.g. via setTimeout), the scheduled flush task runs between emissions. The consumer concurrently drains items from the queue, so the queue never fills completely. When end() eventually runs, there's always room for Exit.void.
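
The ordering in the async case can be sketched as three steps against a capacity-1 queue. This is a simplified model, assuming the consumer takes the flushed values before the source completes; offer and the plain array used as a queue are hypothetical stand-ins, not Effect APIs.

```typescript
type Item = ReadonlyArray<number> | { readonly _tag: "End" }

const capacity = 1
const queue: Array<Item> = []

// Hypothetical bounded offer: rejects when the queue is full.
const offer = (item: Item): boolean => {
  if (queue.length >= capacity) return false
  queue.push(item)
  return true
}

// 1. The scheduled flush offers the buffered value and fills the only slot.
const flushAccepted = offer([42])
// 2. The consumer takes it before the source completes, freeing the slot.
const taken = queue.shift()
// 3. end() has nothing left to flush and offers the termination marker.
const endAccepted = offer({ _tag: "End" })

console.log(flushAccepted, taken, endAccepted)
```

Because step 2 happens between the flush and end(), the marker fits even with a single slot.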

This also affects done() (error path)

// emit.ts lines 80-88
function done(exit: Exit.Exit<A, E>) {
  if (finished) return
  finished = true
  if (exit._tag === 'Success') {
    buffer.push(exit.value)
  }
  flush()
  queue.unsafeOffer(exit._tag === 'Success' ? Exit.void : exit)
}

The same drop occurs for error/defect terminations on bounded queues with bufferSize: 1 whenever flush() has just filled the only slot.

Impact

Any consumer that wraps a synchronous source (e.g., converting an RxJS Observable or a synchronous push iterator into a Stream) and uses a bounded dropping/sliding buffer will hang if the source emits synchronously and then completes within the register callback.

Proposed fix

queueFromBufferOptionsPush should create bounded queues with bufferSize + 1 capacity to reserve one slot for the termination signal:

 switch (options?.strategy) {
   case "sliding":
-    return Queue.sliding(options.bufferSize ?? 16)
+    return Queue.sliding((options.bufferSize ?? 16) + 1)
   default:
-    return Queue.dropping(options?.bufferSize ?? 16)
+    return Queue.dropping((options?.bufferSize ?? 16) + 1)
 }

The +1 slot is strictly for internal use by the emit machinery (the termination signal) and does not change the user-visible buffering behavior.
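
A minimal sketch of the effect of the extra slot, again using a plain model rather than Effect itself. internalCapacity and offerValuesThenEnd are hypothetical names for this illustration; the default of 16 is taken from the snippet above.

```typescript
type Item = ReadonlyArray<number> | { readonly _tag: "End" }

// Hypothetical helper mirroring the proposed change: the user-facing
// bufferSize plus one extra slot for the termination signal.
function internalCapacity(bufferSize?: number): number {
  return (bufferSize ?? 16) + 1
}

// Offers one value array followed by the termination marker to a bounded
// queue model and reports whether the marker was accepted.
function offerValuesThenEnd(capacity: number): boolean {
  const queue: Array<Item> = []
  const offer = (item: Item): boolean => {
    if (queue.length >= capacity) return false
    queue.push(item)
    return true
  }
  offer([42])
  return offer({ _tag: "End" })
}

const before = offerValuesThenEnd(1) // false: marker dropped
const after = offerValuesThenEnd(internalCapacity(1)) // true: marker fits
const defaultCapacity = internalCapacity() // 17

console.log(before, after, defaultCapacity)
```

In this model nothing stops a value array from occupying the extra slot, so whether the slot is truly reserved for the termination signal depends on how the emitter offers values between consumer takes.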
