From 1c080ee357e7124cdb39abe9bde6a422a8c977e2 Mon Sep 17 00:00:00 2001 From: Mohammad Date: Thu, 7 May 2026 18:14:45 +0300 Subject: [PATCH 1/6] Data Layer proposal. Signed-off-by: Mohammad --- docs/proposals/data-layer-proposal/README.md | 158 +++++++++++++++++++ 1 file changed, 158 insertions(+) create mode 100644 docs/proposals/data-layer-proposal/README.md diff --git a/docs/proposals/data-layer-proposal/README.md b/docs/proposals/data-layer-proposal/README.md new file mode 100644 index 0000000..e7f82ca --- /dev/null +++ b/docs/proposals/data-layer-proposal/README.md @@ -0,0 +1,158 @@ +# Proposal: Data Layer + +## Summary + +Introduce an **Async Data Layer** — a background observation pipeline that runs +**outside the critical request path**. It collects runtime events fired by the producer +(currently `server.go`), buffers them off the hot path, and dispatches them to registered +`Extractor`s that compute aggregates and write them to the DataStore for the Model Selector. + +## Goal + +Track runtime information about inference requests to help make better routing decisions — +for example, which model has the least in-flight requests or the lowest average latency. +This information is read by the Model Selector (Filter / Score / Pick) when choosing +where to route each request. + +## Requirements + +- **Non-blocking on the critical path** - data collection must add zero latency to request handling. +- **Multiple independent extractors** - different metrics must be computable independently without coupling to each other. +- **Extensible** - adding a new metric or event type must not require changes to existing extractors or the producer. +- **Off the plugin pipeline** - the data layer is a background concern; it must not participate in the per-request plugin chain. + + +## Proposal + +### Architecture + +```mermaid +flowchart TD + P["Producer\n(server.go)"] + PP["Plugin Pipeline\nFilter → Score → Pick"] + NS["NotificationSource\nchan Event (buffered)"] + TL["tick loop\nevery 100ms"] + ExtA["ConcurrencyExtractor"] + ExtB["LatencyExtractor\n(future)"] + DS1[("DataStore\nrunning-requests")] + DS2[("DataStore\npool-latency")] + MS["Model Selector"] + + P -->|"Notify(event) ~ns"| NS + P --> PP + NS --> TL + TL -->|"[]Event batch"| ExtA + TL -->|"[]Event batch"| ExtB + ExtA --> DS1 + ExtB --> DS2 + DS1 --> MS + DS2 --> MS + PP -->|reads| MS +``` + +The **producer** (currently `server.go`) fires an `Event` on each request and response — +a non-blocking channel write (~ns). The `NotificationSource` buffers it. A background +tick loop drains the buffer every 100ms and fans the batch to all registered `Extractor`s. +Each extractor switches on `Event.Type` and handles what it understands, ignoring the rest. + + +### Types (`pkg/framework`) + +```go +type DataSource interface { + Plugin // TypedName() TypedName + Start(ctx context.Context) error + Stop() +} + +// Event is the uniform carrier of all data layer events. +// Type identifies what happened; Payload holds the event-specific data. +type Event struct { + Type EventType + Payload any +} + +// NotificationSource buffers events off the hot path and dispatches +// batches to registered Extractors on each tick. +type NotificationSource interface { + DataSource + Notify(e Event) // non-blocking, called by the producer + RegisterExtractor(e Extractor) +} + +// Extractor switches on Event.Type and handles only the types it understands. +type Extractor interface { + DataSource + Extract(ctx context.Context, events []Event) error +} +``` + +See [Appendix](#appendix) for payload struct definitions and a full extractor example. + +### Registration (`runner.go`) + +```go +src := datalayer.NewNotificationSource("notification-source") +src.RegisterExtractor(datalayer.NewConcurrencyExtractor(handle)) +if err := src.Start(ctx); err != nil { ... } +// pass src to the producer so it can call src.Notify(...) +``` + +**Next:** move extractors registration to a config struct or CLI flags so operators can enable/disable metrics without recompiling. + + +## Future + +- **LatencyExtractor** - handles `ResponseEventType`; per-model avg latency; owns `"pool-latency"` topic +- **PollingDataSource** - polls inference pool `/metrics` on a ticker; same `Extractor` interface + +## Implementation Steps + +1. Add `DataSource`, `Event`, `NotificationSource`, `Extractor`, payload types to `pkg/framework` +2. Implement `NotificationSource` (buffered channel + tick loop) in `pkg/plugins/datalayer/` +3. Implement `ConcurrencyExtractor` +4. Add `src.Notify(...)` calls to the producer alongside existing pipeline dispatch +5. Wire in `runner.go` + +--- + +## Appendix + +### Payload types + +```go +// RequestPayload is the Payload for RequestEventType. +// Carries the already-parsed request — no re-parsing needed. +// StartTime is the only field the producer adds. +type RequestPayload struct { + Request *InferenceRequest + StartTime time.Time +} + +// ResponsePayload is the Payload for ResponseEventType. +// Duration is computed by the producer (time.Since(StartTime)). +// All response body fields are accessible via Response.Body. +type ResponsePayload struct { + Request *InferenceRequest + Response *InferenceResponse + Duration time.Duration +} +``` + +### Extractor definitions + +#### `ConcurrencyExtractor` — owns `"running-requests"` + +| | | +|---|---| +| Handles | `RequestEventType` (increment), `ResponseEventType` (decrement) | +| Reads | `model`, `max_tokens` from `RequestPayload.Request.Body` | +| State | `map[model]*atomic.Int64` — in-flight counters per model | +| Writes | `RunningRequestsCount{Requests int64, Tokens int64}` per model | + +```go +type RunningRequestsCount struct { + Requests int64 + Tokens int64 // sum of max_tokens across in-flight requests +} +``` From 929402d783725d2378fa69094785d479df243d8e Mon Sep 17 00:00:00 2001 From: Mohammad Date: Wed, 13 May 2026 14:28:44 +0300 Subject: [PATCH 2/6] Fix comments. Signed-off-by: Mohammad --- docs/proposals/data-layer-proposal/README.md | 42 +++++++++++--------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/docs/proposals/data-layer-proposal/README.md b/docs/proposals/data-layer-proposal/README.md index e7f82ca..c41c9e2 100644 --- a/docs/proposals/data-layer-proposal/README.md +++ b/docs/proposals/data-layer-proposal/README.md @@ -32,10 +32,9 @@ flowchart TD PP["Plugin Pipeline\nFilter → Score → Pick"] NS["NotificationSource\nchan Event (buffered)"] TL["tick loop\nevery 100ms"] - ExtA["ConcurrencyExtractor"] + ExtA["RunningRequestsExtractor"] ExtB["LatencyExtractor\n(future)"] - DS1[("DataStore\nrunning-requests")] - DS2[("DataStore\npool-latency")] + DS[("Datastore\npkg/datastore")] MS["Model Selector"] P -->|"Notify(event) ~ns"| NS @@ -43,10 +42,9 @@ flowchart TD NS --> TL TL -->|"[]Event batch"| ExtA TL -->|"[]Event batch"| ExtB - ExtA --> DS1 - ExtB --> DS2 - DS1 --> MS - DS2 --> MS + ExtA --> DS + ExtB --> DS + DS --> MS PP -->|reads| MS ``` @@ -62,6 +60,7 @@ Each extractor switches on `Event.Type` and handles what it understands, ignorin type DataSource interface { Plugin // TypedName() TypedName Start(ctx context.Context) error + // Stop signals the component to shut down and blocks until it has fully stopped. Stop() } @@ -72,17 +71,24 @@ type Event struct { Payload any } +// EventNotifier is the narrow interface the producer uses to fire events. +// Keeping it separate lets the server depend only on Notify, not on lifecycle +// or extractor registration. +type EventNotifier interface { + Notify(e Event) +} + // NotificationSource buffers events off the hot path and dispatches // batches to registered Extractors on each tick. type NotificationSource interface { DataSource - Notify(e Event) // non-blocking, called by the producer + EventNotifier RegisterExtractor(e Extractor) } -// Extractor switches on Event.Type and handles only the types it understands. +// Extractor processes a batch of Events. It does not manage its own goroutines. type Extractor interface { - DataSource + Plugin Extract(ctx context.Context, events []Event) error } ``` @@ -108,9 +114,9 @@ if err := src.Start(ctx); err != nil { ... } ## Implementation Steps -1. Add `DataSource`, `Event`, `NotificationSource`, `Extractor`, payload types to `pkg/framework` -2. Implement `NotificationSource` (buffered channel + tick loop) in `pkg/plugins/datalayer/` -3. Implement `ConcurrencyExtractor` +1. Add `DataSource`, `DataStore`, `EventNotifier`, `Event`, `NotificationSource`, `Extractor`, payload types to `pkg/framework` +2. Implement `NotificationSource` (buffered channel + tick loop) in `pkg/datalayer/` +3. Implement `RunningRequestsExtractor` in `pkg/plugins/datalayer/` 4. Add `src.Notify(...)` calls to the producer alongside existing pipeline dispatch 5. Wire in `runner.go` @@ -123,14 +129,12 @@ if err := src.Start(ctx); err != nil { ... } ```go // RequestPayload is the Payload for RequestEventType. // Carries the already-parsed request — no re-parsing needed. -// StartTime is the only field the producer adds. type RequestPayload struct { - Request *InferenceRequest - StartTime time.Time + Request *InferenceRequest } // ResponsePayload is the Payload for ResponseEventType. -// Duration is computed by the producer (time.Since(StartTime)). +// Duration is computed by the producer and passed directly. // All response body fields are accessible via Response.Body. type ResponsePayload struct { Request *InferenceRequest @@ -141,12 +145,12 @@ type ResponsePayload struct { ### Extractor definitions -#### `ConcurrencyExtractor` — owns `"running-requests"` +#### `RunningRequestsExtractor` — owns `"running-requests"` | | | |---|---| | Handles | `RequestEventType` (increment), `ResponseEventType` (decrement) | -| Reads | `model`, `max_tokens` from `RequestPayload.Request.Body` | +| Reads | `model`, `max_tokens` from `Request.Body` | | State | `map[model]*atomic.Int64` — in-flight counters per model | | Writes | `RunningRequestsCount{Requests int64, Tokens int64}` per model | From f2a2a44011fd4f6f16c24f1ee1fdc81fa5bc16ce Mon Sep 17 00:00:00 2001 From: Mohammad Date: Sun, 17 May 2026 10:53:54 +0300 Subject: [PATCH 3/6] Update the proposal. Signed-off-by: Mohammad --- docs/proposals/data-layer-proposal/README.md | 64 +++++++++++++------- 1 file changed, 41 insertions(+), 23 deletions(-) diff --git a/docs/proposals/data-layer-proposal/README.md b/docs/proposals/data-layer-proposal/README.md index c41c9e2..6a68259 100644 --- a/docs/proposals/data-layer-proposal/README.md +++ b/docs/proposals/data-layer-proposal/README.md @@ -1,4 +1,4 @@ -# Proposal: Data Layer +# Proposal: Data Layer ## Summary @@ -31,17 +31,17 @@ flowchart TD P["Producer\n(server.go)"] PP["Plugin Pipeline\nFilter → Score → Pick"] NS["NotificationSource\nchan Event (buffered)"] - TL["tick loop\nevery 100ms"] - ExtA["RunningRequestsExtractor"] + EL["event loop"] + ExtA["InflightRequestsExtractor"] ExtB["LatencyExtractor\n(future)"] DS[("Datastore\npkg/datastore")] - MS["Model Selector"] + MS["Model Selector\n(InflightRequestsScorer)"] P -->|"Notify(event) ~ns"| NS P --> PP - NS --> TL - TL -->|"[]Event batch"| ExtA - TL -->|"[]Event batch"| ExtB + NS --> EL + EL -->|"one event at a time"| ExtA + EL -->|"one event at a time"| ExtB ExtA --> DS ExtB --> DS DS --> MS @@ -50,8 +50,9 @@ flowchart TD The **producer** (currently `server.go`) fires an `Event` on each request and response — a non-blocking channel write (~ns). The `NotificationSource` buffers it. A background -tick loop drains the buffer every 100ms and fans the batch to all registered `Extractor`s. -Each extractor switches on `Event.Type` and handles what it understands, ignoring the rest. +event loop reads each event from the channel as it arrives and fans it out to all registered +`Extractor`s. Each extractor switches on `Event.Type` and handles what it understands, +ignoring the rest. ### Types (`pkg/framework`) @@ -79,10 +80,12 @@ type EventNotifier interface { } // NotificationSource buffers events off the hot path and dispatches -// batches to registered Extractors on each tick. +// each event to registered Extractors as it arrives. type NotificationSource interface { DataSource EventNotifier + // RegisterExtractor adds an extractor after construction. + // Extractors known at construction time can be passed to NewNotificationSource directly. RegisterExtractor(e Extractor) } @@ -95,30 +98,45 @@ type Extractor interface { See [Appendix](#appendix) for payload struct definitions and a full extractor example. +### DataStore injection + +The `DataStore` is passed directly to each extractor's constructor. This keeps the +`NotificationSource` a pure event dispatcher with no knowledge of storage, and avoids +routing the store through `framework.Handle`. + +```go +ds := datastore.NewStore() +extractor := inflightrequests.NewInflightRequestsExtractor(ds) +``` + ### Registration (`runner.go`) ```go -src := datalayer.NewNotificationSource("notification-source") -src.RegisterExtractor(datalayer.NewConcurrencyExtractor(handle)) +ds := datastore.NewStore() +src, err := datalayer.NewNotificationSource("default", inflightrequests.NewInflightRequestsExtractor(ds)) +if err != nil { ... } if err := src.Start(ctx); err != nil { ... } -// pass src to the producer so it can call src.Notify(...) +// TODO: pass src to the producer so it can call src.Notify(...) ``` -**Next:** move extractors registration to a config struct or CLI flags so operators can enable/disable metrics without recompiling. +**Next:** define a configuration story for data layer plugins (NotificationSource, extractors) +consistent with how model-selector plugins are configured via CLI flags. ## Future -- **LatencyExtractor** - handles `ResponseEventType`; per-model avg latency; owns `"pool-latency"` topic +- **LatencyExtractor** - handles `ResponseEventType`; per-model avg latency; owns `"pool-latency"` attribute - **PollingDataSource** - polls inference pool `/metrics` on a ticker; same `Extractor` interface ## Implementation Steps 1. Add `DataSource`, `DataStore`, `EventNotifier`, `Event`, `NotificationSource`, `Extractor`, payload types to `pkg/framework` -2. Implement `NotificationSource` (buffered channel + tick loop) in `pkg/datalayer/` -3. Implement `RunningRequestsExtractor` in `pkg/plugins/datalayer/` -4. Add `src.Notify(...)` calls to the producer alongside existing pipeline dispatch -5. Wire in `runner.go` +2. Implement `NotificationSource` (buffered channel + event loop) in `pkg/datalayer/` +3. Implement `InflightRequestsExtractor` in `pkg/plugins/datalayer/inflightrequests/` +4. Implement `InflightRequestsScorer` in `pkg/framework/modelselector/scorer/inflightrequests/` +5. Wire DataStore + extractor + NotificationSource in `runner.go` +6. Add `src.Notify(...)` calls to the producer alongside existing pipeline dispatch +7. Config-driven registration of data layer plugins --- @@ -145,17 +163,17 @@ type ResponsePayload struct { ### Extractor definitions -#### `RunningRequestsExtractor` — owns `"running-requests"` +#### `InflightRequestsExtractor` — owns `"inflight-requests"` (`pkg/plugins/datalayer/inflightrequests`) | | | |---|---| | Handles | `RequestEventType` (increment), `ResponseEventType` (decrement) | | Reads | `model`, `max_tokens` from `Request.Body` | -| State | `map[model]*atomic.Int64` — in-flight counters per model | -| Writes | `RunningRequestsCount{Requests int64, Tokens int64}` per model | +| State | `map[string]InflightRequestsCount` — in-flight counters per model | +| Writes | `InflightRequestsCount{Requests int64, Tokens int64}` per model | ```go -type RunningRequestsCount struct { +type InflightRequestsCount struct { Requests int64 Tokens int64 // sum of max_tokens across in-flight requests } From adcec7bdc307d37942d43a98161fe3e4360437b5 Mon Sep 17 00:00:00 2001 From: Mohammad Date: Mon, 18 May 2026 00:20:50 +0300 Subject: [PATCH 4/6] Fix lint error. Signed-off-by: Mohammad --- docs/proposals/data-layer-proposal/README.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/docs/proposals/data-layer-proposal/README.md b/docs/proposals/data-layer-proposal/README.md index 6a68259..3489805 100644 --- a/docs/proposals/data-layer-proposal/README.md +++ b/docs/proposals/data-layer-proposal/README.md @@ -21,7 +21,6 @@ where to route each request. - **Extensible** - adding a new metric or event type must not require changes to existing extractors or the producer. - **Off the plugin pipeline** - the data layer is a background concern; it must not participate in the per-request plugin chain. - ## Proposal ### Architecture @@ -54,7 +53,6 @@ event loop reads each event from the channel as it arrives and fans it out to al `Extractor`s. Each extractor switches on `Event.Type` and handles what it understands, ignoring the rest. - ### Types (`pkg/framework`) ```go @@ -122,7 +120,6 @@ if err := src.Start(ctx); err != nil { ... } **Next:** define a configuration story for data layer plugins (NotificationSource, extractors) consistent with how model-selector plugins are configured via CLI flags. - ## Future - **LatencyExtractor** - handles `ResponseEventType`; per-model avg latency; owns `"pool-latency"` attribute From 90626ecbab77554d4f89e22841db7ce5483fe760 Mon Sep 17 00:00:00 2001 From: Mohammad Date: Mon, 18 May 2026 09:56:07 +0300 Subject: [PATCH 5/6] Update proposal. Signed-off-by: Mohammad --- docs/proposals/data-layer-proposal/README.md | 22 +++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/docs/proposals/data-layer-proposal/README.md b/docs/proposals/data-layer-proposal/README.md index 3489805..a5a0a3a 100644 --- a/docs/proposals/data-layer-proposal/README.md +++ b/docs/proposals/data-layer-proposal/README.md @@ -53,11 +53,11 @@ event loop reads each event from the channel as it arrives and fans it out to al `Extractor`s. Each extractor switches on `Event.Type` and handles what it understands, ignoring the rest. -### Types (`pkg/framework`) +### Types (`pkg/framework/datalayer/`) ```go type DataSource interface { - Plugin // TypedName() TypedName + framework.Plugin // TypedName() TypedName Start(ctx context.Context) error // Stop signals the component to shut down and blocks until it has fully stopped. Stop() @@ -83,13 +83,13 @@ type NotificationSource interface { DataSource EventNotifier // RegisterExtractor adds an extractor after construction. - // Extractors known at construction time can be passed to NewNotificationSource directly. + // Extractors known at construction time can be passed to New directly. RegisterExtractor(e Extractor) } // Extractor processes a batch of Events. It does not manage its own goroutines. type Extractor interface { - Plugin + framework.Plugin Extract(ctx context.Context, events []Event) error } ``` @@ -111,7 +111,7 @@ extractor := inflightrequests.NewInflightRequestsExtractor(ds) ```go ds := datastore.NewStore() -src, err := datalayer.NewNotificationSource("default", inflightrequests.NewInflightRequestsExtractor(ds)) +src, err := notificationsource.New("default", inflightrequests.NewInflightRequestsExtractor(ds)) if err != nil { ... } if err := src.Start(ctx); err != nil { ... } // TODO: pass src to the producer so it can call src.Notify(...) @@ -127,8 +127,8 @@ consistent with how model-selector plugins are configured via CLI flags. ## Implementation Steps -1. Add `DataSource`, `DataStore`, `EventNotifier`, `Event`, `NotificationSource`, `Extractor`, payload types to `pkg/framework` -2. Implement `NotificationSource` (buffered channel + event loop) in `pkg/datalayer/` +1. Add `DataSource`, `DataStore`, `EventNotifier`, `Event`, `NotificationSource`, `Extractor`, payload types to `pkg/framework/datalayer/` +2. Implement `NotificationSource` (buffered channel + event loop) in `pkg/plugins/datalayer/notificationsource/` 3. Implement `InflightRequestsExtractor` in `pkg/plugins/datalayer/inflightrequests/` 4. Implement `InflightRequestsScorer` in `pkg/framework/modelselector/scorer/inflightrequests/` 5. Wire DataStore + extractor + NotificationSource in `runner.go` @@ -142,18 +142,20 @@ consistent with how model-selector plugins are configured via CLI flags. ### Payload types ```go +// package datalayer (pkg/framework/datalayer/) + // RequestPayload is the Payload for RequestEventType. // Carries the already-parsed request — no re-parsing needed. type RequestPayload struct { - Request *InferenceRequest + Request *framework.InferenceRequest } // ResponsePayload is the Payload for ResponseEventType. // Duration is computed by the producer and passed directly. // All response body fields are accessible via Response.Body. type ResponsePayload struct { - Request *InferenceRequest - Response *InferenceResponse + Request *framework.InferenceRequest + Response *framework.InferenceResponse Duration time.Duration } ``` From 6d804fa69321bdc629fb8b4f62dc41fc3e8c0488 Mon Sep 17 00:00:00 2001 From: Mohammad Date: Mon, 18 May 2026 20:00:04 +0300 Subject: [PATCH 6/6] Align with datastore changes. Signed-off-by: Mohammad --- docs/proposals/data-layer-proposal/README.md | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/docs/proposals/data-layer-proposal/README.md b/docs/proposals/data-layer-proposal/README.md index a5a0a3a..4ad51db 100644 --- a/docs/proposals/data-layer-proposal/README.md +++ b/docs/proposals/data-layer-proposal/README.md @@ -99,18 +99,20 @@ See [Appendix](#appendix) for payload struct definitions and a full extractor ex ### DataStore injection The `DataStore` is passed directly to each extractor's constructor. This keeps the -`NotificationSource` a pure event dispatcher with no knowledge of storage, and avoids -routing the store through `framework.Handle`. +`NotificationSource` a pure event dispatcher with no knowledge of storage, and avoids routing the store through `framework.Handle`. + +The concrete implementation lives in `pkg/datastore/inmemory`, separate from the +`pkg/datastore.Datastore` interface, to make room for future backends (e.g. Redis): ```go -ds := datastore.NewStore() +ds := inmemory.NewDatastore() extractor := inflightrequests.NewInflightRequestsExtractor(ds) ``` ### Registration (`runner.go`) ```go -ds := datastore.NewStore() +ds := inmemory.NewDatastore() src, err := notificationsource.New("default", inflightrequests.NewInflightRequestsExtractor(ds)) if err != nil { ... } if err := src.Start(ctx); err != nil { ... }