Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .changeset/observer-authority-descriptor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
"@agentruntimecontrolprotocol/runtime": minor
"@agentruntimecontrolprotocol/client": minor
---

Surface a job's authority descriptor to subscribers (§7.6).

The runtime now populates `budget` (current per-currency counters) on
`job.subscribed`, alongside the `lease_constraints` it already sent, so an
observing principal can render a job's authority surface — the expiry clock
and budget gauge — without being the job's submitter. The cap is derivable
from the lease's `cost.budget` pattern; subsequent `cost.budget.remaining`
metric events keep the gauge live.

The client's `JobSubscription` now exposes the full descriptor:
`currentStatus`, `agent`, `lease`, `leaseConstraints`, `budget`, and
(submitter-only) `credentials`. Credentials remain redacted for non-submitters
per §14. Backward-compatible — only additive fields.
6 changes: 6 additions & 0 deletions packages/client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,12 @@ export class ARCPClient {
const ack = await deferred.promise;
return {
jobId,
currentStatus: ack.current_status,
agent: ack.agent,
lease: ack.lease,
leaseConstraints: ack.lease_constraints,
budget: ack.budget,
credentials: ack.credentials,
subscribedFrom: ack.subscribed_from,
replayed: ack.replayed,
unsubscribe: () => this.unsubscribe(jobId, sessionId),
Expand Down
21 changes: 21 additions & 0 deletions packages/client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type {
ClientIdentity,
Envelope,
JobResultPayload,
JobStateName,
Lease,
LeaseConstraints,
Credential,
Expand Down Expand Up @@ -125,6 +126,26 @@ export interface JobHandle {
*/
export interface JobSubscription {
readonly jobId: JobId;
/** Job status at subscription time (§7.6). */
readonly currentStatus: JobStateName;
/** Resolved `name@version` (or bare name) the runtime is running. */
readonly agent: string;
/** Effective capability grants (§9.1). */
readonly lease: Lease;
/** v1.1 §9.5 — echoed lease constraints (currently `expires_at`), if any. */
readonly leaseConstraints: LeaseConstraints | undefined;
/**
* v1.1 §9.6 — current per-currency budget counters at subscription time,
* present when `cost.budget` is in the lease. The cap is derivable from the
* lease's `cost.budget` pattern; subsequent `cost.budget.remaining` metric
* events keep this live.
*/
readonly budget: Readonly<Record<string, number>> | undefined;
/**
* v1.1 §9.8 — provisioned credentials, present ONLY when this subscriber is
* the job's original submitter. Redacted for all other observers (§14).
*/
readonly credentials: readonly Credential[] | undefined;
readonly subscribedFrom: number;
readonly replayed: boolean;
unsubscribe(): Promise<void>;
Expand Down
11 changes: 11 additions & 0 deletions packages/runtime/src/server-subscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,16 @@ function buildSubscribedPayload(args: {
subscriberPrincipal !== undefined &&
job.submitterPrincipal === subscriberPrincipal;

// §9.6 — current per-currency budget counters, so an observer joining
// mid-job sees the live remaining (the cap is derivable from the lease's
// `cost.budget` pattern). Empty-map guard avoids an empty `budget: {}`.
const budget: Record<string, number> = {};
let hasBudget = false;
for (const [currency, remaining] of job.budget.entries()) {
budget[currency] = remaining;
hasBudget = true;
}

return {
job_id: job.jobId,
current_status: job.state,
Expand All @@ -227,6 +237,7 @@ function buildSubscribedPayload(args: {
...(job.leaseConstraints === undefined
? {}
: { lease_constraints: job.leaseConstraints }),
...(hasBudget ? { budget } : {}),
parent_job_id: job.parentJobId ?? null,
...(job.traceId === undefined ? {} : { trace_id: job.traceId }),
subscribed_from: subscribedFrom,
Expand Down
98 changes: 98 additions & 0 deletions packages/runtime/test/provisioned-credentials.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -662,4 +662,102 @@ describe("credential confidentiality in job.subscribed (§14)", () => {
await bobClient.close();
await server.close();
});

it("observer receives budget + lease_constraints on job.subscribed (§7.6)", async () => {
const server = new ARCPServer({
runtime: TEST_RUNTIME,
capabilities: TEST_CAPABILITIES,
bearer: new StaticBearerVerifier(
new Map([
["tok-alice", { principal: "alice" }],
["tok-bob", { principal: "bob" }],
]),
),
// Cross-principal subscription so bob can observe alice's job.
jobAuthorizationPolicy: () => true,
logger: silentLogger,
});
server.registerAgent("slow-noop", async () => {
await new Promise((r) => setTimeout(r, 200));
return null;
});

const [aliceClient, aliceServerSide] = pairMemoryTransports();
const [bobClient, bobServerSide] = pairMemoryTransports();
server.accept(aliceServerSide);
server.accept(bobServerSide);
const aliceCollector = new FrameCollector(aliceClient);
const bobCollector = new FrameCollector(bobClient);

// Alice negotiates cost.budget + lease_expires_at so the runtime
// initializes/echoes those bounds.
await aliceClient.send({
arcp: PROTOCOL_VERSION,
id: "msg-hello",
type: "session.hello",
payload: {
client: { name: "test-client", version: "0.0.1" },
capabilities: {
encodings: ["json"],
features: ["subscribe", "cost.budget", "lease_expires_at"],
},
auth: { scheme: "bearer", token: "tok-alice" },
},
});
const aliceSessionId = (
await aliceCollector.waitFor((f) => f["type"] === "session.welcome")
).find((f) => f["type"] === "session.welcome")!["session_id"] as string;

const expiresAt = new Date(Date.now() + 15 * 60_000).toISOString();
await aliceClient.send({
arcp: PROTOCOL_VERSION,
id: "msg-submit-budget",
type: "job.submit",
session_id: aliceSessionId,
payload: {
agent: "slow-noop",
input: {},
lease_request: { "cost.budget": ["USD:5.00"] },
lease_constraints: { expires_at: expiresAt },
},
});
const jobId = (
(await aliceCollector.waitFor((f) => f["type"] === "job.accepted")).find(
(f) => f["type"] === "job.accepted",
)!["payload"] as Record<string, unknown>
)["job_id"] as string;

await bobClient.send(helloFrame("tok-bob"));
const bobSessionId = (
await bobCollector.waitFor((f) => f["type"] === "session.welcome")
).find((f) => f["type"] === "session.welcome")!["session_id"] as string;

await bobClient.send({
arcp: PROTOCOL_VERSION,
id: "msg-sub-budget",
type: "job.subscribe",
session_id: bobSessionId,
payload: { job_id: jobId },
});
const payload = (
await bobCollector.waitFor((f) => f["type"] === "job.subscribed")
).find((f) => f["type"] === "job.subscribed")!["payload"] as Record<
string,
unknown
>;

// Observer (bob) gets the non-secret authority bounds...
expect(payload["budget"]).toEqual({ USD: 5 });
expect(
(payload["lease_constraints"] as Record<string, unknown> | undefined)?.[
"expires_at"
],
).toBe(expiresAt);
// ...but never credentials.
expect(payload["credentials"]).toBeUndefined();

await aliceClient.close();
await bobClient.close();
await server.close();
});
});
Loading