A production-grade, recursive multi-agent LLM framework for Go. Build hierarchical AI agent systems with typed state management, concurrent execution, human-in-the-loop controls, and full observability — all with a single recursive config struct.
Built on top of langchaingo.
- Recursive Agent Hierarchy — Orchestrator delegates to sub-agents, which can have their own sub-agents. Same config struct at every level.
- Multi-Provider LLM Support — OpenAI, Anthropic (Claude), Google (Gemini), XAI (Grok), Groq, Ollama. Mix providers across agents.
- ReAct Execution Loop — Agents reason, act (call tools), observe results, and iterate until they reach an answer.
- Parallel Tool Execution — Multiple tool calls execute concurrently with goroutines and proper synchronization.
- Built-in VFS Tools — Every agent gets `ls`, `read_file`, `write_file`, `edit_file`, `grep`, and `glob` out of the box.
- Skills System — Load domain knowledge from Markdown files (with YAML frontmatter) and inject it into agent prompts.
- Custom Tools — Implement the standard `langchaingo/tools.Tool` interface. Agents discover and call them automatically.
- Typed Agent State — Structured state container with concurrency-safe reducers (messages: append-only, files: merge-by-path, todos: last-writer-wins, skills metadata: merge-by-name, custom: deep merge).
- Thread Lifecycle — Full thread lifecycle management with status derivation (idle → busy → interrupted → error), busy locks, and self-healing for orphaned runs.
- Multitask Strategies — `reject`, `rollback`, `interrupt`, `enqueue` — control behavior when a new run is submitted while a thread is busy.
- WAL Checkpointing — Write-ahead log checkpoints with parent chains, pending writes, interrupt state, and time-travel debugging via state history.
- Thread Fork — Clone a thread with all its messages and checkpoints to a new thread ID.
- Human-in-the-Loop (HITL) — Interrupt agent execution before or after specific tool calls for human approval. Per-tool or wildcard interrupt configs.
- Model Call Limit — Configurable per-run LLM call limit with graceful end or throw behavior.
- Memory Persistence — Load AGENTS.md memory files into system prompts with hot-reload support.
- Progressive Skills Disclosure — Only skill names and descriptions are injected; agents read full instructions on demand via `read_file`.
- Model Retry — Exponential backoff retry for failed LLM calls (configurable max retries and backoff multiplier).
- Model Fallback — Automatic fallback to cheaper models on primary model failure (lazy-initialized fallback chain).
- Tool Retry — Retry failed tool calls with configurable strategy.
- Tool Result Eviction — Evict large tool outputs to VFS backend, replace with head/tail preview to stay within context limits.
- Patch Tool Calls — Automatically fix dangling tool calls (AI messages with tool_calls but no corresponding ToolMessage).
- Token Usage Tracking — Per-run token usage accumulation across all LLM calls, thread-safe for concurrent subagents.
- Auth Middleware — Multi-tenant authentication and authorization with pluggable providers (API key, custom). Per-tool action whitelisting.
- Tracing — OpenTelemetry-compatible span instrumentation for agent invocations and tool calls. Pluggable backends (in-memory, noop, or custom).
- SSE Streaming — Server-Sent Events with per-run event streams, multiple concurrent subscribers, resumability via monotonic event IDs, and keepalive.
- Webhook Support — Async webhook delivery on run completion/failure with retry, exponential backoff, event type filtering, and runtime registration.
- Agent Protocol HTTP Server — REST API with SSE streaming, background runs, cancellation. LangGraph Studio compatible.
- Pluggable Storage — In-memory (default), MongoDB, Redis (ephemeral runs), or hybrid (MongoDB + Redis). Implement the `Store` interface for your own backend.
- Cross-Thread VFS — Persistent key-value-backed virtual filesystem (`afero.Fs` interface) for cross-conversation file storage with namespace isolation.
- General-Purpose Subagent — Convenience factory for creating general-purpose task-delegation subagents.
- Concurrent Spawn Limits — Configurable semaphore-based limit on concurrent subagent spawns.
```bash
go get github.com/denizumutdereli/go-deepagent
```

