Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions crates/node/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1310,10 +1310,12 @@ pub fn with_scope(

let scope_stack = current_scope_stack_handle();
let scope_uuid = scope_handle.inner.uuid;
let scope_name = scope_handle.inner.name.clone();
let scope_type_int: u32 = ScopeType::from(scope_handle.inner.scope_type) as u32;
let scope_attrs = scope_handle.inner.attributes.bits();
let scope_parent_uuid = scope_handle.inner.parent_uuid.map(|u| u.to_string());
// Hand the callback a real `ScopeHandle` instance, matching the Rust,
// Python, and WebAssembly bindings, so it can be passed back into `event`,
// `toolCallExecute`, and `llmCallExecute`. The instance is materialized on
// the JS thread because a `napi_wrap`'d handle cannot cross the
// threadsafe-function boundary as plain JSON.
let callback_handle = scope_handle.inner.clone();

// Create a promise-aware wrapper so we handle both sync and async callbacks.
let pa_fn = std::sync::Arc::new(
Expand All @@ -1332,15 +1334,18 @@ pub fn with_scope(
async move {
TASK_SCOPE_STACK
.scope(scope_stack, async move {
let handle_json = serde_json::json!({
"uuid": scope_uuid.to_string(),
"name": scope_name,
"scopeType": scope_type_int,
"attributes": scope_attrs,
"parentUuid": scope_parent_uuid,
});

let result = pa_fn.call(handle_json).await;
let build_handle: crate::promise_call::Arg0Builder =
Box::new(move |env: &Env| {
let raw = unsafe {
<ScopeHandle as ToNapiValue>::to_napi_value(
env.raw(),
ScopeHandle::from(callback_handle),
)?
};
Ok(unsafe { JsUnknown::from_raw_unchecked(env.raw(), raw) })
});

let result = pa_fn.call_with_arg0(build_handle).await;
// Always pop the scope, even on error.
if core_scope_api::pop_scope(
core_scope_api::PopScopeParams::builder()
Expand Down
51 changes: 39 additions & 12 deletions crates/node/src/promise_call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,24 @@ enum NextFn {
Stream(JsonStreamNextFn),
}

/// Builds the first JS callback argument on the Node main thread.
///
/// Some callback arguments, such as `#[napi]` class instances, cannot cross the
/// threadsafe-function boundary as plain JSON. This builder runs inside the
/// threadsafe-function call (on the JS thread), so it can materialize those
/// values directly instead of serializing them.
pub type Arg0Builder = Box<dyn FnOnce(&Env) -> napi::Result<JsUnknown> + Send>;

/// The first argument passed to the wrapped JS callback.
enum PrimaryArg {
/// A plain JSON value converted on the JS thread.
Json(Json),
/// A value materialized on the JS thread by a builder closure.
Build(Arg0Builder),
}

struct CallArgs {
args: Json,
arg0: PrimaryArg,
next: Option<NextFn>,
completion: CallCompletion,
}
Expand Down Expand Up @@ -197,13 +213,12 @@ impl PromiseAwareFn {
None => undefined_to_unknown(&ctx.env)?,
};
let (resolve, reject) = build_completion_unknowns(&ctx.env, ctx.value.completion)?;
let arg0 = match ctx.value.arg0 {
PrimaryArg::Json(value) => json_to_unknown(&ctx.env, value)?,
PrimaryArg::Build(build) => build(&ctx.env)?,
};

let args = vec![
json_to_unknown(&ctx.env, ctx.value.args)?,
next,
resolve,
reject,
];
let args = vec![arg0, next, resolve, reject];
Ok(args)
})?;

Expand All @@ -217,13 +232,24 @@ impl PromiseAwareFn {

/// Call the JS function with the given args and await the result.
pub async fn call(&self, args: Json) -> FlowResult<Json> {
self.call_inner(args, None).await
self.call_inner(PrimaryArg::Json(args), None).await
}

/// Call the JS function with a builder-constructed first argument and await
/// the result.
///
/// The builder runs on the Node main thread, so it can construct values that
/// cannot cross the threadsafe-function boundary as plain JSON, such as a
/// `#[napi]` class instance.
pub async fn call_with_arg0(&self, build_arg0: Arg0Builder) -> FlowResult<Json> {
self.call_inner(PrimaryArg::Build(build_arg0), None).await
}

/// Call the JS function with a middleware-style `next(arg)` callback that
/// resolves to a JSON result.
pub async fn call_with_json_next(&self, args: Json, next: JsonNextFn) -> FlowResult<Json> {
self.call_inner(args, Some(NextFn::Json(next))).await
self.call_inner(PrimaryArg::Json(args), Some(NextFn::Json(next)))
.await
}

/// Call the JS function with a middleware-style `next(arg)` callback that
Expand All @@ -233,7 +259,8 @@ impl PromiseAwareFn {
args: Json,
next: JsonStreamNextFn,
) -> FlowResult<Json> {
self.call_inner(args, Some(NextFn::Stream(next))).await
self.call_inner(PrimaryArg::Json(args), Some(NextFn::Stream(next)))
.await
}

/// Release the underlying threadsafe function so it does not outlive its registration.
Expand All @@ -243,7 +270,7 @@ impl PromiseAwareFn {
}
}

async fn call_inner(&self, args: Json, next: Option<NextFn>) -> FlowResult<Json> {
async fn call_inner(&self, arg0: PrimaryArg, next: Option<NextFn>) -> FlowResult<Json> {
let (sender, receiver) = tokio::sync::oneshot::channel();
let tsfn = self
.tsfn
Expand All @@ -254,7 +281,7 @@ impl PromiseAwareFn {
.ok_or_else(closed_tsfn_error)?;
let status = tsfn.call(
Ok(CallArgs {
args,
arg0,
next,
completion: CallCompletion::new(sender),
}),
Expand Down
42 changes: 42 additions & 0 deletions crates/node/tests/scope_tests.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ const {
popScope,
event,
withScope,
toolCallExecute,
llmCallExecute,
registerSubscriber,
deregisterSubscriber,
flushSubscribers,
Expand Down Expand Up @@ -118,6 +120,46 @@ describe('withScope', () => {
assert.equal(after.uuid, before.uuid, 'scope should be popped after withScope');
});

it('callback receives a reusable ScopeHandle', async () => {
let toolResult;
let llmResult;
let childParentUuid;
await withScope('reusable_handle', ScopeType.Agent, async (handle) => {
// The handle is a real ScopeHandle: usable as an event target,
const handleUuid = handle.uuid;
event('inside', handle, { ok: true }, null);

// as an explicit parent for child scopes,
const child = pushScope('child', ScopeType.Function, handle, null);
childParentUuid = child.parentUuid;
popScope(child);

// and as the scope target for managed tool/LLM execution.
toolResult = await toolCallExecute(
'search',
{ query: 'hello' },
(args) => ({ echo: args.query }),
handle,
null,
null,
null,
);
llmResult = await llmCallExecute(
'demo-provider',
{ headers: {}, content: { messages: [{ role: 'user', content: 'hi' }] } },
(request) => ({ ok: true, messages: request.content.messages }),
handle,
null,
null,
null,
null,
);
assert.equal(childParentUuid, handleUuid, 'child scope should record the handle as its parent');
});
assert.deepEqual(toolResult, { echo: 'hello' });
assert.deepEqual(llmResult, { ok: true, messages: [{ role: 'user', content: 'hi' }] });
});

it('returns callback result', async () => {
const result = await withScope('result_test', ScopeType.Function, () => {
return {
Expand Down
Loading