Skip to content

Commit 8fa4489

Browse files
committed
sdk: component-as-tool — AgentTool opt-in + RPC dispatch
Adds the wire + scheduler plumbing for the platform's MCP layer to expose every opted-in component as an LLM tool, dispatched directly to the module pod with no flow assembly: - module.AgentTool capability interface. Components implement it to surface as an MCP tool — pure transforms, one-shot API calls. Stateful pieces (servers, tickers, kv) leave it unimplemented. - Runner Msg gains a Mode field. The transport receivers populate it from the x-mode header on both NATS core and JetStream paths. - Scheduler.Handle short-circuits to handleRPC when Mode == "rpc". Looks up the component, calls Instance() for a fresh struct, JSON- unmarshals the payload onto the input port's Configuration type, runs Handle with a capture handler that records the first emit on the configured output port, and returns it as the reply. - wire.PublishRPC is the sender side. Mirrors Publish but addresses module + component (not node + port), always synchronous on a per-call inbox, OTel ctx threaded through. Round 1 of three. Round 2 wires the platform MCPService to discover opted-in components per workspace and list them as tools.
1 parent d93c22a commit 8fa4489

7 files changed

Lines changed: 297 additions & 0 deletions

File tree

internal/scheduler/rpc.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package scheduler
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"reflect"
8+
"sync"
9+
10+
"github.com/tiny-systems/module/internal/scheduler/runner"
11+
"github.com/tiny-systems/module/module"
12+
)
13+
14+
// handleRPC dispatches an MCP tool call directly to a component
15+
// without touching the flow graph. msg.To carries the bare component
16+
// name (no flow prefix, no port suffix); msg.Data is the JSON payload
17+
// the agent supplied. The dispatcher:
18+
//
19+
// 1. Looks up the component in the local registry,
20+
// 2. Confirms it implements AgentTool,
21+
// 3. Creates a fresh Instance() — no settings, no metadata, no
22+
// persisted state. The agent's payload is everything,
23+
// 4. Unmarshals msg.Data into the input port's struct type,
24+
// 5. Calls Handle() with a capture handler that records the first
25+
// emit on the configured output port,
26+
// 6. Returns the captured payload to the JetStream reply path.
27+
//
28+
// Stateful components don't opt into AgentTool and so never reach
29+
// this path. Anything that does opt in must work from a cold instance
30+
// with only the call payload.
31+
func (s *Schedule) handleRPC(ctx context.Context, msg *runner.Msg) (any, error) {
32+
componentName := msg.To
33+
if componentName == "" {
34+
return nil, fmt.Errorf("rpc: msg.To is empty (need component name)")
35+
}
36+
37+
cmp, ok := s.componentsMap.Get(componentName)
38+
if !ok {
39+
return nil, fmt.Errorf("rpc: component %q not registered on this module", componentName)
40+
}
41+
42+
agentTool, ok := cmp.(module.AgentTool)
43+
if !ok {
44+
return nil, fmt.Errorf("rpc: component %q does not opt into AgentTool", componentName)
45+
}
46+
info := agentTool.AgentTool()
47+
inputPort := info.InputPort
48+
if inputPort == "" {
49+
inputPort = "in"
50+
}
51+
outputPort := info.OutputPort
52+
if outputPort == "" {
53+
outputPort = "out"
54+
}
55+
56+
instance := cmp.Instance()
57+
if instance == nil {
58+
return nil, fmt.Errorf("rpc: %s.Instance() returned nil", componentName)
59+
}
60+
61+
inputType, err := portConfigurationType(instance, inputPort)
62+
if err != nil {
63+
return nil, fmt.Errorf("rpc: %s: %w", componentName, err)
64+
}
65+
input := reflect.New(inputType).Elem()
66+
if len(msg.Data) > 0 {
67+
if err := json.Unmarshal(msg.Data, input.Addr().Interface()); err != nil {
68+
return nil, fmt.Errorf("rpc: unmarshal %s input: %w", componentName, err)
69+
}
70+
}
71+
72+
capture := &rpcCapture{port: outputPort}
73+
result := instance.Handle(ctx, capture.handler, inputPort, input.Interface())
74+
if err := result.Err(); err != nil {
75+
return nil, err
76+
}
77+
78+
// Prefer the emit on the configured output port; fall back to
79+
// the Handle Result's value when the component returned its
80+
// response directly (some authors do this for one-shot tools).
81+
if capture.captured.Load() {
82+
return capture.payload, nil
83+
}
84+
return result.Value(), nil
85+
}
86+
87+
// portConfigurationType returns the reflect.Type of the named port's
88+
// Configuration struct on the component instance, so the dispatcher
89+
// can build a typed value to JSON-unmarshal the agent's payload into.
90+
func portConfigurationType(instance module.Component, portName string) (reflect.Type, error) {
91+
for _, p := range instance.Ports() {
92+
if p.Name != portName {
93+
continue
94+
}
95+
if p.Configuration == nil {
96+
return nil, fmt.Errorf("port %q has no Configuration shape", portName)
97+
}
98+
return reflect.TypeOf(p.Configuration), nil
99+
}
100+
return nil, fmt.Errorf("port %q not found on component", portName)
101+
}
102+
103+
// rpcCapture wraps a module.Handler that records the first emit on
104+
// the configured output port. Subsequent emits return Ok(nil) and are
105+
// otherwise discarded — RPC mode is single-shot by design.
106+
type rpcCapture struct {
107+
port string
108+
109+
mu sync.Mutex
110+
captured atomicBool
111+
payload any
112+
}
113+
114+
func (c *rpcCapture) handler(ctx context.Context, port string, data any) module.Result {
115+
if port != c.port {
116+
return module.Ok(nil)
117+
}
118+
c.mu.Lock()
119+
if !c.captured.Load() {
120+
c.payload = data
121+
c.captured.Store(true)
122+
}
123+
c.mu.Unlock()
124+
return module.Ok(nil)
125+
}
126+
127+
// atomicBool keeps the tiny capture-state machine self-contained and
128+
// import-free at the file level.
129+
type atomicBool struct{ v bool }
130+
131+
func (a *atomicBool) Load() bool { return a.v }
132+
func (a *atomicBool) Store(b bool) { a.v = b }