```go
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/denizumutdereli/go-deepagent/pkg/agent"
)

func main() {
	app, err := agent.New(agent.AgentConfig{
		Name:   "assistant",
		Model:  "gpt-4.1",
		Prompt: "You are a helpful assistant.",
	}, os.Getenv("OPENAI_API_KEY"))
	if err != nil {
		panic(err)
	}

	result, err := app.Process(context.Background(), "What is the capital of France?")
	if err != nil {
		panic(err)
	}
	fmt.Println(result)
}
```

```text
┌─────────────────────────────────────────────────────────────────┐
│                        Your Application                         │
├─────────────────────────────────────────────────────────────────┤
│  pkg/agent             │  pkg/server      │  pkg/store          │
│  ─────────────────     │  ─────────────   │  ───────────────    │
│  App                   │  HTTP Server     │  MemoryStore        │
│  ReactExecutor         │  SSE Streaming   │  MongoStore         │
│  AgentState + Reducers │  Background Runs │  RedisStore         │
│  ThreadManager         │  Cancellation    │  HybridStore        │
│  CheckpointManager     │                  │                     │
│  StreamManager         │  pkg/protocol    │                     │
│  WebhookManager        │  ─────────────   │                     │
│  MiddlewareChain       │  Thread, Run     │                     │
│  StoreBackend (VFS)    │  Checkpoint      │                     │
│  Tracing               │  Message         │                     │
│  Auth                  │  StoreItem       │                     │
└─────────────────────────────────────────────────────────────────┘
```
| Package | Description |
|---|---|
| `pkg/agent` | Core agent engine — ReAct loop, typed state, thread lifecycle, middleware, streaming, webhooks, auth, tracing |
| `pkg/server` | Agent Protocol HTTP server with SSE streaming, background runs, cancellation |
| `pkg/store` | Storage backends — in-memory, MongoDB, Redis (ephemeral), hybrid (MongoDB + Redis) |
| `pkg/protocol` | Shared types — Thread, Run, Checkpoint, Message, StoreItem |
The entire system is configured with a single recursive struct:
```go
type AgentConfig struct {
	// Identity
	Name        string // unique name for this agent
	Description string // shown to parent agent for routing

	// Prompt (required)
	Prompt string

	// Model — supports "provider:model" format
	Provider    string  // "openai", "anthropic", "google", "xai", "groq", "ollama"
	Model       string  // "gpt-4.1", "anthropic:claude-sonnet-4-20250514", "xai:grok-4-1-fast-reasoning"
	APIKey      string  // falls back to env vars if empty
	BaseURL     string  // custom API endpoint
	Temperature float64 // 0.0 - 1.0

	// Capabilities
	Tools   []tools.Tool // langchaingo tool interface
	Skills  []Skill      // domain knowledge definitions
	MaxIter int          // max ReAct iterations (default: 25)

	// Recursive — sub-agents use the SAME struct
	SubAgents []AgentConfig

	// Infrastructure
	Middleware []Middleware
	Backend    Backend // VFS backend
	Store      Store   // conversation storage
}
```

Specify providers explicitly or use the "provider:model" shorthand:
```go
// Auto-detected as OpenAI
Model: "gpt-4.1"

// Explicit provider
Model: "anthropic:claude-sonnet-4-20250514"

// XAI Grok
Model: "xai:grok-4-1-fast-reasoning"

// Google Gemini
Model: "google:gemini-2.5-flash"

// Ollama (local)
Model:   "ollama:llama3",
BaseURL: "http://localhost:11434",
```

API keys are resolved in this order:

