Skip to content

Conversation

@agatons
Copy link
Contributor

@agatons agatons commented Jan 27, 2026

No description provided.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces per-function parallelism to the task scheduling system, allowing specific functions to have dedicated worker pools and to be excluded from the general scheduler, plus tests to validate this behavior.

Changes:

  • Refactors workers to accept explicit task and completion channels, enabling separate queues per scheduler.
  • Adds function-specific scheduler infrastructure (functionParallelism, functionScheduler, claimFilter, claimTasksWithFilter, and filtered timeToSleep) and wires it into Init, startScheduler, and shutdown.
  • Adds FunctionParallelism option and a new per_function_parallelism_test to verify per-function concurrency limits.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
worker.go General worker now operates on injected task/completion channels so it can be reused by both general and function-specific schedulers.
dao.go Adds claimFilter, claimTasksWithFilter, and a filtered timeToSleep to support function include/exclude semantics in task claiming and sleep-time calculation.
docket.go Extends docket with function-parallelism fields, adjusts Init to compute connection pool size and start per-function schedulers/workers, and updates Close to coordinate shutdown of all schedulers.
scheduler.go Updates the main scheduler to exclude functions with specific parallelism and implements startFunctionScheduler for function-specific scheduling loops.
option.go Adds a FunctionParallelism option to configure per-function worker limits at initialization.
tests/per_function_parallelism_test.go Introduces a new test docket and functions to assert that Slow/Medium/Fast functions respect their configured concurrency limits.
tests/main_test.go Integrates PerFunctionParallelismInit into the global test setup and ensures the per-function docket is closed at teardown.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

docket.go Outdated
d.claimedTasks = make(chan task, d.parallelism)
d.taskCompleted = make(chan bool, d.parallelism*2)
d.functions = make(map[string]TaskFunction)
d.functionParallelism = make(map[string]int)
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

d.functionParallelism is being reinitialized to a new empty map here, which discards any values set by FunctionParallelism options earlier in Init. As a result, per-function parallelism configuration is ignored: the general scheduler’s exclude list in startScheduler is always empty and the loop starting functionSchedulers below never runs. To preserve the configured values, avoid overwriting d.functionParallelism here (only initialize it once before applying options, or conditionally if it is still nil).

Suggested change
d.functionParallelism = make(map[string]int)
if d.functionParallelism == nil {
d.functionParallelism = make(map[string]int)
}

Copilot uses AI. Check for mistakes.
Comment on lines +408 to 452
if len(filter.onlyFunctions) > 0 {
placeholders := make([]string, len(filter.onlyFunctions))
var args []interface{}
args = append(args, d.maxClaimCount)
argNum := 2
for i, fn := range filter.onlyFunctions {
placeholders[i] = fmt.Sprintf("$%d", argNum)
args = append(args, fn)
argNum++
}
funcFilter := fmt.Sprintf("AND func IN (%s)", strings.Join(placeholders, ", "))
q = strings.Replace(q, "--FUNCTION_FILTER--", funcFilter, 1)
err := d.db.QueryRow(q, args...).Scan(&seconds)
if err != nil && errors.Is(err, sql.ErrNoRows) {
return time.Duration(math.MaxInt64)
}
if err != nil {
d.logger.Load().With("error", err).Warn("error getting time to sleep")
return time.Duration(math.MaxInt64)
}
} else if len(filter.excludeFunctions) > 0 {
placeholders := make([]string, len(filter.excludeFunctions))
var args []interface{}
args = append(args, d.maxClaimCount)
argNum := 2
for i, fn := range filter.excludeFunctions {
placeholders[i] = fmt.Sprintf("$%d", argNum)
args = append(args, fn)
argNum++
}
funcFilter := fmt.Sprintf("AND func NOT IN (%s)", strings.Join(placeholders, ", "))
q = strings.Replace(q, "--FUNCTION_FILTER--", funcFilter, 1)
err := d.db.QueryRow(q, args...).Scan(&seconds)
if err != nil && errors.Is(err, sql.ErrNoRows) {
return time.Duration(math.MaxInt64)
}
if err != nil {
d.logger.Load().With("error", err).Warn("error getting time to sleep")
return time.Duration(math.MaxInt64)
}
} else {
q = strings.Replace(q, "--FUNCTION_FILTER--", "", 1)
}
err := d.db.QueryRow(q, d.maxClaimCount).Scan(&seconds)
if err != nil && errors.Is(err, sql.ErrNoRows) {
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When filter.onlyFunctions or filter.excludeFunctions is non-empty, this function executes a filtered QueryRow call with a dynamically built args slice, but then still falls through to the unconditional QueryRow(q, d.maxClaimCount) below. At that point q still contains the function filter placeholders, so the single d.maxClaimCount argument does not match the number of placeholders, leading to a runtime error and also issuing two queries instead of one. This should either execute a single parametrized query in all branches (by building a shared args slice and calling QueryRow once) or return early in the filtered branches after a successful scan.

Copilot uses AI. Check for mistakes.
agatons and others added 2 commits January 27, 2026 14:11
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
}
funcFilter := fmt.Sprintf("AND func IN (%s)", strings.Join(placeholders, ", "))
q = strings.Replace(q, "--FUNCTION_FILTER--", funcFilter, 1)
err := d.db.QueryRow(q, args...).Scan(&seconds)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Varför är båda dessa inne i if elsen är det något jag inte fattar?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants