Skip to content

Commit b0ff8fd

Browse files
authored
chore(worker): remove disable consumer flag (#16)
1 parent d1c23a8 commit b0ff8fd

File tree

3 files changed

+61
-71
lines changed

3 files changed

+61
-71
lines changed

nsq.go

Lines changed: 47 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@ var _ core.Worker = (*Worker)(nil)
1919
type Worker struct {
2020
q *nsq.Consumer
2121
p *nsq.Producer
22+
cfg *nsq.Config
2223
stopOnce sync.Once
24+
startOnce sync.Once
2325
stop chan struct{}
2426
stopFlag int32
25-
startFlag int32
2627
opts Options
2728
tasks chan *nsq.Message
2829
}
@@ -35,73 +36,64 @@ func NewWorker(opts ...Option) *Worker {
3536
tasks: make(chan *nsq.Message),
3637
}
3738

38-
cfg := nsq.NewConfig()
39-
cfg.MaxInFlight = w.opts.maxInFlight
39+
w.cfg = nsq.NewConfig()
40+
w.cfg.MaxInFlight = w.opts.maxInFlight
4041

41-
if err := w.startProducer(cfg); err != nil {
42-
panic(err)
43-
}
44-
45-
if err := w.startConsumer(cfg); err != nil {
42+
if err := w.startProducer(); err != nil {
4643
panic(err)
4744
}
4845

4946
return w
5047
}
5148

52-
func (w *Worker) startProducer(cfg *nsq.Config) error {
49+
func (w *Worker) startProducer() error {
5350
var err error
5451

55-
w.p, err = nsq.NewProducer(w.opts.addr, cfg)
52+
w.p, err = nsq.NewProducer(w.opts.addr, w.cfg)
5653

5754
return err
5855
}
5956

60-
func (w *Worker) startConsumer(cfg *nsq.Config) error {
61-
if w.opts.disableConsumer {
62-
return nil
63-
}
64-
65-
var err error
66-
67-
w.q, err = nsq.NewConsumer(w.opts.topic, w.opts.channel, cfg)
68-
if err != nil {
69-
return err
70-
}
71-
72-
w.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
73-
if len(msg.Body) == 0 {
74-
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
75-
// In this case, a message with an empty body is simply ignored/discarded.
76-
return nil
57+
func (w *Worker) startConsumer() (err error) {
58+
w.startOnce.Do(func() {
59+
w.q, err = nsq.NewConsumer(w.opts.topic, w.opts.channel, w.cfg)
60+
if err != nil {
61+
return
7762
}
7863

79-
loop:
80-
for {
81-
select {
82-
case w.tasks <- msg:
83-
break loop
84-
case <-w.stop:
85-
if msg != nil {
86-
// re-queue the job if worker has been shutdown.
87-
msg.Requeue(-1)
64+
w.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
65+
if len(msg.Body) == 0 {
66+
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
67+
// In this case, a message with an empty body is simply ignored/discarded.
68+
return nil
69+
}
70+
71+
loop:
72+
for {
73+
select {
74+
case w.tasks <- msg:
75+
break loop
76+
case <-w.stop:
77+
if msg != nil {
78+
// re-queue the job if worker has been shutdown.
79+
msg.Requeue(-1)
80+
}
81+
break loop
82+
case <-time.After(2 * time.Second):
83+
msg.Touch()
8884
}
89-
break loop
90-
case <-time.After(2 * time.Second):
91-
msg.Touch()
9285
}
93-
}
9486

95-
return nil
96-
}))
87+
return nil
88+
}))
9789

98-
err = w.q.ConnectToNSQD(w.opts.addr)
99-
if err != nil {
100-
return err
101-
}
90+
err = w.q.ConnectToNSQD(w.opts.addr)
91+
if err != nil {
92+
return
93+
}
94+
})
10295

103-
atomic.CompareAndSwapInt32(&w.startFlag, 0, 1)
104-
return nil
96+
return err
10597
}
10698