1. The `AgentConfig.APIKey` field
2. The fallback API key passed to `agent.New()`
3. Environment variables: `OPENAI_API_KEY`, `ANTHROPIC_API_KEY`, `GOOGLE_API_KEY`, `XAI_API_KEY`, `GROQ_API_KEY`
Build hierarchical agent systems where the orchestrator automatically routes tasks to specialized sub-agents:
```go
app, err := agent.New(agent.AgentConfig{
	Name:   "orchestrator",
	Model:  "gpt-4.1",
	Prompt: "Route tasks to the best sub-agent.",
	SubAgents: []agent.AgentConfig{
		{
			Name:        "researcher",
			Description: "Searches the web for information",
			Model:       "xai:grok-4-1-fast-reasoning",
			APIKey:      xaiKey,
			Tools:       []tools.Tool{searchTool},
			Prompt:      "You are a web researcher...",
		},
		{
			Name:        "coder",
			Description: "Writes and analyzes code",
			Model:       "anthropic:claude-sonnet-4-20250514",
			APIKey:      anthropicKey,
			Prompt:      "You are a software engineer...",
		},
		{
			Name:        "casual",
			Description: "Handles casual conversation",
			Model:       "gpt-4.1-mini",
			Prompt:      "You are a friendly assistant...",
		},
	},
}, openaiKey)
```

The orchestrator automatically gets a `task` tool that delegates to sub-agents based on their descriptions.
Get real-time visibility into agent execution:
```go
eventCh := make(chan agent.ReactEvent, 100)

go func() {
	for evt := range eventCh {
		switch evt.Type {
		case agent.EventIterationStart:
			fmt.Printf("⟳ [%s] iteration %d\n", evt.Agent, evt.Iteration)
		case agent.EventLLMResponse:
			fmt.Printf("💭 [%s] %s\n", evt.Agent, evt.Content)
		case agent.EventToolStart:
			fmt.Printf("🔧 [%s] calling %s\n", evt.Agent, evt.ToolName)
		case agent.EventToolEnd:
			fmt.Printf("✓ [%s] %s done\n", evt.Agent, evt.ToolName)
		case agent.EventFinalAnswer:
			fmt.Printf("✅ [%s] %s\n", evt.Agent, evt.Content)
		}
	}
}()

result, err := app.SendWithEvents(ctx, threadID, "Analyze this data", eventCh)
close(eventCh)
```

| Event | Description |
|---|---|
| `EventIterationStart` | New ReAct iteration beginning |
| `EventLLMResponse` | LLM thinking/reasoning output (before tool calls) |
| `EventToolStart` | Tool invocation starting |
| `EventToolEnd` | Tool invocation completed with result |
| `EventFinalAnswer` | Agent has reached its final answer |
Maintain conversation history across multiple interactions:
```go
// Create a thread
threadID, err := app.CreateThread(ctx, agent.ThreadConfig{
	UserID: "user-123",
})

// Send messages — history is managed automatically
result1, _ := app.Send(ctx, threadID, "What is Go?")
result2, _ := app.Send(ctx, threadID, "How does it handle concurrency?") // remembers context

// Read thread history
thread, _ := app.GetThread(ctx, threadID)
for _, msg := range thread.Messages {
	fmt.Printf("[%s] %s\n", msg.Role, msg.Content)
}

// List checkpoints (snapshots after each interaction)
checkpoints, _ := app.ListCheckpoints(ctx, threadID, 10)

// List all threads
threads, _ := app.ListThreads(ctx, "user-123", 20, 0)
```

Threads have four states: `idle`, `busy`, `interrupted`, `error`. Status is derived automatically from checkpoint and run state.
```go
tm := agent.NewThreadManager(store)

// Submit with a multitask strategy
run, err := tm.SubmitRun(ctx, threadID, "assistant", "Hello",
	agent.MultitaskReject, // or: MultitaskRollback, MultitaskInterrupt, MultitaskEnqueue
	nil,
)

// Lock/unlock for execution
runCtx, attempt, err := tm.LockRun(ctx, threadID, run.RunID)
defer tm.UnlockRun(threadID, run.RunID)

// Cancel a running run
tm.CancelRun(ctx, threadID, run.RunID, agent.CancelActionInterrupt)

// Fork a thread (copy messages + checkpoints)
newThreadID, err := tm.ForkThread(ctx, threadID)

// Derive thread status from checkpoint state
status, _ := tm.DeriveThreadStatus(ctx, threadID, nil) // idle | busy | interrupted | error
```

