-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworkers.go
More file actions
96 lines (88 loc) · 1.9 KB
/
workers.go
File metadata and controls
96 lines (88 loc) · 1.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package main
import (
"runtime"
"sync"
"time"
bar "github.com/schollz/progressbar/v3"
)
// Worker is a pool of goroutines that execute submitted functions.
// It can optionally be guarded by a watchdog goroutine and can optionally
// report progress on a progress bar.
type Worker struct {
	// ch carries submitted functions to the worker goroutines.
	ch chan func()
	// wg tracks the worker goroutines so Wait can block until they exit.
	wg sync.WaitGroup

	// dog carries watchdog keepalives; it is nil when no watchdog is enabled.
	dog chan struct{}
	// dogWait tracks the watchdog goroutine.
	dogWait sync.WaitGroup

	// bar is advanced by one after each executed function; it is nil when
	// no progress bar was requested.
	bar *bar.ProgressBar
}
// NewWorker creates a worker pool with one goroutine per CPU, an optional
// watchdog, and an optional progress bar.
//
// d is the watchdog timeout. When d is positive, a watchdog goroutine is
// started that panics if no keepalive (sent by Run) arrives within d.
// A zero or negative d disables the watchdog; a non-positive value must
// never reach the watchdog because time.NewTicker panics on it.
//
// If count is non-zero and name is non-empty, a progress bar is created
// from them and advanced once for every executed function.
//
// The returned pool is shut down by calling Wait.
func NewWorker(d time.Duration, name string, count int) *Worker {
	w := &Worker{}
	if count != 0 && name != "" {
		w.bar = bar.Default(int64(count), name)
	}

	// The job queue is buffered with one slot per worker goroutine.
	workers := runtime.NumCPU()
	w.ch = make(chan func(), workers)

	// Only a strictly positive timeout enables the watchdog.
	if d > 0 {
		w.dog = make(chan struct{}, 10)
		w.dogWait.Add(1)
		go w.watchdog(d)
	}

	// Start the workers; each one signals w.wg when it exits.
	w.wg.Add(workers)
	for range workers {
		go w.worker()
	}
	return w
}
// Wait shuts the pool down: it closes the job channel, blocks until every
// worker has drained the queue and exited, then stops the watchdog (if one
// is running) and finishes the progress bar (if one exists).
// Run must not be called once Wait has started, since sending on the
// closed channels would panic.
func (w *Worker) Wait() {
	// Closing the queue ends each worker's receive loop after the
	// remaining queued functions have run.
	close(w.ch)
	w.wg.Wait()

	// Closing the keepalive channel tells the watchdog to exit.
	if w.dog != nil {
		close(w.dog)
		w.dogWait.Wait()
	}

	if w.bar == nil {
		return
	}
	w.bar.Finish()
}
// Run queues f for execution on one of the workers, blocking while the
// job queue is full. When a watchdog is enabled, a keepalive is sent to it
// before f is queued.
func (w *Worker) Run(f func()) {
	if dog := w.dog; dog != nil {
		dog <- struct{}{}
	}
	w.ch <- f
}
// worker receives functions from the job channel and calls them one at a
// time, advancing the progress bar (if any) after each call. It exits, and
// signals w.wg, once the channel is closed and drained.
func (w *Worker) worker() {
	defer w.wg.Done()
	for {
		job, open := <-w.ch
		if !open {
			return
		}
		job()
		if w.bar == nil {
			continue
		}
		w.bar.Add(1)
	}
}
// watchdog panics if no keepalive is received on w.dog within t of the
// previous keepalive (or of startup). Each keepalive restarts the timeout.
// It returns, signalling w.dogWait, once w.dog is closed.
func (w *Worker) watchdog(t time.Duration) {
	defer w.dogWait.Done()
	ticker := time.NewTicker(t)
	for {
		select {
		case _, alive := <-w.dog:
			if alive {
				// Keepalive received: restart the timeout window.
				ticker.Reset(t)
				continue
			}
			// Channel closed: orderly watchdog shutdown.
			ticker.Stop()
			return
		case <-ticker.C:
			// No keepalive arrived within t.
			panic("Watchdog timeout!")
		}
	}
}