@@ -27,8 +27,23 @@ import (
2727)
2828
2929// Controller handles speculate queue messages.
30- // It consumes batches, performs speculation, and publishes to both build and merge stages.
31- // Implements consumer.Controller interface for integration with the consumer.
30+ //
31+ // Naive happy-path algorithm: assume every in-flight build will pass and
32+ // treat batch.Dependencies + [batch.ID] as the single speculation chain.
33+ // Per invocation, the controller advances the batch one step in the
34+ // state machine:
35+ //
36+ // - Created or Scored → publish to build, transition to Speculating.
37+ // - Speculating → if all deps are Succeeded, publish to merge and
38+ // transition to Merging; otherwise no-op (or fail-fast if a dep is
39+ // in a non-succeeding terminal state).
40+ // - Merging → no-op (owned by the merge controller).
41+ // - Terminal → re-fan-out to conclude for self-healing in case a
42+ // prior publish was lost.
43+ //
44+ // The controller is re-triggered on every relevant downstream event
45+ // (buildsignal, merge), so each call simply re-evaluates the current
46+ // state and either advances or waits.
3247type Controller struct {
3348 logger * zap.SugaredLogger
3449 metricsScope tally.Scope
@@ -60,68 +75,181 @@ func NewController(
6075 }
6176}
6277
63- // Process processes a speculate delivery from the queue.
64- // Deserializes the batch, performs speculation, and publishes to both build and merge topics.
78+ // Process advances a batch one step along the naive happy-path.
6579// Returns nil to ack (success), or error to nack (retry).
6680func (c * Controller ) Process (ctx context.Context , delivery consumer.Delivery ) error {
6781 c .metricsScope .Counter ("received" ).Inc (1 )
6882
6983 msg := delivery .Message ()
7084
71- // Deserialize batch ID from payload
7285 bid , err := entity .BatchIDFromBytes (msg .Payload )
7386 if err != nil {
7487 c .metricsScope .Counter ("deserialize_errors" ).Inc (1 )
7588 return fmt .Errorf ("failed to deserialize batch ID: %w" , err )
7689 }
7790
78- // Fetch batch from storage
7991 batch , err := c .store .GetBatchStore ().Get (ctx , bid .ID )
8092 if err != nil {
8193 c .metricsScope .Counter ("storage_errors" ).Inc (1 )
8294 return fmt .Errorf ("failed to get batch %s: %w" , bid .ID , err )
8395 }
8496
85- c .logger .Infow ("received speculate event" ,
97+ // Terminal state: re-fan-out to conclude for self-healing. The batch is
98+ // already done; if a previous publish was lost, downstream stages will
99+ // otherwise never reconcile. Re-publishing is safe because conclude is
100+ // idempotent on the batch ID.
101+ if batch .State .IsTerminal () {
102+ c .metricsScope .Counter ("self_heal_terminal" ).Inc (1 )
103+ return c .fanout (ctx , batch .ID , batch .Queue )
104+ }
105+
106+ // Merging is owned by the merge controller, which has its own self-heal.
107+ if batch .State == entity .BatchStateMerging {
108+ c .metricsScope .Counter ("noop_merging" ).Inc (1 )
109+ return nil
110+ }
111+
112+ switch batch .State {
113+ case entity .BatchStateCreated , entity .BatchStateScored :
114+ return c .startSpeculation (ctx , batch )
115+ case entity .BatchStateSpeculating :
116+ return c .tryFinalize (ctx , batch )
117+ default :
118+ c .metricsScope .Counter ("unexpected_state" ).Inc (1 )
119+ return fmt .Errorf ("unexpected batch state %q for batch %s" , batch .State , batch .ID )
120+ }
121+ }
122+
123+ // startSpeculation kicks off CI for this batch on top of the speculative head
124+ // (batch.Dependencies assumed to all pass), then transitions to Speculating.
125+ func (c * Controller ) startSpeculation (ctx context.Context , batch entity.Batch ) error {
126+ c .logger .Infow ("starting speculation" ,
86127 "batch_id" , batch .ID ,
87- "queue" , batch .Queue ,
88- "state" , string (batch .State ),
89- "version" , batch .Version ,
90- "attempt" , delivery .Attempt (),
91- "partition_key" , msg .PartitionKey ,
128+ "speculation_chain" , append (append ([]string {}, batch .Dependencies ... ), batch .ID ),
92129 )
93130
94- // TODO: Add speculation logic
95- // - Speculative merge/rebase
96- // - Conflict detection
97- // - Publish to build only if speculation is in progress (needs CI verification)
98- // - Publish to merge only if speculation is complete and successful (ready to land)
99-
100- // Publish to build topic
101131 if err := c .publish (ctx , consumer .TopicKeyBuild , batch .ID , batch .Queue ); err != nil {
102132 c .metricsScope .Counter ("publish_errors" ).Inc (1 )
103133 return fmt .Errorf ("failed to publish to build: %w" , err )
104134 }
105135
106- c .logger .Infow ("published batch to build" ,
107- "batch_id" , batch .ID ,
108- "topic_key" , consumer .TopicKeyBuild ,
109- )
136+ // Optimistic CAS: if the version has already advanced (concurrent speculate),
137+ // the next event will see the new state and behave correctly.
138+ newVersion := batch .Version + 1
139+ if err := c .store .GetBatchStore ().UpdateState (ctx , batch .ID , batch .Version , newVersion , entity .BatchStateSpeculating ); err != nil {
140+ c .metricsScope .Counter ("storage_errors" ).Inc (1 )
141+ return fmt .Errorf ("failed to update batch %s state to speculating: %w" , batch .ID , err )
142+ }
143+
144+ c .metricsScope .Counter ("started_speculation" ).Inc (1 )
145+ return nil
146+ }
147+
148+ // tryFinalize publishes to merge and transitions to Merging iff every
149+ // dependency batch has reached Succeeded. If any dep is Failed/Cancelled,
150+ // the batch cannot land on top of it; we mark it Failed and hand off to
151+ // conclude so the request state and log are reconciled. Otherwise (some
152+ // deps still in flight) it no-ops and waits for the next event.
153+ //
154+ // TODO: when a dependency fails we currently fail this batch outright.
155+ // We will need to respeculate the failed paths — drop the failed dep
156+ // from the chain and re-issue speculation for the surviving ordering(s)
157+ // — instead of cascading the failure into requests that could still land.
158+ func (c * Controller ) tryFinalize (ctx context.Context , batch entity.Batch ) error {
159+ deps , err := c .fetchDependencies (ctx , batch )
160+ if err != nil {
161+ return err
162+ }
163+
164+ pending := make ([]string , 0 , len (deps ))
165+ for _ , d := range deps {
166+ switch d .State {
167+ case entity .BatchStateSucceeded :
168+ // ok
169+ case entity .BatchStateFailed , entity .BatchStateCancelled :
170+ return c .failOnDependency (ctx , batch , d )
171+ default :
172+ pending = append (pending , d .ID )
173+ }
174+ }
175+
176+ if len (pending ) > 0 {
177+ c .metricsScope .Counter ("waiting_on_deps" ).Inc (1 )
178+ c .logger .Debugw ("dependencies still in flight; waiting" ,
179+ "batch_id" , batch .ID ,
180+ "pending_dependency_ids" , pending ,
181+ )
182+ return nil
183+ }
110184
111- // Publish to merge topic
112185 if err := c .publish (ctx , consumer .TopicKeyMerge , batch .ID , batch .Queue ); err != nil {
113186 c .metricsScope .Counter ("publish_errors" ).Inc (1 )
114187 return fmt .Errorf ("failed to publish to merge: %w" , err )
115188 }
116189
117- c .logger .Infow ("published batch to merge" ,
190+ newVersion := batch .Version + 1
191+ if err := c .store .GetBatchStore ().UpdateState (ctx , batch .ID , batch .Version , newVersion , entity .BatchStateMerging ); err != nil {
192+ c .metricsScope .Counter ("storage_errors" ).Inc (1 )
193+ return fmt .Errorf ("failed to update batch %s state to merging: %w" , batch .ID , err )
194+ }
195+
196+ c .metricsScope .Counter ("processed" ).Inc (1 )
197+ return nil
198+ }
199+
200+ // failOnDependency transitions a Speculating batch to Failed when one of its
201+ // dependencies has reached a non-succeeding terminal state, then publishes to
202+ // the conclude queue so the request store and request log get reconciled.
203+ // Without this transition the batch would sit in Speculating forever — no
204+ // downstream event ever fires for it again.
205+ func (c * Controller ) failOnDependency (ctx context.Context , batch entity.Batch , dep entity.Batch ) error {
206+ c .metricsScope .Counter ("dependency_failed" ).Inc (1 )
207+ c .logger .Warnw ("dependency in non-succeeding terminal state; failing batch" ,
118208 "batch_id" , batch .ID ,
119- "topic_key" , consumer .TopicKeyMerge ,
209+ "dependency_id" , dep .ID ,
210+ "dependency_state" , string (dep .State ),
120211 )
121212
122- c .metricsScope .Counter ("processed" ).Inc (1 )
213+ newVersion := batch .Version + 1
214+ if err := c .store .GetBatchStore ().UpdateState (ctx , batch .ID , batch .Version , newVersion , entity .BatchStateFailed ); err != nil {
215+ c .metricsScope .Counter ("storage_errors" ).Inc (1 )
216+ return fmt .Errorf ("failed to update batch %s state to failed: %w" , batch .ID , err )
217+ }
123218
124- return nil // Success - message will be acked
219+ if err := c .publish (ctx , consumer .TopicKeyConclude , batch .ID , batch .Queue ); err != nil {
220+ c .metricsScope .Counter ("publish_errors" ).Inc (1 )
221+ return fmt .Errorf ("failed to publish to conclude: %w" , err )
222+ }
223+
224+ return nil
225+ }
226+
227+ // fetchDependencies loads each batch in batch.Dependencies. Any storage error
228+ // is surfaced as a retryable infra failure; missing dependencies should not
229+ // happen in practice, but if one does it is treated the same as a transient
230+ // fetch failure (i.e. the message is retried).
231+ func (c * Controller ) fetchDependencies (ctx context.Context , batch entity.Batch ) ([]entity.Batch , error ) {
232+ deps := make ([]entity.Batch , 0 , len (batch .Dependencies ))
233+ for _ , depID := range batch .Dependencies {
234+ d , err := c .store .GetBatchStore ().Get (ctx , depID )
235+ if err != nil {
236+ c .metricsScope .Counter ("dependency_fetch_errors" ).Inc (1 )
237+ return nil , fmt .Errorf ("failed to get dependency batch %s of %s: %w" , depID , batch .ID , err )
238+ }
239+ deps = append (deps , d )
240+ }
241+ return deps , nil
242+ }
243+
244+ // fanout re-publishes downstream events for a batch that has already reached
245+ // a terminal state. Used for self-healing when a previous publish was lost:
246+ // re-sending to conclude guarantees request-state reconciliation.
247+ func (c * Controller ) fanout (ctx context.Context , batchID , partitionKey string ) error {
248+ if err := c .publish (ctx , consumer .TopicKeyConclude , batchID , partitionKey ); err != nil {
249+ c .metricsScope .Counter ("publish_errors" ).Inc (1 )
250+ return fmt .Errorf ("failed to publish to conclude: %w" , err )
251+ }
252+ return nil
125253}
126254
127255// publish publishes a batch ID to the specified topic key.
0 commit comments