internal/scheduler/runner/msg.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,16 @@ type Msg struct {
1515
// Used to detect cycles and prevent stack overflow in blocking I/O chains.
1616
Depth int `json:"depth"`
1717

18+
// Mode picks the dispatch path. "" (default) routes to a node
19+
// instance via the edge graph the way every business hop has
20+
// since the SDK was built. "rpc" short-circuits the graph: the
21+
// scheduler looks up the component by name, instantiates it
22+
// fresh, delivers Data on the component's AgentTool InputPort,
23+
// captures the first emit on OutputPort, and returns that as
24+
// the reply. RPC mode is how MCP tool calls reach a component
25+
// without flow assembly.
26+
Mode string `json:"mode,omitempty"`
27+
1828
//
1929
Resp interface{} `json:"-"`
2030
}

internal/scheduler/scheduler.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,12 @@ func (s *Schedule) Handle(ctx context.Context, msg *runner.Msg) (any, error) {
132132
return nil, &perrors.PermanentError{Err: fmt.Errorf("max message depth %d exceeded (possible cycle in flow graph)", MaxMessageDepth)}
133133
}
134134

135+
// RPC mode: agent invoked a component as an MCP tool. Short-
136+
// circuits the node-graph dispatch — see handleRPC for details.
137+
if msg.Mode == "rpc" {
138+
return s.handleRPC(ctx, msg)
139+
}
140+
135141
nodeName, port := utils.ParseFullPortName(msg.To)
136142

137143
s.log.Info("scheduler handle: received message",

internal/transport/jetstream.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,7 @@ func (t *JetStream) handleIncoming(parentCtx context.Context, handler runner.Han
351351
From: headers.Get(headerFrom),
352352
Data: m.Data(),
353353
Depth: depth,
354+
Mode: headers.Get(headerMode),
354355
})
355356

356357
close(stopIP)

internal/transport/nats.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ const (
5151
headerErrorCode = "x-error-code"
5252
headerEmpty = "x-empty"
5353
headerReplyInbox = "x-reply-inbox"
54+
headerMode = "x-mode"
5455
natsMsgIDHeader = "Nats-Msg-Id"
5556
)
5657