Write-ahead log checkpoints enable interrupt/resume and time-travel debugging:
```go
cm := agent.NewCheckpointManager(store)
builder := cm.NewCheckpointBuilder(threadID, state)

// Track pending writes
builder.AddWrite("messages", newMessage, "task-1")
builder.AddWrite("files", fileUpdate, "task-1")

// Commit checkpoint with parent chain
cpID, err := builder.Commit(ctx, map[string]interface{}{"step": "tool_call"})

// Commit interrupt (for HITL)
cpID, err := builder.CommitInterrupt(ctx, []string{"write_file"}, interruptErr)

// Restore state from the latest checkpoint
state, checkpoint, err := cm.RestoreState(ctx, threadID)

// Time-travel: browse state history
history, err := cm.GetStateHistory(ctx, threadID, 10)
```

Per-run event streaming with pub/sub, resumable event IDs, and concurrent subscribers:
```go
sm := agent.NewStreamManager()
sm.CreateStream(runID)

// Publish events (thread-safe, non-blocking for slow subscribers)
sm.Publish(runID, agent.StreamEventMessages, messageData)
sm.Publish(runID, agent.StreamEventValues, stateSnapshot)

// Subscribe with resumability (Last-Event-ID reconnection)
ctx, cancel := context.WithCancel(context.Background())
ch, err := sm.Subscribe(ctx, runID, lastEventID)
for evt := range ch {
	sse, _ := evt.MarshalSSE()
	w.Write([]byte(sse))
	w.(http.Flusher).Flush()
}

// End the stream + clean up old streams
sm.EndStream(runID)
sm.Cleanup(2 * time.Hour)
```

Async webhook delivery on run completion with retry and backoff:
```go
wm := agent.NewWebhookManager(
	agent.WebhookConfig{
		URL:        "https://api.example.com/hooks",
		MaxRetries: 3,
		Timeout:    10 * time.Second,
		Events:     []string{"run.completed", "run.failed"},
		Headers:    map[string]string{"X-API-Key": "secret"},
	},
)
defer wm.Close()

// Add webhooks at runtime
wm.AddWebhook(agent.WebhookConfig{URL: "https://slack.example.com/webhook"})

// Emit events (non-blocking, async delivery via worker goroutine)
wm.Emit(agent.WebhookEvent{
	Type:     "run.completed",
	RunID:    runID,
	ThreadID: threadID,
	Status:   "success",
})
```

Multi-tenant auth with pluggable providers:
```go
// API key provider
provider := agent.NewAPIKeyAuthProvider(map[string]*agent.AuthContext{
	"key-123": {UserID: "user-1", OrgID: "org-1", Permissions: []string{"*"}},
	"key-456": {UserID: "user-2", OrgID: "org-1", Permissions: []string{"execute_tool:read_file"}},
})

// Auth middleware
mw := agent.NewAuthMiddleware(agent.AuthMiddlewareConfig{
	Provider:       provider,
	RequireAuth:    true,
	AllowedActions: []string{"read_file", "ls"}, // no auth needed for read-only tools
})

// Inject auth context
ctx := agent.WithAuthContext(ctx, &agent.AuthContext{
	UserID: "user-1",
	OrgID:  "org-1",
})
```

OpenTelemetry-compatible instrumentation:
```go
// In-memory tracer (testing/debugging)
tracer := agent.NewInMemoryTracer()
mw := agent.NewTracingMiddleware(tracer)

// After execution
spans := tracer.Spans()
for _, s := range spans {
	fmt.Printf("%s [%s] %s → %s\n", s.Name, s.Kind, s.StartTime, s.Status)
}

// Noop tracer (zero overhead in production)
mw := agent.NewTracingMiddleware(nil) // uses NoopTracer

// Custom tracer (implement the Tracer interface for OpenTelemetry, Datadog, etc.)
type Tracer interface {
	StartSpan(ctx context.Context, name string, kind SpanKind, attrs map[string]interface{}) (context.Context, *Span)
	EndSpan(span *Span, status SpanStatus, err error)
	AddEvent(span *Span, name string, attrs map[string]interface{})
	Flush(ctx context.Context) error
}
```

