Skip to content

Commit 110f04b

Browse files
sbalabanovsbalabanov-zzclaude
authored
feat(pusher): add Pusher extension and wire it into the merge controller (#151)
## Summary - New `extension/pusher` interface with all-or-nothing `Push` semantics, per-change outcomes (committed vs already-existed), and an `ErrConflict` sentinel for user-caused failures. - Git-backed implementation that fetches → resets → cherry-picks → pushes, serialized via mutex on the shared checkout. Detects redundant picks and rolls back genuinely empty commits so they don't reach the remote. - Concurrent-push contention is detected by re-checking the remote tip after a push failure (rather than parsing git error output, which varies across versions and rejection sources). The full cycle is retried up to `Params.MaxPushAttempts` (default 10). - Merge controller now calls `Pusher.Push`, transitions the batch to `Succeeded`/`Failed`, and nacks transient infra errors. Three outcome cases are inlined (success, `ErrConflict` → Failed, generic error → return) — retryability classification will live in separate infra. - `example/server/orchestrator` wires a git pusher from `PUSHER_CHECKOUT_PATH` / `PUSHER_REMOTE` / `PUSHER_TARGET` env vars. ## Test plan - [x] `make test` — 30 unit tests pass, including new pusher and rewritten merge controller suites - [x] Real-git fixture tests for the pusher cover single/stacked URIs, already-existed, mixed-partial, conflict, recovery-after-conflict, reset-between-calls, retry-on-contention, and giveup-after-cap - [x] Pre-receive race hook drives the retry loop end-to-end with real git mechanics (unsets `GIT_QUARANTINE_PATH`/`GIT_OBJECT_DIRECTORY`/`GIT_ALTERNATE_OBJECT_DIRECTORIES` to mutate refs from inside pre-receive) - [x] `make fmt`, `make gazelle`, `make tidy` — tree clean 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: sergeyb <sergeyb@uber.com> Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 7123378 commit 110f04b

14 files changed

Lines changed: 1679 additions & 112 deletions

File tree

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ local-stop: ## Stop all services (keep data)
245245

246246
mocks: ## Generate mock files using mockgen
247247
@echo "Generating mocks..."
248-
@$(BAZEL) run @rules_go//go -- generate ./extension/storage/... ./extension/counter/... ./extension/queue/... ./extension/mergechecker/... ./extension/scorer/... ./core/consumer/...
248+
@$(BAZEL) run @rules_go//go -- generate ./extension/storage/... ./extension/counter/... ./extension/queue/... ./extension/mergechecker/... ./extension/pusher/... ./extension/scorer/... ./core/consumer/...
249249
@echo "Mocks generated successfully!"
250250

251251
proto: ## Generate protobuf files from .proto definitions

example/server/orchestrator/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ go_library(
2020
"//extension/counter/mysql",
2121
"//extension/mergechecker",
2222
"//extension/mergechecker/github",
23+
"//extension/pusher",
24+
"//extension/pusher/git",
2325
"//extension/queue",
2426
"//extension/queue/mysql",
2527
"//extension/scorer/heuristic",

example/server/orchestrator/main.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ import (
3939
mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql"
4040
"github.com/uber/submitqueue/extension/mergechecker"
4141
githubchecker "github.com/uber/submitqueue/extension/mergechecker/github"
42+
"github.com/uber/submitqueue/extension/pusher"
43+
gitpusher "github.com/uber/submitqueue/extension/pusher/git"
4244
extqueue "github.com/uber/submitqueue/extension/queue"
4345
queueMySQL "github.com/uber/submitqueue/extension/queue/mysql"
4446
"github.com/uber/submitqueue/extension/scorer/heuristic"
@@ -203,8 +205,14 @@ func run() error {
203205
return fmt.Errorf("failed to create change provider: %w", err)
204206
}
205207

208+
// Create pusher
209+
psh, err := newPusher(logger, scope)
210+
if err != nil {
211+
return fmt.Errorf("failed to create pusher: %w", err)
212+
}
213+
206214
// Register controllers
207-
if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, cnt, store); err != nil {
215+
if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, cnt, store); err != nil {
208216
return err
209217
}
210218

@@ -389,7 +397,7 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
389397
// │ │ │
390398
// └────────┴────────────────────────┘
391399

392-
func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, cnt counter.Counter, store storage.Storage) error {
400+
func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, cnt counter.Counter, store storage.Storage) error {
393401
requestController := start.NewController(
394402
logger,
395403
scope,
@@ -495,6 +503,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
495503
scope,
496504
store,
497505
registry,
506+
psh,
498507
consumer.TopicKeyMerge,
499508
"orchestrator-merge",
500509
)
@@ -595,3 +604,21 @@ func newChangeProvider(logger *zap.Logger, scope tally.Scope) (changeprovider.Ch
595604
MetricsScope: scope.SubScope("changeprovider"),
596605
}), nil
597606
}
607+
608+
// newPusher creates a git-backed Pusher bound to the configured checkout
609+
// path, remote, and target branch. Configured via PUSHER_CHECKOUT_PATH
610+
// (required), PUSHER_REMOTE (default "origin"), and PUSHER_TARGET (default
611+
// "main").
612+
func newPusher(logger *zap.Logger, scope tally.Scope) (pusher.Pusher, error) {
613+
checkout := os.Getenv("PUSHER_CHECKOUT_PATH")
614+
if checkout == "" {
615+
return nil, fmt.Errorf("PUSHER_CHECKOUT_PATH environment variable is required")
616+
}
617+
return gitpusher.NewPusher(gitpusher.Params{
618+
CheckoutPath: checkout,
619+
Remote: getEnv("PUSHER_REMOTE", "origin"),
620+
Target: getEnv("PUSHER_TARGET", "main"),
621+
Logger: logger.Sugar(),
622+
MetricsScope: scope.SubScope("pusher"),
623+
}), nil
624+
}

