Skip to content

Commit 7f2a754

Browse files
committed
refactor(scorer): score batches over typed change details
## Summary

### Why?

The scorer took entity.Change (just URIs), so it could not score on real change size — the example heuristic counted URIs as a placeholder. With typed change details now persisted on change records, the scorer can score a batch on its actual lines/files changed.

### What?

- Add entity.BatchChanges — the normalized, batch-level view of all changes in a batch (BatchID, Queue, []ChangeInfo) with aggregation helpers.
- Scorer.Score now takes entity.BatchChanges; the heuristic ValueFunc and the composite scorer operate over it.
- The score controller resolves each request's change records, flattens their details into BatchChanges, and scores the batch once — replacing the per-request multiplicative product over len(URIs).
- Example wiring buckets by total lines changed.

Consumes the typed details persisted by the change-details change.

## Test Plan

- ✅ `make build`, `make test`, `make lint`, `make check-mocks/gazelle/tidy`
- ✅ `make integration-test`, `make e2e-test` (start -> validate enrich -> score normalizes the batch and scores on real change size)
1 parent 2f34df7 commit 7f2a754

11 files changed

Lines changed: 163 additions & 53 deletions

File tree

example/submitqueue/orchestrator/server/main.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -515,16 +515,17 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
515515
logger,
516516
scope,
517517
store,
518-
// TODO: replace with a real scorer
518+
// Heuristic scorer: bucket the batch by total lines changed across all of
519+
// its changes — larger batches are likelier to fail to land.
519520
scorerFactory{impl: heuristic.New(
520521
[]heuristic.Bucket{
521-
{Min: 0, Max: 1, Score: 0.95},
522-
{Min: 2, Max: 5, Score: 0.80},
523-
{Min: 6, Max: 20, Score: 0.60},
524-
{Min: 21, Max: 1<<31 - 1, Score: 0.40},
522+
{Min: 0, Max: 50, Score: 0.95},
523+
{Min: 51, Max: 250, Score: 0.80},
524+
{Min: 251, Max: 1000, Score: 0.60},
525+
{Min: 1001, Max: 1<<31 - 1, Score: 0.40},
525526
},
526-
func(_ context.Context, change entity.Change) (int, error) {
527-
return len(change.URIs), nil
527+
func(_ context.Context, changes entity.BatchChanges) (int, error) {
528+
return changes.TotalLinesChanged(), nil
528529
},
529530
scope.SubScope("scorer"),
530531
)},

submitqueue/entity/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go_library(
44
name = "entity",
55
srcs = [
66
"batch.go",
7+
"batch_changes.go",
78
"batch_dependent.go",
89
"build.go",
910
"cancel_request.go",
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package entity
16+
17+
// BatchChanges is the normalized, batch-level view of all changes in a batch,
18+
// assembled by the score controller and handed to a Scorer. A Batch references
19+
// only request IDs, so the controller resolves each request's change records and
20+
// flattens their details into Changes — giving the scorer the whole batch's
21+
// change facts in one value without coupling it to storage.
22+
type BatchChanges struct {
23+
// BatchID is the batch being scored. Format: "<queue>/batch/<counter_value>".
24+
BatchID string
25+
// Queue is the queue the batch belongs to.
26+
Queue string
27+
// Changes is every change (URI + provider-supplied details) across all requests
28+
// in the batch. Order is unspecified.
29+
Changes []ChangeInfo
30+
}
31+
32+
// TotalLinesChanged returns the total number of lines touched across every change in the batch.
33+
func (b BatchChanges) TotalLinesChanged() int {
34+
total := 0
35+
for _, c := range b.Changes {
36+
total += c.Details.TotalLinesChanged()
37+
}
38+
return total
39+
}
40+
41+
// TotalFiles returns the total number of files touched across every change in the batch.
42+
func (b BatchChanges) TotalFiles() int {
43+
total := 0
44+
for _, c := range b.Changes {
45+
total += c.Details.FileCount()
46+
}
47+
return total
48+
}

submitqueue/extension/scorer/composite/scorer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,13 @@ func New(scorers map[string]scorer.Scorer, reduce ReduceFunc, scope tally.Scope)
9090

9191
// Score evaluates all child scorers and combines their results using the reduce function.
9292
// If any child scorer returns an error, that error is returned immediately.
93-
func (c *compositeScorer) Score(ctx context.Context, change entity.Change) (ret float64, retErr error) {
93+
func (c *compositeScorer) Score(ctx context.Context, changes entity.BatchChanges) (ret float64, retErr error) {
9494
op := metrics.Begin(c.scope, "score")
9595
defer func() { op.Complete(retErr) }()
9696

9797
scores := make(map[string]float64, len(c.scorers))
9898
for name, s := range c.scorers {
99-
score, err := s.Score(ctx, change)
99+
score, err := s.Score(ctx, changes)
100100
if err != nil {
101101
return 0, err
102102
}

submitqueue/extension/scorer/composite/scorer_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,14 @@ type fixedScorer struct {
3131
score float64
3232
}
3333

34-
func (f *fixedScorer) Score(_ context.Context, _ entity.Change) (float64, error) {
34+
func (f *fixedScorer) Score(_ context.Context, _ entity.BatchChanges) (float64, error) {
3535
return f.score, nil
3636
}
3737

3838
// errorScorer always returns an error.
3939
type errorScorer struct{}
4040

41-
func (e *errorScorer) Score(_ context.Context, _ entity.Change) (float64, error) {
41+
func (e *errorScorer) Score(_ context.Context, _ entity.BatchChanges) (float64, error) {
4242
return 0, fmt.Errorf("scorer failed")
4343
}
4444

@@ -99,7 +99,7 @@ func TestScorer_Score(t *testing.T) {
9999
for _, tt := range tests {
100100
t.Run(tt.name, func(t *testing.T) {
101101
s := New(tt.scorers, tt.reduce, tally.NoopScope)
102-
got, err := s.Score(context.Background(), entity.Change{})
102+
got, err := s.Score(context.Background(), entity.BatchChanges{})
103103
require.NoError(t, err)
104104
assert.InDelta(t, tt.want, got, 1e-9)
105105
})
@@ -111,7 +111,7 @@ func TestScorer_Score_ChildError(t *testing.T) {
111111
"error": &errorScorer{},
112112
"files": &fixedScorer{0.9},
113113
}, Min, tally.NoopScope)
114-
_, err := s.Score(context.Background(), entity.Change{})
114+
_, err := s.Score(context.Background(), entity.BatchChanges{})
115115
require.Error(t, err)
116116
}
117117

@@ -140,7 +140,7 @@ func TestReduceFunc_ReceivesNames(t *testing.T) {
140140
"files": &fixedScorer{0.9},
141141
"deps": &fixedScorer{0.95},
142142
}, custom, tally.NoopScope)
143-
got, err := s.Score(context.Background(), entity.Change{})
143+
got, err := s.Score(context.Background(), entity.BatchChanges{})
144144
require.NoError(t, err)
145145
assert.Equal(t, 0.9, got)
146146
assert.ElementsMatch(t, []string{"files", "deps"}, receivedNames)

submitqueue/extension/scorer/heuristic/scorer.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ import (
2424
"github.com/uber/submitqueue/submitqueue/extension/scorer"
2525
)
2626

27-
// ValueFunc extracts a single numeric value from a Change for bucketing.
28-
type ValueFunc func(context.Context, entity.Change) (int, error)
27+
// ValueFunc extracts a single numeric value from a batch of changes for bucketing.
28+
type ValueFunc func(context.Context, entity.BatchChanges) (int, error)
2929

3030
// Bucket defines a range [Min, Max] mapped to a probability Score.
3131
type Bucket struct {
@@ -37,12 +37,12 @@ type Bucket struct {
3737
Score float64
3838
}
3939

40-
// heuristicScorer computes a success probability by bucketing a metric extracted from a Change.
40+
// heuristicScorer computes a success probability by bucketing a metric extracted from a batch of changes.
4141
// It follows the Java HeuristicsBasedSuccessPredictor pattern.
4242
type heuristicScorer struct {
4343
// buckets is the list of ranges to match against.
4444
buckets []Bucket
45-
// valueFunc extracts the numeric value from a Change.
45+
// valueFunc extracts the numeric value from a batch of changes.
4646
valueFunc ValueFunc
4747
// scope is the tally scope for emitting metrics.
4848
scope tally.Scope
@@ -61,12 +61,12 @@ func New(buckets []Bucket, valueFunc ValueFunc, scope tally.Scope) scorer.Scorer
6161
}
6262
}
6363

64-
// Score extracts the value from the change, then returns the probability score for the first
65-
// bucket whose range [Min, Max] contains the value. Returns an error if no bucket matches.
66-
func (s *heuristicScorer) Score(ctx context.Context, change entity.Change) (ret float64, retErr error) {
64+
// Score extracts the value from the batch of changes, then returns the probability score for the
65+
// first bucket whose range [Min, Max] contains the value. Returns an error if no bucket matches.
66+
func (s *heuristicScorer) Score(ctx context.Context, changes entity.BatchChanges) (ret float64, retErr error) {
6767
op := metrics.Begin(s.scope, "score")
6868
defer func() { op.Complete(retErr) }()
69-
value, err := s.valueFunc(ctx, change)
69+
value, err := s.valueFunc(ctx, changes)
7070
if err != nil {
7171
return 0, err
7272
}

submitqueue/extension/scorer/heuristic/scorer_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626

2727
// staticValue returns a ValueFunc that always returns the given value.
2828
func staticValue(value int) ValueFunc {
29-
return func(_ context.Context, _ entity.Change) (int, error) {
29+
return func(_ context.Context, _ entity.BatchChanges) (int, error) {
3030
return value, nil
3131
}
3232
}
@@ -107,7 +107,7 @@ func TestScorer_Score(t *testing.T) {
107107
for _, tt := range tests {
108108
t.Run(tt.name, func(t *testing.T) {
109109
s := New(tt.buckets, tt.valueFunc, tally.NoopScope)
110-
got, err := s.Score(context.Background(), entity.Change{})
110+
got, err := s.Score(context.Background(), entity.BatchChanges{})
111111
if tt.wantErr {
112112
require.Error(t, err)
113113
return
@@ -119,11 +119,11 @@ func TestScorer_Score(t *testing.T) {
119119
}
120120

121121
func TestScorer_Score_ValueFuncError(t *testing.T) {
122-
failing := func(_ context.Context, _ entity.Change) (int, error) {
122+
failing := func(_ context.Context, _ entity.BatchChanges) (int, error) {
123123
return 0, assert.AnError
124124
}
125125
s := New([]Bucket{{Min: 0, Max: 10, Score: 0.9}}, failing, tally.NoopScope)
126-
_, err := s.Score(context.Background(), entity.Change{})
126+
_, err := s.Score(context.Background(), entity.BatchChanges{})
127127
require.Error(t, err)
128128
}
129129

submitqueue/extension/scorer/mock/scorer_mock.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

submitqueue/extension/scorer/scorer.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ import (
2222
"github.com/uber/submitqueue/submitqueue/entity"
2323
)
2424

25-
// Scorer computes a success probability score for a change based on its characteristics.
25+
// Scorer computes a success probability score for a batch of changes based on their characteristics.
2626
type Scorer interface {
2727
// Score returns a probability between 0.0 and 1.0 indicating the likelihood
28-
// of a successful land for the given change.
29-
Score(ctx context.Context, change entity.Change) (float64, error)
28+
// of a successful land for the given batch of changes.
29+
Score(ctx context.Context, changes entity.BatchChanges) (float64, error)
3030
}
3131

3232
// Config carries the per-queue identity handed to a Factory. The system knows

submitqueue/orchestrator/controller/score/score.go

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -173,27 +173,54 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
173173
return nil // Success - message will be acked
174174
}
175175

176-
// scoreBatch scores each request's change in the batch and returns the combined probability.
177-
// Uses multiplicative probability: if any single request fails, the entire batch fails,
178-
// so the batch score is the product of individual request scores.
176+
// scoreBatch normalizes the batch's changes and scores them as a whole. It resolves
177+
// each request in the batch, reads that request's change records (one per URI), and
178+
// flattens their provider-supplied details into a single entity.BatchChanges, which
179+
// the scorer turns into one probability for the batch.
179180
func (c *Controller) scoreBatch(ctx context.Context, batch entity.Batch) (float64, error) {
180181
sc, err := c.scorers.For(scorer.Config{QueueName: batch.Queue})
181182
if err != nil {
182183
return 0, fmt.Errorf("failed to build scorer for batch %s: %w", batch.ID, err)
183184
}
184-
score := 1.0
185+
186+
changes, err := c.collectBatchChanges(ctx, batch)
187+
if err != nil {
188+
return 0, err
189+
}
190+
191+
score, err := sc.Score(ctx, changes)
192+
if err != nil {
193+
return 0, fmt.Errorf("failed to score batch %s: %w", batch.ID, err)
194+
}
195+
return score, nil
196+
}
197+
198+
// collectBatchChanges assembles the normalized entity.BatchChanges for a batch by
199+
// resolving each request and reading its change records per URI. For each URI it
200+
// selects the record owned by the request (GetByURI returns rows for all requests
201+
// that ever claimed the URI) and appends its URI + details.
202+
func (c *Controller) collectBatchChanges(ctx context.Context, batch entity.Batch) (entity.BatchChanges, error) {
203+
changes := entity.BatchChanges{BatchID: batch.ID, Queue: batch.Queue}
185204
for _, requestID := range batch.Contains {
186205
request, err := c.store.GetRequestStore().Get(ctx, requestID)
187206
if err != nil {
188-
return 0, fmt.Errorf("failed to get request %s: %w", requestID, err)
207+
return entity.BatchChanges{}, fmt.Errorf("failed to get request %s: %w", requestID, err)
189208
}
190-
s, err := sc.Score(ctx, request.Change)
191-
if err != nil {
192-
return 0, fmt.Errorf("failed to score request %s: %w", requestID, err)
209+
for _, uri := range request.Change.URIs {
210+
records, err := c.store.GetChangeStore().GetByURI(ctx, batch.Queue, uri)
211+
if err != nil {
212+
return entity.BatchChanges{}, fmt.Errorf("failed to read change record for request %s uri=%s: %w", requestID, uri, err)
213+
}
214+
for _, rec := range records {
215+
if rec.RequestID != requestID {
216+
continue
217+
}
218+
changes.Changes = append(changes.Changes, entity.ChangeInfo{URI: rec.URI, Details: rec.Details})
219+
break
220+
}
193221
}
194-
score *= s
195222
}
196-
return score, nil
223+
return changes, nil
197224
}
198225

199226
// publish publishes a batch ID to the specified topic key.

0 commit comments

Comments
 (0)