@@ -6,16 +6,16 @@ import (
66 "runtime"
77 "time"
88
9- "github.com/DoNewsCode/core/events"
10- "github.com/DoNewsCode/core/otredis"
11-
129 "github.com/DoNewsCode/core/config"
1310 "github.com/DoNewsCode/core/contract"
1411 "github.com/DoNewsCode/core/di"
12+ "github.com/DoNewsCode/core/events"
13+ "github.com/DoNewsCode/core/otredis"
1514 "github.com/go-kit/kit/log"
1615 "github.com/go-kit/kit/log/level"
1716 "github.com/go-kit/kit/metrics"
1817 "github.com/oklog/run"
18+ "github.com/pkg/errors"
1919)
2020
2121/*
@@ -36,8 +36,17 @@ DispatcherMaker, the JobDispatcher and the exported configs.
3636 JobDispatcher
3737 *Queue
3838*/
39- func Providers () di.Deps {
40- return []interface {}{provideDispatcherFactory , provideConfig , provideDispatcher }
39+ func Providers (optionFunc ... ProvidersOptionFunc ) di.Deps {
40+ option := & providersOption {}
41+ for _ , f := range optionFunc {
42+ f (option )
43+ }
44+ return []interface {}{
45+ provideDispatcherFactory (option ),
46+ provideConfig ,
47+ provideDispatcher ,
48+ di .Bind (new (DispatcherFactory ), new (DispatcherMaker )),
49+ }
4150}
4251
4352// Gauge is an alias used for dependency injection
@@ -62,21 +71,17 @@ type makerIn struct {
6271 Conf contract.ConfigAccessor
6372 JobDispatcher JobDispatcher `optional:"true"`
6473 EventDispatcher contract.Dispatcher `optional:"true"`
65- Driver Driver `optional:"true"`
66- RedisMaker otredis.Maker `optional:"true"`
6774 Logger log.Logger
6875 AppName contract.AppName
6976 Env contract.Env
70- Gauge Gauge `optional:"true"`
77+ Gauge Gauge `optional:"true"`
78+ Populator contract.DIPopulator `optional:"true"`
7179}
7280
7381// makerOut is the di output JobFrom provideDispatcherFactory
7482type makerOut struct {
7583 di.Out
76-
77- DispatcherMaker DispatcherMaker
7884 DispatcherFactory DispatcherFactory
79- ExportedConfig []config.ExportedConfig `group:"config,flatten"`
8085}
8186
8287func (d makerOut ) ModuleSentinel () {}
@@ -85,86 +90,81 @@ func (m makerOut) Module() interface{} { return m }
8590
8691// provideDispatcherFactory is a provider for *DispatcherFactory and *Queue.
8792// It also provides an interface for each.
88- func provideDispatcherFactory (p makerIn ) (makerOut , error ) {
89- var (
90- err error
91- queueConfs map [string ]configuration
92- )
93- err = p .Conf .Unmarshal ("queue" , & queueConfs )
94- if err != nil {
95- level .Warn (p .Logger ).Log ("err" , err )
93+ func provideDispatcherFactory (option * providersOption ) func (p makerIn ) (makerOut , error ) {
94+ if option .driverConstructor == nil {
95+ option .driverConstructor = newDefaultDriver
9696 }
97- factory := di . NewFactory ( func (name string ) (di. Pair , error ) {
97+ return func (p makerIn ) (makerOut , error ) {
9898 var (
99- ok bool
100- conf configuration
99+ err error
100+ queueConfs map [ string ] configuration
101101 )
102- p := p
103- if conf , ok = queueConfs [name ]; ! ok {
104- if name != "default" {
105- return di.Pair {}, fmt .Errorf ("queue configuration %s not found" , name )
106- }
107- conf = configuration {Parallelism : runtime .NumCPU (), CheckQueueLengthIntervalSecond : 0 }
108- }
109-
110- if p .JobDispatcher == nil {
111- p .JobDispatcher = & SyncDispatcher {}
112- }
113- if p .EventDispatcher == nil {
114- p .EventDispatcher = & events.SyncDispatcher {}
115- }
116-
117- if p .Gauge != nil {
118- p .Gauge = p .Gauge .With ("queue" , name )
102+ err = p .Conf .Unmarshal ("queue" , & queueConfs )
103+ if err != nil {
104+ level .Warn (p .Logger ).Log ("err" , err )
119105 }
106+ factory := di .NewFactory (func (name string ) (di.Pair , error ) {
107+ var (
108+ ok bool
109+ conf configuration
110+ )
111+ p := p
112+ if conf , ok = queueConfs [name ]; ! ok {
113+ if name != "default" {
114+ return di.Pair {}, fmt .Errorf ("queue configuration %s not found" , name )
115+ }
116+ conf = configuration {Parallelism : runtime .NumCPU (), CheckQueueLengthIntervalSecond : 0 }
117+ }
120118
121- if p .Driver == nil {
122- if p .RedisMaker == nil {
123- return di.Pair {}, fmt .Errorf ("default redis client not found, please provide it or provide a queue.Driver" )
119+ if p .JobDispatcher == nil {
120+ p .JobDispatcher = & SyncDispatcher {}
124121 }
125- if conf . RedisName == "" {
126- conf . RedisName = "default"
122+ if p . EventDispatcher == nil {
123+ p . EventDispatcher = & events. SyncDispatcher {}
127124 }
128- redisClient , err := p . RedisMaker . Make ( conf . RedisName )
129- if err != nil {
130- return di. Pair {}, fmt . Errorf ( "failed to initiate redis driver: %w " , err )
125+
126+ if p . Gauge != nil {
127+ p . Gauge = p . Gauge . With ( "queue " , name )
131128 }
132- p .Driver = & RedisDriver {
133- Logger : p .Logger ,
134- RedisClient : redisClient ,
135- ChannelConfig : ChannelConfig {
136- Delayed : fmt .Sprintf ("{%s:%s:%s}:delayed" , p .AppName .String (), p .Env .String (), name ),
137- Failed : fmt .Sprintf ("{%s:%s:%s}:failed" , p .AppName .String (), p .Env .String (), name ),
138- Reserved : fmt .Sprintf ("{%s:%s:%s}:reserved" , p .AppName .String (), p .Env .String (), name ),
139- Waiting : fmt .Sprintf ("{%s:%s:%s}:waiting" , p .AppName .String (), p .Env .String (), name ),
140- Timeout : fmt .Sprintf ("{%s:%s:%s}:timeout" , p .AppName .String (), p .Env .String (), name ),
141- },
129+
130+ var driver = option .driver
131+ if option .driver == nil {
132+ driver , err = option .driverConstructor (DriverConstructorArgs {
133+ Name : "name" ,
134+ Conf : conf ,
135+ Logger : p .Logger ,
136+ AppName : p .AppName ,
137+ Env : p .Env ,
138+ Populator : p .Populator ,
139+ })
140+ if err != nil {
141+ return di.Pair {}, err
142+ }
142143 }
144+ queuedDispatcher := NewQueue (
145+ driver ,
146+ UseLogger (p .Logger ),
147+ UseParallelism (conf .Parallelism ),
148+ UseGauge (p .Gauge , time .Duration (conf .CheckQueueLengthIntervalSecond )* time .Second ),
149+ UseJobDispatcher (p .JobDispatcher ),
150+ UseEventDispatcher (p .EventDispatcher ),
151+ )
152+ return di.Pair {
153+ Closer : nil ,
154+ Conn : queuedDispatcher ,
155+ }, nil
156+ })
157+
158+ // Queue must be created eagerly, so that the consumer goroutines can start on boot up.
159+ for name := range queueConfs {
160+ factory .Make (name )
143161 }
144- queuedDispatcher := NewQueue (
145- p .Driver ,
146- UseLogger (p .Logger ),
147- UseParallelism (conf .Parallelism ),
148- UseGauge (p .Gauge , time .Duration (conf .CheckQueueLengthIntervalSecond )* time .Second ),
149- UseJobDispatcher (p .JobDispatcher ),
150- UseEventDispatcher (p .EventDispatcher ),
151- )
152- return di.Pair {
153- Closer : nil ,
154- Conn : queuedDispatcher ,
155- }, nil
156- })
157162
158- // Queue must be created eagerly, so that the consumer goroutines can start on boot up.
159- for name := range queueConfs {
160- factory .Make (name )
163+ dispatcherFactory := DispatcherFactory {Factory : factory }
164+ return makerOut {
165+ DispatcherFactory : dispatcherFactory ,
166+ }, nil
161167 }
162-
163- dispatcherFactory := DispatcherFactory {Factory : factory }
164- return makerOut {
165- DispatcherFactory : dispatcherFactory ,
166- DispatcherMaker : dispatcherFactory ,
167- }, nil
168168}
169169
170170// ProvideRunGroup implements container.RunProvider.
@@ -184,6 +184,31 @@ func (d makerOut) ProvideRunGroup(group *run.Group) {
184184 }
185185}
186186
187+ func newDefaultDriver (args DriverConstructorArgs ) (Driver , error ) {
188+ var maker otredis.Maker
189+ if args .Populator == nil {
190+ return nil , errors .New ("the default driver requires setting the populator in DI container" )
191+ }
192+ if err := args .Populator .Populate (& maker ); err != nil {
193+ return nil , fmt .Errorf ("the default driver requires an otredis.Maker in DI container: %w" , err )
194+ }
195+ client , err := maker .Make (args .Conf .RedisName )
196+ if err != nil {
197+ return nil , fmt .Errorf ("the default driver requires the redis client called %s: %w" , args .Conf .RedisName , err )
198+ }
199+ return & RedisDriver {
200+ Logger : args .Logger ,
201+ RedisClient : client ,
202+ ChannelConfig : ChannelConfig {
203+ Delayed : fmt .Sprintf ("{%s:%s:%s}:delayed" , args .AppName .String (), args .Env .String (), args .Name ),
204+ Failed : fmt .Sprintf ("{%s:%s:%s}:failed" , args .AppName .String (), args .Env .String (), args .Name ),
205+ Reserved : fmt .Sprintf ("{%s:%s:%s}:reserved" , args .AppName .String (), args .Env .String (), args .Name ),
206+ Waiting : fmt .Sprintf ("{%s:%s:%s}:waiting" , args .AppName .String (), args .Env .String (), args .Name ),
207+ Timeout : fmt .Sprintf ("{%s:%s:%s}:timeout" , args .AppName .String (), args .Env .String (), args .Name ),
208+ },
209+ }, nil
210+ }
211+
187212type dispatcherOut struct {
188213 di.Out
189214
0 commit comments