Implement the standard `langchaingo/tools.Tool` interface:
```go
type WebSearchTool struct {
	apiKey string
}

func (t *WebSearchTool) Name() string { return "web_search" }

func (t *WebSearchTool) Description() string {
	return `Search the web. Input: JSON {"query": "search terms"}`
}

func (t *WebSearchTool) Call(ctx context.Context, input string) (string, error) {
	var args struct {
		Query string `json:"query"`
	}
	if err := json.Unmarshal([]byte(input), &args); err != nil {
		return "", err
	}
	// ... perform search with args.Query ...
	return results, nil
}
```

Load domain knowledge from Markdown files with YAML frontmatter:
```markdown
---
name: security-audit
description: Smart contract security methodology
---

# Security Audit Process

1. Check for reentrancy vulnerabilities
2. Verify access control patterns
3. Review arithmetic operations for overflow
...
```

```go
// Load from a file
skill, err := agent.SkillFromFile("skills/security-audit.md")

// Load from an embedded filesystem
skill, err := agent.SkillFromEmbed(embedFS, "skills/security-audit.md")

// Create inline
skill := agent.NewSkill("math", "Math helper", "You can solve equations...")

// Use in the agent config
cfg := agent.AgentConfig{
	Skills: []agent.Skill{skill},
	// ...
}
```

The middleware pipeline supports `BeforeInvoke`, `AfterInvoke`, `BeforeToolCall`, `AfterToolCall`, `ModelCallInterceptor` (wraps LLM calls), and `ToolCallInterceptor` (wraps tool calls) hooks.
```go
// Execution control
agent.NewModelCallLimitMiddleware(agent.ModelCallLimitConfig{
	MaxCalls:     10,
	ExitBehavior: "end", // "end" (graceful) or "throw" (error)
})

// Resilience
agent.NewModelRetryMiddleware(agent.ModelRetryConfig{
	MaxRetries:        3,
	InitialBackoff:    time.Second,
	BackoffMultiplier: 2.0,
	OnFailure:         "continue", // or "throw"
})
agent.NewModelFallbackMiddleware(agent.ModelFallbackConfig{
	FallbackModels: []string{"openai:gpt-4.1-mini"},
})
agent.NewToolRetryMiddleware(agent.ToolRetryConfig{MaxRetries: 3})

// Human-in-the-Loop
agent.NewHITLMiddleware(agent.HITLConfig{
	Interrupts: map[string]agent.InterruptConfig{
		"write_file": {Before: true},                 // require approval before writes
		"*":          {After: true, Reason: "audit"}, // audit all tool results
	},
})

// Memory & context
agent.NewMemoryMiddleware(agent.MemoryMiddlewareConfig{
	Sources: []string{"~/.deepagents/AGENTS.md", "./.deepagents/AGENTS.md"},
})
agent.NewToolResultEvictionMiddleware(agent.ToolEvictionConfig{
	MaxResultTokens: 4000,
})
agent.NewPatchToolCallsMiddleware() // fix dangling tool calls

// Observability
agent.NewTokenUsageMiddleware(tracker, "gpt-4.1")
agent.NewTracingMiddleware(tracer) // OpenTelemetry-compatible
agent.NewAuthMiddleware(agent.AuthMiddlewareConfig{
	Provider:    apiKeyProvider,
	RequireAuth: true,
})

// Existing
agent.LoggingMiddleware()
agent.AnthropicSanitizeMiddleware()
agent.TodoListMiddleware()
agent.SummarizationMiddleware(cfg)
```

```go
type MyMiddleware struct{}

func (m *MyMiddleware) Name() string { return "MyMiddleware" }

func (m *MyMiddleware) BeforeInvoke(ctx context.Context, ic *agent.InvokeContext) error {
	fmt.Printf("Agent %s, iteration %d\n", ic.AgentName, ic.Iteration)
	return nil
}

func (m *MyMiddleware) BeforeToolCall(ctx context.Context, tc *agent.ToolCallContext) error {
	fmt.Printf("Tool: %s\n", tc.ToolName)
	return nil
}
```