@@ -266,6 +267,7 @@ func (t *NATS) handleIncoming(parentCtx context.Context, handler runner.Handler,
266267
From: m.Header.Get(headerFrom),
267268
Data: m.Data,
268269
Depth: depth,
270+
Mode: m.Header.Get(headerMode),
269271
})
270272
if err != nil {
271273
// Reply with an error header. Caller's Handler() reads

module/agent_tool.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package module
2+
3+
// AgentToolInfo describes how a component is exposed as an MCP tool
4+
// to LLM agents. Empty fields fall back to sensible defaults derived
5+
// from the component itself, so most opted-in components only need
6+
// to provide a short Description override (or nothing at all).
7+
type AgentToolInfo struct {
8+
// Name overrides the tool name. Empty = "<module>_<component>"
9+
// constructed by the platform-side MCP registry; component
10+
// authors rarely override this.
11+
Name string
12+
13+
// Description for the agent-facing tool. Empty = component's
14+
// ComponentInfo.Info text. Aim for one sentence the LLM can
15+
// match against natural-language requests.
16+
Description string
17+
18+
// InputPort names the port the agent's payload arrives on for
19+
// RPC dispatch. Empty = "in". The Configuration field on that
20+
// port's struct is used as the JSON Schema the agent's tool
21+
// call validates against.
22+
InputPort string
23+
24+
// OutputPort is the port the RPC dispatcher waits on for a
25+
// reply emit before returning to the caller. Empty = "out".
26+
// Components that fan out to multiple ports must pick one
27+
// authoritative reply port here.
28+
OutputPort string
29+
}
30+
31+
// AgentTool is the opt-in capability components implement to be
32+
// exposed as MCP tools. Without it, a component stays a flow-only
33+
// building block (which is the right answer for stateful pieces:
34+
// http_server, ticker, cron, kv).
35+
//
36+
// The dispatcher creates a fresh component Instance() per call, hands
37+
// it the payload on InputPort, captures the first emit on OutputPort,
38+
// and replies. No flow context, no edges, no settings persistence —
39+
// the agent provides everything the component needs in the call
40+
// payload.
41+
//
42+
// Only stateless components should opt in. Pure transforms
43+
// (json_encode, transform), one-shot calls (http_request, llm_complete,
44+
// slack_send_message, database_query) are good candidates. Anything
45+
// that needs settings, persistent state, or a lifecycle (servers,
46+
// timers, in-memory caches) should leave this interface unimplemented.
47+
type AgentTool interface {
48+
AgentTool() AgentToolInfo
49+
}

pkg/wire/rpc.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// RPC dispatch helper — agent MCP tool calls go through this. Unlike
2+
// Publish (which targets a node:port inside a flow), PublishRPC fires
3+
// at a bare component name on a module and waits for the reply on a
4+
// per-call inbox. The SDK receiver short-circuits the node dispatch
5+
// when it sees x-mode: rpc and runs the component fresh.
6+
7+
package wire
8+
9+
import (
10+
"context"
11+
"errors"
12+
"fmt"
13+
"time"
14+
15+
"github.com/nats-io/nats.go"
16+
perrors "github.com/tiny-systems/module/pkg/errors"
17+
"go.opentelemetry.io/otel"
18+
"go.opentelemetry.io/otel/propagation"
19+
)
20+
21+
// headerMode mirrors internal/transport's constant. Duplicated here
22+
// because external callers can't reach internal/. Any change to the
23+
// header name needs to update both.
24+
const headerMode = "x-mode"
25+
26+
// RPCOptions controls a single PublishRPC call. Mirrors Options but
27+
// drops fields that don't apply (EdgeID, From, WaitForReply — RPC is
28+
// always a request/reply round-trip).
29+
type RPCOptions struct {
30+
// Timeout caps how long to wait for the receiver's reply. Zero
31+
// defaults to 30s. Long-running tools (LLM completions, image
32+
// generation) should set this explicitly.
33+
Timeout time.Duration
34+
}
35+
36+
// PublishRPC invokes <componentName> on <moduleName> as an MCP-style
37+
// tool call. data is the JSON payload the component's input port
38+
// expects; the returned bytes are the JSON output the component
39+
// emitted on its configured AgentTool output port.
40+
//
41+
// Returns an error if the receiver couldn't find / instantiate the
42+
// component, the component returned Fail, or the reply didn't arrive
43+
// before Timeout.
44+
func PublishRPC(ctx context.Context, nc *nats.Conn, moduleName, componentName string, data []byte, opts RPCOptions) ([]byte, error) {
45+
if nc == nil {
46+
return nil, fmt.Errorf("nats conn is nil")
47+
}
48+
if moduleName == "" {
49+
return nil, fmt.Errorf("moduleName is required")
50+
}
51+
if componentName == "" {
52+
return nil, fmt.Errorf("componentName is required")
53+
}
54+
55+
inbox := nats.NewInbox()
56+
sub, err := nc.SubscribeSync(inbox)
57+
if err != nil {
58+
return nil, fmt.Errorf("subscribe reply inbox: %w", err)
59+
}
60+
defer func() { _ = sub.Unsubscribe() }()
61+
62+
msg := &nats.Msg{
63+
Subject: SubjectFor(moduleName),
64+
Data: data,
65+
Header: nats.Header{},
66+
}
67+
msg.Header.Set(HeaderTo, componentName)
68+
msg.Header.Set(headerMode, "rpc")
69+
msg.Header.Set(HeaderReplyInbox, inbox)
70+
otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(msg.Header))
71+
72+
if err := nc.PublishMsg(msg); err != nil {
73+
return nil, fmt.Errorf("publish rpc: %w", err)
74+
}
75+
76+
timeout := opts.Timeout
77+
if timeout == 0 {
78+
timeout = 30 * time.Second
79+
}
80+
waitCtx, cancel := context.WithTimeout(ctx, timeout)
81+
defer cancel()
82+
83+
reply, err := sub.NextMsgWithContext(waitCtx)
84+
if err != nil {
85+
return nil, err
86+
}
87+
if e := reply.Header.Get(HeaderError); e != "" {
88+
if code := reply.Header.Get(HeaderErrorCode); code != "" {
89+
return nil, perrors.NonRetryable(code, errors.New(e))
90+
}
91+
return nil, errors.New(e)
92+
}
93+
if reply.Header.Get(HeaderEmpty) == "1" || len(reply.Data) == 0 {
94+
return nil, nil
95+
}
96+
return reply.Data, nil
97+
}

0 commit comments

Comments
 (0)