extension/pusher/BUILD.bazel

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
load("@rules_go//go:def.bzl", "go_library")
2+
3+
go_library(
4+
name = "pusher",
5+
srcs = ["pusher.go"],
6+
importpath = "github.com/uber/submitqueue/extension/pusher",
7+
visibility = ["//visibility:public"],
8+
deps = ["//entity"],
9+
)

extension/pusher/README.md

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
# Pusher
2+
3+
Pluggable abstraction for landing a list of `entity.Change` values onto a
4+
target branch and pushing the result to a source-control remote.
5+
6+
## Interface
7+
8+
`Pusher` exposes a single `Push` method that accepts a list of changes.
9+
Implementations are bound to a specific `(checkout, remote, target)` tuple
10+
at construction time, so the interface itself stays vendor- and
11+
configuration-agnostic.
12+
13+
The interface enforces an **all-or-nothing atomicity contract**: when `Push`
14+
returns an error, no change has reached the remote — neither partially nor
15+
fully. Callers can treat a non-nil error as "the remote is exactly as it was
16+
before the call". The `ErrConflict` sentinel marks user-caused failures so
17+
callers can route them to a non-retry path.
18+
19+
A successful `Push` returns one `ChangeOutcome` per input change in input
20+
order. Each outcome reports either:
21+
22+
- `OutcomeStatusCommitted` with the list of `CommitSHAs` produced on the
23+
target branch (one change can land as multiple commits, e.g. a stack of
24+
PRs); or
25+
- `OutcomeStatusAlreadyExisted` with no commits, when the change is already
26+
present on the target branch (previously landed via another path, or
27+
subsumed by an earlier change in the same push). Git surfaces this as
28+
"rebased out" during a cherry-pick.
29+
30+
## Implementations
31+
32+
- [`git/`](git/) — applies changes against a local checkout via `git
33+
cherry-pick`, then `git push`. Construction takes the path to the
34+
checkout, the remote name, and the target branch; the implementation
35+
owns that working tree and serializes concurrent invocations.
36+
37+
## Adding a new backend
38+
39+
1. Create `extension/pusher/{backend}/` with a `Pusher` implementation.
40+
2. Bind the implementation to its checkout/remote/target at construction.
41+
3. Map each `entity.Change` to the backend's commit/push primitives.
42+
4. Honour the atomicity contract: never publish partial state. Return
43+
`ErrConflict` (wrapped) for user-caused apply failures and a plain error
44+
for transient infra failures.

extension/pusher/git/BUILD.bazel

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
load("@rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "git",
5+
srcs = ["git_pusher.go"],
6+
importpath = "github.com/uber/submitqueue/extension/pusher/git",
7+
visibility = ["//visibility:public"],
8+
deps = [
9+
"//core/metrics",
10+
"//entity",
11+
"//entity/github",
12+
"//extension/pusher",
13+
"@com_github_uber_go_tally_v4//:tally",
14+
"@org_uber_go_zap//:zap",
15+
],
16+
)
17+
18+
go_test(
19+
name = "git_test",
20+
srcs = ["git_pusher_test.go"],
21+
embed = [":git"],
22+
deps = [
23+
"//entity",
24+
"//extension/pusher",
25+
"@com_github_stretchr_testify//assert",
26+
"@com_github_stretchr_testify//require",
27+
"@com_github_uber_go_tally_v4//:tally",
28+
"@org_uber_go_zap//zaptest",
29+
],
30+
)

0 commit comments

Comments
 (0)