Start a production-ready HTTP server compatible with LangGraph Studio:
```go
import "github.com/denizumutdereli/go-deepagent/pkg/server"

srv, err := server.New(server.ServerConfig{
	App:  app,
	Port: "8080",
	Runner: server.RunnerConfig{
		MaxConcurrent:   50,
		RunTimeout:      5 * time.Minute,
		ShutdownTimeout: 30 * time.Second,
	},
})
srv.Start()
```

```text
GET    /api/health                 Health check
POST   /threads                    Create thread
POST   /threads/search             Search threads
GET    /threads/{id}               Get thread + messages
DELETE /threads/{id}               Delete thread
GET    /threads/{id}/history       Checkpoint history
POST   /threads/{id}/runs          Background run
POST   /threads/{id}/runs/stream   Run + SSE stream
POST   /threads/{id}/runs/wait     Run + wait for result
POST   /runs                       Stateless background run
POST   /runs/stream                Stateless run + SSE stream
POST   /runs/wait                  Stateless run + wait
GET    /runs/{id}                  Get run status
GET    /runs/{id}/stream           Reconnect to SSE stream
GET    /runs/{id}/wait             Wait for completion
POST   /runs/{id}/cancel           Cancel run
```
```go
r := chi.NewRouter()
r.Use(myAuthMiddleware)
r.Get("/custom", myHandler)

srv, _ := server.New(server.ServerConfig{
	App:    app,
	Router: r, // Agent Protocol routes are mounted on your router
})
```

```go
srv, _ := server.New(server.ServerConfig{
	App: app,
	SSEFormatter: func(evt agent.ReactEvent) (string, []byte) {
		// Custom SSE event formatting
		data, _ := json.Marshal(evt)
		return string(evt.Type), data
	},
})
```

```go
// Automatically used when no Store is provided
app, _ := agent.New(cfg, apiKey)
```

```go
import "github.com/denizumutdereli/go-deepagent/pkg/store"

ms, err := store.ConnectMongo(ctx, "mongodb://localhost:27017", "mydb")
app, _ := agent.New(agent.AgentConfig{
	Store: ms,
	// ...
}, apiKey)
```

```go
rs, err := store.NewRedisStore(store.RedisConfig{
	Addr:   "localhost:6379",
	RunTTL: 2 * time.Hour, // automatic cleanup
})
```

Production pattern: persistent threads/checkpoints in MongoDB, ephemeral runs in Redis with TTL-based cleanup.
```go
ms, _ := store.ConnectMongo(ctx, "mongodb://localhost:27017", "mydb")
rs, _ := store.NewRedisStore(store.DefaultRedisConfig())
hybrid := store.NewHybridStore(ms, rs)

app, _ := agent.New(agent.AgentConfig{
	Store: hybrid,
	// ...
}, apiKey)
```

Implement the `agent.Store` interface:
```go
type Store interface {
	// Threads
	CreateThread(ctx context.Context, t *protocol.Thread) error
	GetThread(ctx context.Context, id string) (*protocol.Thread, error)
	DeleteThread(ctx context.Context, id string) error
	SetThreadStatus(ctx context.Context, id string, status protocol.ThreadStatus) error
	AppendMessage(ctx context.Context, threadID string, msg protocol.Message) error
	SearchThreads(ctx context.Context, userID string, limit, offset int) ([]*protocol.Thread, error)

	// Checkpoints
	SaveCheckpoint(ctx context.Context, cp *protocol.Checkpoint) error
	GetLatestCheckpoint(ctx context.Context, threadID string) (*protocol.Checkpoint, error)
	ListCheckpoints(ctx context.Context, threadID string, limit int, before string) ([]*protocol.Checkpoint, error)

	// Runs
	CreateRun(ctx context.Context, r *protocol.Run) error
	GetRun(ctx context.Context, id string) (*protocol.Run, error)
	UpdateRun(ctx context.Context, r *protocol.Run) error
	ListRunsByThread(ctx context.Context, threadID string, limit, offset int) ([]*protocol.Run, error)
}
```

