-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmany.go
More file actions
121 lines (100 loc) · 2.67 KB
/
many.go
File metadata and controls
121 lines (100 loc) · 2.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
package task
import (
"context"
"errors"
)
// Then creates a task that runs t first and, only if t returns a nil error,
// runs next with the same context. An error from t is returned as is and next
// is not run.
func (t Task) Then(next Task) Task {
	return func(ctx context.Context) error {
		err := t.Run(ctx)
		if err == nil {
			// t succeeded, so the result of the chain is whatever next reports.
			err = next.Run(ctx)
		}
		return err
	}
}
// Iter creates a task that runs tasks one after another with the same context
// and stops at the first error, which it returns. It returns nil when every
// task succeeded or when there are no tasks.
func Iter(tasks ...Task) Task {
	return func(ctx context.Context) error {
		var err error
		// The loop condition ends the iteration as soon as a task has failed.
		for i := 0; i < len(tasks) && err == nil; i++ {
			err = tasks[i].Run(ctx)
		}
		return err
	}
}
// ErrOneHasDone is the cancel cause First passes to the context of the
// remaining tasks once one task has delivered its result.
var ErrOneHasDone = errors.New("another task has been done")
// First creates a task that runs tasks concurrently, returns the result of the
// first one to finish and cancels the others. The other tasks receive
// ErrOneHasDone as the cancel cause of their context.
//
// With no tasks, the created task returns nil immediately.
//
// Take care of [NoCtx] and [NoErr] tasks as they cannot be cancelled by context.
func First(tasks ...Task) Task {
	return func(ctx context.Context) error {
		// Without any task nothing would ever send a result, so receiving
		// from the channel below would block forever.
		if len(tasks) == 0 {
			return nil
		}

		ctx, cancel := context.WithCancelCause(ctx)

		// One buffer slot per task lets the losing tasks deliver their result
		// and exit on their own, so no extra goroutine is needed to drain the
		// channel after the winner has been picked.
		// NOTE(review): assumes GoWithChan sends exactly one value per task,
		// which the former drain loop relied on as well — confirm.
		ch := make(chan error, len(tasks))
		for _, t := range tasks {
			t.GoWithChan(ctx, ch)
		}

		err := <-ch
		cancel(ErrOneHasDone)
		return err
	}
}
// Wait creates a task that runs all tasks concurrently with the same context,
// waits until every one of them has reported its result, and returns the first
// non-nil error it received (nil if there was none).
func Wait(tasks ...Task) Task {
	return func(ctx context.Context) error {
		results := make(chan error)
		for _, task := range tasks {
			task.GoWithChan(ctx, results)
		}

		var first error
		// Collect exactly one result per started task.
		for pending := len(tasks); pending > 0; pending-- {
			got := <-results
			if got == nil || first != nil {
				continue
			}
			first = got
		}
		return first
	}
}
// ErrOthers is the cancel cause Skip hands to the remaining tasks after one
// task has failed. It wraps the error of the failed task.
type ErrOthers struct {
	cause error
}

// Error describes the cancellation and appends the message of the wrapped
// error. The zero value, which wraps nothing, yields the bare description
// instead of panicking on a nil cause.
func (e ErrOthers) Error() string {
	const prefix = "canceled by error from other task"
	if e.cause == nil {
		return prefix
	}
	return prefix + ": " + e.cause.Error()
}

// Unwrap returns the wrapped error so errors.Is and errors.As can walk into it.
func (e ErrOthers) Unwrap() error { return e.cause }

// Is reports whether the wrapped error matches target. The search runs
// through the chain of the wrapped error, not through the chain of target.
func (e ErrOthers) Is(target error) bool { return errors.Is(e.cause, target) }

// As finds the first error in the chain of the wrapped error that matches
// target and, if found, stores it in target. target must be a non-nil pointer
// as required by errors.As.
func (e ErrOthers) As(target any) bool { return errors.As(e.cause, target) }
// Skip creates a task that runs tasks concurrently and waits until all of them
// are done. As soon as one task reports an error the others are canceled, and
// that first error is what the created task returns.
//
// Tasks canceled this way receive an ErrOthers wrapping the error as the
// cancel cause of their context.
//
// Take care of [NoCtx] and [NoErr] tasks as they cannot be cancelled by context.
func Skip(tasks ...Task) Task {
	return func(parent context.Context) error {
		ctx, abort := context.WithCancelCause(parent)
		defer abort(nil)

		results := make(chan error)
		for _, task := range tasks {
			task.GoWithChan(ctx, results)
		}

		var first error
		// Every task reports exactly once, so wait for one result per task
		// even after the cancellation has been triggered.
		for pending := len(tasks); pending > 0; pending-- {
			got := <-results
			if got == nil || first != nil {
				continue
			}
			first = got
			abort(ErrOthers{got})
		}
		return first
	}
}