function specific parallelism #4
base: master
Conversation
Pull request overview
This PR introduces per-function parallelism to the task scheduling system, allowing specific functions to have dedicated worker pools and to be excluded from the general scheduler, plus tests to validate this behavior.
Changes:
- Refactors workers to accept explicit task and completion channels, enabling separate queues per scheduler.
- Adds function-specific scheduler infrastructure (functionParallelism, functionScheduler, claimFilter, claimTasksWithFilter, and a filtered timeToSleep) and wires it into Init, startScheduler, and shutdown.
- Adds a FunctionParallelism option and a new per_function_parallelism_test to verify per-function concurrency limits.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| worker.go | General worker now operates on injected task/completion channels so it can be reused by both general and function-specific schedulers. |
| dao.go | Adds claimFilter, claimTasksWithFilter, and a filtered timeToSleep to support function include/exclude semantics in task claiming and sleep-time calculation. |
| docket.go | Extends docket with function-parallelism fields, adjusts Init to compute connection pool size and start per-function schedulers/workers, and updates Close to coordinate shutdown of all schedulers. |
| scheduler.go | Updates the main scheduler to exclude functions with specific parallelism and implements startFunctionScheduler for function-specific scheduling loops. |
| option.go | Adds a FunctionParallelism option to configure per-function worker limits at initialization. |
| tests/per_function_parallelism_test.go | Introduces a new test docket and functions to assert that Slow/Medium/Fast functions respect their configured concurrency limits. |
| tests/main_test.go | Integrates PerFunctionParallelismInit into the global test setup and ensures the per-function docket is closed at teardown. |
docket.go (Outdated)
```go
d.claimedTasks = make(chan task, d.parallelism)
d.taskCompleted = make(chan bool, d.parallelism*2)
d.functions = make(map[string]TaskFunction)
d.functionParallelism = make(map[string]int)
```
Copilot AI · Jan 27, 2026
d.functionParallelism is being reinitialized to a new empty map here, which discards any values set by FunctionParallelism options earlier in Init. As a result, per-function parallelism configuration is ignored: the general scheduler’s exclude list in startScheduler is always empty and the loop starting functionSchedulers below never runs. To preserve the configured values, avoid overwriting d.functionParallelism here (only initialize it once before applying options, or conditionally if it is still nil).
Suggested change:

```go
if d.functionParallelism == nil {
	d.functionParallelism = make(map[string]int)
}
```
```go
if len(filter.onlyFunctions) > 0 {
	placeholders := make([]string, len(filter.onlyFunctions))
	var args []interface{}
	args = append(args, d.maxClaimCount)
	argNum := 2
	for i, fn := range filter.onlyFunctions {
		placeholders[i] = fmt.Sprintf("$%d", argNum)
		args = append(args, fn)
		argNum++
	}
	funcFilter := fmt.Sprintf("AND func IN (%s)", strings.Join(placeholders, ", "))
	q = strings.Replace(q, "--FUNCTION_FILTER--", funcFilter, 1)
	err := d.db.QueryRow(q, args...).Scan(&seconds)
	if err != nil && errors.Is(err, sql.ErrNoRows) {
		return time.Duration(math.MaxInt64)
	}
	if err != nil {
		d.logger.Load().With("error", err).Warn("error getting time to sleep")
		return time.Duration(math.MaxInt64)
	}
} else if len(filter.excludeFunctions) > 0 {
	placeholders := make([]string, len(filter.excludeFunctions))
	var args []interface{}
	args = append(args, d.maxClaimCount)
	argNum := 2
	for i, fn := range filter.excludeFunctions {
		placeholders[i] = fmt.Sprintf("$%d", argNum)
		args = append(args, fn)
		argNum++
	}
	funcFilter := fmt.Sprintf("AND func NOT IN (%s)", strings.Join(placeholders, ", "))
	q = strings.Replace(q, "--FUNCTION_FILTER--", funcFilter, 1)
	err := d.db.QueryRow(q, args...).Scan(&seconds)
	if err != nil && errors.Is(err, sql.ErrNoRows) {
		return time.Duration(math.MaxInt64)
	}
	if err != nil {
		d.logger.Load().With("error", err).Warn("error getting time to sleep")
		return time.Duration(math.MaxInt64)
	}
} else {
	q = strings.Replace(q, "--FUNCTION_FILTER--", "", 1)
}
err := d.db.QueryRow(q, d.maxClaimCount).Scan(&seconds)
if err != nil && errors.Is(err, sql.ErrNoRows) {
```
Copilot AI · Jan 27, 2026
When filter.onlyFunctions or filter.excludeFunctions is non-empty, this function executes a filtered QueryRow call with a dynamically built args slice, but then still falls through to the unconditional QueryRow(q, d.maxClaimCount) below. At that point q still contains the function filter placeholders, so the single d.maxClaimCount argument does not match the number of placeholders, leading to a runtime error and also issuing two queries instead of one. This should either execute a single parametrized query in all branches (by building a shared args slice and calling QueryRow once) or return early in the filtered branches after a successful scan.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
```go
}
funcFilter := fmt.Sprintf("AND func IN (%s)", strings.Join(placeholders, ", "))
q = strings.Replace(q, "--FUNCTION_FILTER--", funcFilter, 1)
err := d.db.QueryRow(q, args...).Scan(&seconds)
```
Why are both of these inside the if/else? Is there something I'm not getting?
No description provided.