An interactive CLI with X/Twitter research (via XAI Grok), web search (via Tavily), and casual chat — plus an `--serve` mode for the HTTP API.
```bash
cd examples/researcher
cp .env.example .env   # fill in your keys
go run .
```

Features:
- 3 sub-agents: researcher (XAI), websearch (Tavily), casual (GPT)
- Thread management: `new`, `thread`, `threads`, `checkpoints`
- Server mode: `go run . --serve 8080`
- MongoDB support: `go run . --store mongodb://localhost:27017/researcher`
A blockchain security auditor with real on-chain tools — Etherscan API, Alchemy RPC, ABI decoding — across multiple chains.
```bash
cd examples/onchain-auditor
cp .env.example .env   # fill in your keys
go run .
```

Features:
- 4 sub-agents: security-auditor (Gemini), token-analyst (GPT), tx-investigator (GPT), chain-scanner (XAI)
- Multi-chain support: Ethereum, Polygon, BSC, Arbitrum, Optimism, Base, Avalanche
- Real on-chain tools: contract source, token transfers, balances, event logs, ABI decoding
- Security audit skills: SWC attack vectors, DeFi patterns, audit methodology
Every agent automatically receives these filesystem tools:
| Tool | Description |
|---|---|
| `ls` | List directory contents |
| `read_file` | Read file contents |
| `write_file` | Write content to a file |
| `edit_file` | Find-and-replace edit |
| `grep` | Search file contents |
| `glob` | Find files by pattern |
The VFS backend is pluggable — the default is in-memory (`afero.MemMapFs`), but you can use the OS filesystem or any `afero.Fs` implementation.
```bash
# Run all tests
go test ./...

# Run with verbose output
go test -v ./pkg/agent/...

# Run specific test suites
go test -v ./pkg/agent/ -run TestStreamManager
go test -v ./pkg/agent/ -run TestWebhookManager
go test -v ./pkg/agent/ -run TestAuthMiddleware
go test -v ./pkg/agent/ -run TestTracingMiddleware

# E2E tests (require API keys in .env.test)
cp .env.example .env.test
go test -v ./pkg/agent/ -run TestE2E
```

Test coverage includes:
- State reducers — append-only messages, merge-by-path files, skills metadata merge, subagent clone/merge
- Thread lifecycle — lock/unlock, multitask strategies, retry counters, status derivation, fork
- Middleware — HITL interrupts, model call limits, memory loading, model retry/fallback, tool eviction, patch tool calls, auth, tracing
- Streaming — publish/subscribe, resumability, concurrent access, context cancellation, cleanup
- Token usage — concurrent accumulation, read/write safety
- Webhooks — delivery, retry, event filtering, runtime registration
- Store backends — StoreBackend (VFS), MemoryStore operations
```bash
# Start test dependencies (Redis + MongoDB)
docker-compose -f docker-compose.test.yml up -d

# Run integration tests
go test ./... -count=1

# Stop
docker-compose -f docker-compose.test.yml down
```

```bash
# Start MongoDB only (for persistent storage in dev)
docker-compose up -d

# Stop
docker-compose down
```

go-deepagent is designed for high-concurrency production use:
- `sync.RWMutex` throughout — read-heavy data structures use `RLock` for concurrent reads (AgentState, TokenUsage, MemoryMiddleware, AuthProvider, StreamManager)
- Lock ordering — documented and enforced to prevent deadlocks (snapshot-then-apply pattern in state merges, two-phase cleanup in the stream manager)
- `sync/atomic` — lock-free counters for event IDs
- Buffered channels — non-blocking event delivery in StreamManager and WebhookManager
- Worker goroutines — webhook delivery uses a single worker goroutine to prevent HTTP connection storms
- Semaphore pattern — a buffered channel limits concurrent subagent spawns
- Context propagation — all long-running operations respect `context.Context` for cancellation
Contributions are welcome! Please see CONTRIBUTING.md for guidelines.