10799
func (w *Worker) handle(job queue.Job) error {
@@ -172,12 +164,13 @@ func (w *Worker) Shutdown() error {
172164
// notify shtdown event to worker and consumer
173165
close(w.stop)
174166
// stop producer and consumer
175-
if atomic.LoadInt32(&w.startFlag) == 1 {
167+
if w.q != nil {
176168
w.q.ChangeMaxInFlight(0)
177169
w.q.Stop()
178170
<-w.q.StopChan
179-
w.p.Stop()
180171
}
172+
w.p.Stop()
173+
181174
// close task channel
182175
close(w.tasks)
183176
})
@@ -195,6 +188,10 @@ func (w *Worker) Queue(job core.QueuedMessage) error {
195188

196189
// Request fetch new task from queue
197190
func (w *Worker) Request() (core.QueuedMessage, error) {
191+
if err := w.startConsumer(); err != nil {
192+
return nil, err
193+
}
194+
198195
clock := 0
199196
loop:
200197
for {

nsq_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,6 @@ func TestHandleTimeout(t *testing.T) {
277277
time.Sleep(200 * time.Millisecond)
278278
return nil
279279
}),
280-
WithDisableConsumer(),
281280
)
282281

283282
err := w.handle(job)
@@ -294,7 +293,6 @@ func TestHandleTimeout(t *testing.T) {
294293
time.Sleep(200 * time.Millisecond)
295294
return nil
296295
}),
297-
WithDisableConsumer(),
298296
)
299297

300298
done := make(chan error)
@@ -318,7 +316,6 @@ func TestJobComplete(t *testing.T) {
318316
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
319317
return errors.New("job completed")
320318
}),
321-
WithDisableConsumer(),
322319
)
323320

324321
err := w.handle(job)
@@ -369,6 +366,7 @@ func TestNSQStatsinQueue(t *testing.T) {
369366
assert.NoError(t, q.Queue(m))
370367
assert.NoError(t, q.Queue(m))
371368
q.Start()
369+
time.Sleep(200 * time.Millisecond)
372370
assert.Equal(t, int(1), w.Stats().Connections)
373371
time.Sleep(500 * time.Millisecond)
374372
assert.Equal(t, uint64(2), w.Stats().MessagesReceived)
@@ -390,14 +388,17 @@ func TestNSQStatsInWorker(t *testing.T) {
390388
assert.NoError(t, w.Queue(m))
391389
assert.NoError(t, w.Queue(m))
392390
assert.NoError(t, w.Queue(m))
393-
assert.Equal(t, int(1), w.Stats().Connections)
391+
assert.Nil(t, w.Stats())
394392

395-
time.Sleep(50 * time.Millisecond)
393+
task, err := w.Request()
394+
assert.Equal(t, int(1), w.Stats().Connections)
395+
assert.NotNil(t, task)
396+
assert.NoError(t, err)
396397

397398
assert.Equal(t, uint64(1), w.Stats().MessagesReceived)
398-
assert.Equal(t, uint64(0), w.Stats().MessagesFinished)
399+
assert.Equal(t, uint64(1), w.Stats().MessagesFinished)
399400
assert.Equal(t, uint64(0), w.Stats().MessagesRequeued)
400-
401+
time.Sleep(50 * time.Millisecond)
401402
_ = w.Shutdown()
402403
time.Sleep(50 * time.Millisecond)
403404
assert.Equal(t, uint64(1), w.Stats().MessagesRequeued)

options.go

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,12 @@ func (f OptionFunc) Apply(option *Options) {
2121
}
2222

2323
type Options struct {
24-
maxInFlight int
25-
addr string
26-
topic string
27-
channel string
28-
runFunc func(context.Context, core.QueuedMessage) error
29-
logger queue.Logger
30-
disableConsumer bool
24+
maxInFlight int
25+
addr string
26+
topic string
27+
channel string
28+
runFunc func(context.Context, core.QueuedMessage) error
29+
logger queue.Logger
3130
}
3231

3332
// WithAddr setup the addr of NSQ
@@ -72,13 +71,6 @@ func WithLogger(l queue.Logger) Option {
7271
})
7372
}
7473

75-
// WithDisableConsumer disable consumer
76-
func WithDisableConsumer() Option {
77-
return OptionFunc(func(o *Options) {
78-
o.disableConsumer = true
79-
})
80-
}
81-
8274
func newOptions(opts ...Option) Options {
8375
defaultOpts := Options{
8476
addr: "127.0.0.1:4150",

0 commit comments

Comments
 (0)