"Most enterprise AI agents are failing in production because they rely on stale context — we're feeding 2026-speed models with 1996-speed batch pipelines."
autonomous-context-fabric (formerly stream-graph-rag) is a 90-day, build-in-public project that solves the "Missing Middle" of the enterprise AI stack: the gap between high-velocity business events and the context your agents actually reason over.
Built with Pathway (stateful stream processing) + Omnigraph (S3-native immutable graph) to deliver sub-60-second account intelligence — directly from SEC filings, CRM webhooks, and support events.
Batch-based RAG creates a "Context Debt" — a growing gap between what your agent believes and what is actually true — that is the primary cause of production AI failures in relationship-intensive enterprise workflows.
The QBR Scenario: A Sales Director walks into a Quarterly Business Review with "Global Corp." Their RAG agent says the account is "Stable." In reality, 20 minutes ago:
- An SEC filing hit the wire showing a hostile takeover bid
- A support ticket was just escalated to "Critical" for their main subsidiary
Traditional RAG misses this. Autonomous Knowledge Fabric flags it in under 60 seconds.
During our migration sprint from Memgraph to an S3-native architecture, we uncovered several critical insights:
- The Agent-Native Paradigm: Omnigraph is designed for AI agents. By arming the Gemini CLI with the `omnigraph-best-practices` and `omnigraph-intel-bootstrap` skills, the agent synthesized the schema, queries, and custom HTTP clients from scratch, without existing SDK examples.
- Latency & Real-Time Viability: The S3-native architecture changes the performance profile:
- Reads (~80ms): Highly performant and easily supports our sub-60s agent context assembly SLA.
- Writes (~1.2s): Single-row upserts incur a file-creation penalty from S3 commits. This is acceptable for background processing, but high-throughput systems must batch ingestions.
- Developer Experience: Local S3 simulators (like RustFS) require explicit dummy credentials to bypass AWS metadata service (IMDS) timeouts.
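Given the ~1.2s single-row write penalty, batching is the obvious mitigation. A minimal sketch of a batching wrapper (the `flush` callable and size/time thresholds are illustrative assumptions, not the project's actual API):

```python
import time
from typing import Any, Callable

def batch_upserts(
    events: list[dict[str, Any]],
    flush: Callable[[list[dict[str, Any]]], None],
    max_batch: int = 100,
    max_wait_s: float = 2.0,
) -> None:
    """Amortize the per-commit S3 file-creation penalty by grouping
    single-row upserts into one write per batch."""
    buffer: list[dict[str, Any]] = []
    deadline = time.monotonic() + max_wait_s
    for event in events:
        buffer.append(event)
        if len(buffer) >= max_batch or time.monotonic() >= deadline:
            flush(buffer)  # one S3 commit instead of len(buffer) commits
            buffer = []
            deadline = time.monotonic() + max_wait_s
    if buffer:
        flush(buffer)
```

With a ~1.2s commit cost, a batch of 100 turns ~120s of sequential writes into roughly one commit's worth of latency per 100 events.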
┌─────────────────────────────────────────────────────────────┐
│ Event Sources │
│ SEC EDGAR RSS Synthetic CRM Synthetic Zendesk │
└──────────┬──────────────┬─────────────────┬────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ Pathway Stream Processor (Single Container) │
│ ┌──────────────┐ ┌──────────────┐ ┌─────────────────┐ │
│ │ Normalization│ │ 3-Tier │ │ Omnigraph Sink │ │
│ │ & Extraction │─▶│ Resolver │─▶│ (S3/Boto3) │ │
│ └──────────────┘ └──────────────┘ └─────────────────┘ │
└─────────────────────────────────┬───────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Omnigraph (S3-Native / MVCC) │
│ │
│ [Headless Branch: fragment/f1b20889] <-- Unverified │
│ │ Entity Buffer │
│ └──(Threshold Met)──▶ Fast-Forward Merge │
│ │ │
│ [Main Branch] ────────────────┴───────▶ Pinned Snapshot │
└─────────────────────────────────────────────────┬───────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Account Intelligence Agent + Streamlit Dashboard │
│ │
│ Account: Global Corp Risk Score: 🔴 CRITICAL │
│ Context Freshness: 14 seconds ago ████████░░ 82/100 │
└─────────────────────────────────────────────────────────────┘
| Tier | Method | Catches | LLM Cost |
|---|---|---|---|
| Tier 1 | Deterministic hashing (normalize, trim, regex) | ~60% of duplicates | $0 |
| Tier 2 | Graph-contextual neighbor matching in Omnigraph | ~30% of remaining | $0 |
| Tier 3 | LLM-as-Judge via gpt-4o-mini + Instructor | Final ~10% ambiguous cases | Minimal |
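Tier 1 is the cheapest line of defense. A minimal sketch of what deterministic key generation can look like (the suffix list and helper name are illustrative, not the project's exact implementation):

```python
import hashlib
import re

# Common legal suffixes stripped during normalization (illustrative list).
_LEGAL_SUFFIXES = re.compile(
    r"\b(inc|incorporated|corp|corporation|llc|ltd|limited|co|company)\.?$",
    re.IGNORECASE,
)

def tier1_key(name: str) -> str:
    """Deterministic Tier 1 key: lowercase, strip punctuation and
    legal suffixes, collapse whitespace, then hash."""
    n = name.lower().strip()
    n = _LEGAL_SUFFIXES.sub("", n)
    n = re.sub(r"[^a-z0-9 ]", "", n)
    n = re.sub(r"\s+", " ", n).strip()
    return hashlib.sha256(n.encode()).hexdigest()
```

"Acme Corp." and "ACME Corporation" hash to the same key with zero LLM cost; anything this misses falls through to Tier 2.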
To run the full Autonomous Knowledge Fabric, follow these steps in sequence to coordinate the storage, database, and pipeline layers.
Ensure you are using the `.venv-omnigraph` virtual environment and create an externalized `.env.omni` file pointing to the local S3 simulator.
# Activate the virtual environment
source .venv-omnigraph/bin/activate
# Install dependencies
pip install -r requirements.txt
# Create the credentials file
cat <<EOF > .env.omni
AWS_ACCESS_KEY_ID=rustfsadmin
AWS_SECRET_ACCESS_KEY=rustfsadmin
AWS_REGION=us-east-1
AWS_ENDPOINT_URL=http://127.0.0.1:9000
AWS_ENDPOINT_URL_S3=http://127.0.0.1:9000
AWS_ALLOW_HTTP=true
AWS_S3_FORCE_PATH_STYLE=true
EOF

The fabric uses a local RustFS S3 simulator and the Omnigraph binary.
# Start RustFS via Docker
docker compose up -d rustfs
# Install Omnigraph binaries (./.omnigraph-rustfs-demo/bin)
curl -fsSL https://raw.githubusercontent.com/ModernRelay/omnigraph/main/scripts/local-rustfs-bootstrap.sh | bash
# Initialize the S3-native repository
export $(cat .env.omni | xargs)
./.omnigraph-rustfs-demo/bin/omnigraph init --schema schema.pg s3://omnigraph-local/crm-fixed

The server provides the GQL/HTTP API that the pipelines use for ingestion.
export $(cat .env.omni | xargs)
./.omnigraph-rustfs-demo/bin/omnigraph-server --bind 127.0.0.1:8080 s3://omnigraph-local/crm-fixed

Choose between real SEC data or high-volume synthetic CRM events. Open a new terminal tab and ensure PYTHONPATH is set.
- Option A: Real SEC EDGAR Stream (polls live filings):
  export PYTHONPATH=.
  python pipelines/sec_ingestion.py
- Option B: Synthetic CRM Load Generator (Faker-based):
  export PYTHONPATH=.
  python pipelines/synthetic_crm.py
View accounts, risk scores, and event trails in a real-time UI.
export PYTHONPATH=.
streamlit run dashboard/app.py

Goal: Prove the live view works. An observer watches a news item hit the wire and the graph update its risk score in under 60 seconds.
| Week | Sprint | Deliverable |
|---|---|---|
| W1 | 1-4 | Repo setup. Omnigraph + RustFS initialized. AccountEvent schema for SEC filings. |
| W2 | 5-8 | S3-native sink via Boto3. upsert_account and upsert_event logic. Context API. |
| W3 | 9-12 | OpenTelemetry instrumentation. Tier 1/2 Resolvers (Deterministic & Neighbor matching). |
| W4 | 13-16 | Tier 3 Resolver (Gemini 1.5). Snapshot-pinned reads. Streamlit Dashboard v1. |
Month 1 Deliverable: A live dashboard showing real SEC filings, entities extracted and risk-scored within 60 seconds using S3-native snapshotting.
Theme: Resolution & Maturity
Goal: Solve the "Acme Corp" duplication problem using MVCC branching.
- Week 5: Tier 1: Deterministic normalization (strip legal suffixes, trim).
- Week 6: Tier 2: Graph-contextual matching (shared domains/CIKs/executives).
- Week 7: Tier 3: LLM-as-Judge using Instructor-structured prompts.
- Week 8: Operational Hardening: Docker Compose v1, Salesforce/Zendesk synthetic schemas.
Theme: ROI & Comparison
Goal: Prove value-per-query and operational simplicity.
- Week 9: RAG Baseline Comparison (Nightly batch refresh vs. Real-time).
- Week 10: The QBR Hero Demo: Scenario script where RAG fails and Fabric succeeds.
- Week 11: The Whitepaper: Infrastructure cost model (S3-native vs. In-memory vs. Nightly RAG).
- Week 12: Publication: README finalized, Whitepaper published, Demo video released.
| Metric | Omnigraph (Rust/S3) | Memgraph (Legacy/In-Memory) |
|---|---|---|
| P50 Upsert Latency | ~1,200 ms | ~2.00 ms |
| P99 Upsert Latency | ~1,800 ms | ~5.00 ms |
| P50 Context Read | ~80 ms | ~1.50 ms |
Note: Omnigraph upserts include 3 atomic mutations (Account, Event, Link) per ingestion over S3.
This repository is architected for AI agents. Any agent operating within this workspace must utilize the following specialized skills:
- `omnigraph-best-practices`: Required for all `schema.pg` evolutions and `.gq` query authoring.
- `omnigraph-intel-bootstrap`: Required for repository initialization and environment orchestration.
| Layer | Technology | Why |
|---|---|---|
| Stream processor | Pathway | Rust core, Python-native, incremental deltas |
| Knowledge graph | Omnigraph | S3-native, MVCC branching, Snapshot consistency |
| Schema validation | Pydantic | Type-safe event models, Pathway-native |
| LLM Judge | Gemini 1.5 Flash + Instructor | Structured outputs, minimal cost |
| Observability | OpenTelemetry | Vendor-neutral, from Day 1 |
| Dashboard | Streamlit | Fast iteration, no frontend overhead |
The Autonomous Knowledge Fabric operates as a push-based pipeline. Data does not simply "sit" in Pathway; it is actively pushed through a multi-stage resolution and ingestion stack.
- File: `pipelines/sec_ingestion.py` (`_row_to_account_event`)
- Action: Raw RSS/HTML entries are unescaped and parsed via regex to extract company names, CIK numbers, and event types.
- Result: A validated `AccountEvent` Pydantic model.
- File: `pipelines/sec_ingestion.py` (`pw.io.subscribe`)
- Action: Pathway's engine triggers the `_on_change` callback for every new event. This acts as the real-time bridge between the stream processor and the graph database.
- File: `pipelines/routing.py` (`OmnigraphRoutingManager`)
- Action:
  - Tier 2 (Graph Context): Checks for "Strong Signals" (CIK, Domain, ID).
  - Branch-Based Buffering: If an entity is "Weak" (name only), it is isolated in a headless side-branch (Fragment) in Omnigraph. This prevents "Context Pollution" on the `main` branch.
  - Promotion: Once a threshold is met (via corroborated signals or the Tier 3 LLM Judge), the branch is merged into `main`.
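The routing decision itself reduces to a small predicate. A sketch under the assumption that entities arrive as dicts and that fragment branch names are derived from the entity (the real branch ids are content-addressed; the naming here is illustrative):

```python
from dataclasses import dataclass

STRONG_SIGNALS = ("cik", "domain", "account_id")

@dataclass
class RoutingDecision:
    target: str            # "main" or a headless fragment branch
    needs_promotion: bool  # weak entities wait for corroboration

def route_entity(entity: dict) -> RoutingDecision:
    """Tier 2 routing sketch: strong-signal entities go straight to main;
    name-only entities are buffered on a fragment branch."""
    if any(entity.get(field) for field in STRONG_SIGNALS):
        return RoutingDecision(target="main", needs_promotion=False)
    # Illustrative fragment naming only; real ids come from Omnigraph.
    fragment = f"fragment/{abs(hash(entity.get('name', ''))) % 16**8:08x}"
    return RoutingDecision(target=fragment, needs_promotion=True)
```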
To maintain 24/7 ingestion reliability, we implemented Fast-Path logic for high-confidence data:
- Problem: High-concurrency writes to side-branches occasionally triggered `409 Conflict` (Version Drift) errors during the `branch -> merge` workflow, especially when multiple events hit the same entity simultaneously.
- Solution: Events with Strong Signals (CIK, Domain, or explicit Account ID) bypass the branching/buffering layer entirely and are ingested directly into the `main` branch.
- Reliability: The `OmnigraphClient` now includes automatic retries with `?sync_branch=true`, forcing the server to synchronize the branch state before retrying a failed mutation.
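The retry loop can be sketched with the transport injected, so the drift-handling logic is visible without a live server (the `send` callable and exception type are assumptions standing in for the real `OmnigraphClient` internals):

```python
import time
from typing import Callable

class VersionDrift(Exception):
    """Raised when the server returns 409 Conflict (branch version drift)."""

def mutate_with_retry(
    send: Callable[[bool], dict],  # send(sync_branch) performs one HTTP POST
    max_retries: int = 3,
) -> dict:
    """Retry a mutation on version drift, passing ?sync_branch=true on
    retries so the server reconciles branch state before re-applying."""
    for attempt in range(max_retries + 1):
        try:
            return send(attempt > 0)  # first try without sync, retries with it
        except VersionDrift:
            if attempt == max_retries:
                raise
            time.sleep(0.05 * 2**attempt)  # brief exponential backoff
    raise RuntimeError("unreachable")
```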
- File: `engine/omnigraph/ingestion_sink.py` (`OmnigraphSink`)
- Action:
  - Risk Scoring: Calculates the final health score for the account.
  - Multi-Mutation: Executes three atomic GQL mutations (`insert_account`, `insert_event`, `link_account_event`) to ensure structural integrity.
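The sink's control flow, sketched with the mutation executor injected (the `execute` callable and the toy scoring rule are assumptions; the real `OmnigraphSink` and `scoring/account_health.py` model are richer):

```python
from typing import Callable

def score_account(account: dict, event: dict) -> int:
    """Toy stand-in for the risk model: critical events max out the score."""
    base = account.get("risk_score", 10)
    return 100 if event.get("severity") == "critical" else base

def ingest_event(
    execute: Callable[[str, dict], None],  # executes one named GQL mutation
    account: dict,
    event: dict,
) -> None:
    """Sink sketch: score the account, then run the three mutations that
    keep the graph structurally consistent (account, event, their edge)."""
    account["risk_score"] = score_account(account, event)
    execute("insert_account", account)
    execute("insert_event", event)
    execute("link_account_event",
            {"account_id": account["id"], "event_id": event["id"]})
```

Keeping all three mutations inside one sink call is what makes the ~1.2s upsert figure an all-in number: it covers the account, the event, and the edge between them.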
autonomous-knowledge-fabric/
├── CLAUDE.md # AI co-pilot context
├── docker-compose.yml # Full stack: one command
├── schema.pg # Omnigraph Schema-as-Code
├── models/
│ └── account_event.py # Core Pydantic schemas
├── engine/
│ └── omnigraph/ # S3-native DB interaction
│ ├── client.py # Snapshotted HTTP client
│ └── ingestion_sink.py # Pathway → Omnigraph sink
├── pipelines/
│ ├── sec_ingestion.py # SEC EDGAR RSS → Pathway
│ ├── synthetic_crm.py # Faker-based CRM generator
│ └── resolver/ # 3-Tier Entity Resolution
├── scoring/
│ └── account_health.py # Risk score model
├── dashboard/
│ └── app.py # Streamlit dashboard
├── baseline_rag/ # Pinecone + LlamaIndex baseline
├── observability/
│ └── telemetry.py # OpenTelemetry setup
├── tests/ # Pytest suite
└── docs/ # Whitepaper and Weekly logs
This is a build-in-public project. Feedback, issues, and PRs welcome.
MIT — use it, fork it, build on it.
Built by Sreeram Nudurupati — a practitioner who got tired of explaining to stakeholders why the AI was confidently wrong.