-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmanager.go
More file actions
204 lines (181 loc) · 6.67 KB
/
manager.go
File metadata and controls
204 lines (181 loc) · 6.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
package taskqueue
import (
"context"
"errors"
"log"
"sync"
"time"
)
// DefaultNumOfWorkers is the worker count NewManager falls back to when the
// requested number of workers is zero or negative.
const DefaultNumOfWorkers int = 1

// ErrEmptyTaskName is returned by Manager.PublishTask when it is called with
// an empty task name.
var ErrEmptyTaskName = errors.New("task name must not be empty")
// ManagerOption is a functional option for NewManager. Each option receives
// the ManagerConfig being assembled and mutates it in place; NewManager
// applies the options in the order they are passed, so a later option can
// overwrite a field set by an earlier one.
type ManagerOption func(*ManagerConfig)
// ManagerConfig holds the optional settings of a Manager. It is populated by
// the ManagerOption values passed to NewManager.
type ManagerConfig struct {
// BackoffPolicy is the backoff policy NewManager hands to every worker it
// creates. When it is left nil, NewManager substitutes DefaultBackoffPolicy.
BackoffPolicy Backoff
}
// Manager owns a fixed set of workers created by NewManager. It registers task
// handlers on all of them, publishes tasks to the shared broker, starts every
// worker in its own goroutine, and stops them by cancelling a shared context
// and waiting on a shared WaitGroup.
type Manager struct {
// broker is passed to every worker at construction and is used directly by
// PublishTask.
broker Broker
// backoff is the policy passed to every worker; exposed via BackoffPolicy.
backoff Backoff
// workers is filled once by NewManager; exposed via Workers.
workers []Worker
ctx context.Context // Created by NewManager with context.WithCancel; passed to each worker's Start.
cancel context.CancelFunc // Cancels ctx; called by Stop.
wg sync.WaitGroup // Incremented once per worker in NewManager; its address is given to each worker and Stop waits on it.
}
// WithBackoffPolicy returns a ManagerOption that sets ManagerConfig.BackoffPolicy
// to bp. NewManager passes the resulting policy to every worker it creates.
// Passing a nil bp leaves the field nil, in which case NewManager falls back
// to DefaultBackoffPolicy.
//
// Example usage:
//
// manager := NewManager(broker, workerFactory, numWorkers, WithBackoffPolicy(customBackoffPolicy))
func WithBackoffPolicy(bp Backoff) ManagerOption {
return func(cfg *ManagerConfig) {
cfg.BackoffPolicy = bp
}
}
// NewManager builds a Manager together with all of its workers.
//
// Parameters:
//   - broker: the Broker handed to every worker and used by PublishTask.
//   - wf: the WorkerFactory invoked once per worker with that worker's WorkerConfig.
//   - numWorkers: how many workers to create. Values of 0 or less are replaced
//     by DefaultNumOfWorkers.
//   - opts: optional ManagerOption values, applied in order to a fresh
//     ManagerConfig. Nil options are ignored.
//
// If no option sets a BackoffPolicy, a pointer to DefaultBackoffPolicy is used.
// Each worker receives a 1-based ID, the shared broker, the shared backoff
// policy and a pointer to the Manager's WaitGroup. The Manager also gets a
// cancellable context derived from context.Background; Start passes it to the
// workers and Stop cancels it.
//
// NOTE(review): the WaitGroup counter is incremented here, once per worker,
// rather than in Start. Presumably each worker calls Done when its Start
// method returns; if so, calling Stop on a Manager that was never started
// would block forever. Confirm against the Worker implementation.
//
// Returns:
//
//	A pointer to the initialized Manager. The workers are created but not
//	running until Start is called.
//
// Example:
//
//	mgr := NewManager(broker, MyWorkerFactory(), 5, WithBackoffPolicy(customBackoff))
//	mgr.RegisterTask("send_email", emailHandler)
//	mgr.Start()
func NewManager(broker Broker, wf WorkerFactory, numWorkers int, opts ...ManagerOption) *Manager {
	if numWorkers <= 0 {
		numWorkers = DefaultNumOfWorkers
	}

	managerConfig := &ManagerConfig{}
	for _, opt := range opts {
		// Calling a nil function value panics; treat a nil option as a no-op.
		if opt == nil {
			continue
		}
		opt(managerConfig)
	}
	if managerConfig.BackoffPolicy == nil {
		managerConfig.BackoffPolicy = &DefaultBackoffPolicy
	}

	ctx, cancel := context.WithCancel(context.Background())
	manager := &Manager{
		broker:  broker,
		backoff: managerConfig.BackoffPolicy,
		// The final length is known, so allocate the backing array once.
		workers: make([]Worker, 0, numWorkers),
		ctx:     ctx,
		cancel:  cancel,
	}

	// Worker IDs are 1-based.
	for id := 1; id <= numWorkers; id++ {
		manager.wg.Add(1)
		workerConfig := WorkerConfig{
			ID:      id,
			Broker:  manager.broker,
			Backoff: manager.backoff,
			WG:      &manager.wg,
		}
		manager.workers = append(manager.workers, wf(workerConfig))
	}
	return manager
}
// RegisterTask installs handler under taskName on every worker owned by the
// Manager by calling each worker's Register method in turn.
//
// Parameters:
//   - taskName: the name the handler is registered under.
//   - handler: the TaskHandlerFunc registered for that name.
//
// NOTE(review): what happens when taskName is already registered is decided
// by Worker.Register, which is not visible here; presumably the latest
// handler wins. Confirm against the Worker implementation.
//
// Example usage (assuming TaskHandlerFunc is func(TaskArgs) error):
//
//	manager.RegisterTask("send_email", func(args TaskArgs) error {
//		// Handle the task
//		return nil
//	})
func (m *Manager) RegisterTask(taskName string, handler TaskHandlerFunc) {
	for i := range m.workers {
		m.workers[i].Register(taskName, handler)
	}
}
// PublishTask builds a Task and hands it to the Manager's broker.
//
// The Task carries the given name, arguments and retry limit, and is stamped
// with the current time in UTC.
//
// Parameters:
//   - taskName: name stored in Task.Name; must not be empty.
//   - args: stored unchanged in Task.Args.
//   - maxRetry: stored in Task.MaxRetry. How the limit is enforced is up to
//     the code that consumes the task, which is not part of this method.
//
// Returns:
//   - ErrEmptyTaskName when taskName is empty; the broker is not called.
//   - Otherwise, whatever the broker's Publish method returns.
//
// Example usage:
//
//	err := manager.PublishTask("send_email", TaskArgs{"email": "user@example.com"}, 3)
//	if err != nil {
//		log.Printf("Error publishing task: %v", err)
//	}
func (m *Manager) PublishTask(taskName string, args TaskArgs, maxRetry uint) error {
	if len(taskName) == 0 {
		return ErrEmptyTaskName
	}

	return m.broker.Publish(Task{
		Name:      taskName,
		Args:      args,
		MaxRetry:  maxRetry,
		Timestamp: time.Now().UTC(),
	})
}
// Start launches every worker owned by the Manager.
//
// Each worker's Start method is invoked in its own goroutine and receives the
// Manager's context, so this method returns without waiting for the workers.
// Cancelling that context (see Stop) is the signal passed to the workers.
//
// Example usage:
//
//	manager.Start() // Starts all workers concurrently
func (m *Manager) Start() {
	for i := range m.workers {
		go m.workers[i].Start(m.ctx)
	}
}
// Stop shuts the Manager down and blocks until its WaitGroup is released.
//
// It logs "Manager stopping...", cancels the Manager's context (the same
// context Start passes to every worker), waits on the shared WaitGroup, and
// then logs "All workers have shut down.".
//
// NOTE(review): the WaitGroup counter is raised once per worker in NewManager;
// presumably each worker calls Done when it exits. If so, calling Stop on a
// Manager whose workers were never started would block forever. Confirm
// against the Worker implementation.
//
// Example usage:
//
// manager.Stop() // Stops all workers and waits for them to finish
func (m *Manager) Stop() {
log.Println("Manager stopping...")
// Cancel first so workers observing the context can begin shutting down,
// then wait for the WaitGroup to drain.
m.cancel()
m.wg.Wait()
log.Println("All workers have shut down.")
}
// Workers returns the workers created by NewManager. The slice is returned
// without copying, so it shares its backing array with the Manager.
func (m *Manager) Workers() []Worker {
return m.workers
}
// BackoffPolicy returns the backoff policy the Manager was constructed with,
// i.e. the one NewManager passed to every worker.
func (m *Manager) BackoffPolicy() Backoff {
return m.backoff
}