Skip to content

Commit b271a21

Browse files
authored
feat: options added to configure no of workers (#262) (#263)
1 parent b42314b commit b271a21

File tree

8 files changed

+18
-4
lines changed

8 files changed

+18
-4
lines changed

cmd/config.cluster.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ pubsub:
5555
task_queue:
5656
mode: remote # local or remote
5757
max_outstanding_messages_per_queue: 1000
58+
no_of_workers_per_queue: 2 # this is lightweight goroutine based worker, so you can increase it as per your requirement
5859
amqp: # all the info should be filled if mode is remote
5960
protocol: amqp # amqp or amqps
6061
host: localhost

cmd/config.standalone.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ pubsub:
5555
task_queue:
5656
mode: local # local or remote
5757
max_outstanding_messages_per_queue: 1000
58+
no_of_workers_per_queue: 1 # this is lightweight goroutine based worker, so you can increase it as per your requirement
5859
amqp: # all the info should be filled if mode is remote
5960
protocol: amqp # amqp or amqps
6061
host: localhost

swiftwave_service/core/load_service_manager.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ func (manager *ServiceManager) Load(config system_config.Config) {
9898
Type: task_queue.Local,
9999
Mode: task_queue.Both, // TODO: option to configure this
100100
MaxMessagesPerQueue: config.TaskQueueConfig.MaxOutstandingMessagesPerQueue,
101+
NoOfWorkersPerQueue: config.TaskQueueConfig.NoOfWorkersPerQueue,
101102
})
102103
if err != nil {
103104
panic(err)
@@ -107,6 +108,7 @@ func (manager *ServiceManager) Load(config system_config.Config) {
107108
taskQueueClient, err := task_queue.NewClient(task_queue.Options{
108109
Type: task_queue.Remote,
109110
Mode: task_queue.Both, // TODO: option to configure this
111+
NoOfWorkersPerQueue: config.TaskQueueConfig.NoOfWorkersPerQueue,
110112
MaxMessagesPerQueue: config.TaskQueueConfig.MaxOutstandingMessagesPerQueue,
111113
AMQPUri: config.TaskQueueConfig.AMQPConfig.URI(),
112114
AMQPVhost: config.TaskQueueConfig.AMQPConfig.VHost,

system_config/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ type PubSubConfig struct {
7272
type TaskQueueConfig struct {
7373
Mode TaskQueueMode `yaml:"mode"`
7474
MaxOutstandingMessagesPerQueue int `yaml:"max_outstanding_messages_per_queue"`
75+
NoOfWorkersPerQueue int `yaml:"no_of_workers_per_queue"`
7576
AMQPConfig AMQPConfig `yaml:"amqp"`
7677
}
7778

task_queue/init.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func createLocalTaskQueueClient(options Options) (Client, error) {
3737
queueToChannelMapping: channelsMapping,
3838
operationMode: options.Mode,
3939
maxMessagesPerQueue: options.MaxMessagesPerQueue,
40+
NoOfWorkersPerQueue: options.NoOfWorkersPerQueue,
4041
consumersWaitGroup: &sync.WaitGroup{},
4142
}, nil
4243
}
@@ -56,6 +57,7 @@ func createRemoteTaskQueueClient(options Options) (Client, error) {
5657

5758
return &remoteTaskQueue{
5859
mutexQueueToFunctionMapping: mutex,
60+
NoOfWorkersPerQueue: options.NoOfWorkersPerQueue,
5961
queueToFunctionMapping: functionsMapping,
6062
operationMode: options.Mode,
6163
amqpURI: options.AMQPUri,

task_queue/local_task_queue.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,10 @@ func (l *localTaskQueue) StartConsumers(nowait bool) error {
9090

9191
// start consumers
9292
for _, queueName := range queueNames {
93-
wg.Add(1)
94-
go l.listenForTasks(queueName, wg)
93+
for i := 1; i <= l.NoOfWorkersPerQueue; i++ {
94+
wg.Add(1)
95+
go l.listenForTasks(queueName, wg)
96+
}
9597
}
9698

9799
if !nowait {

task_queue/remote_task_queue.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,10 @@ func (r *remoteTaskQueue) StartConsumers(nowait bool) error {
124124

125125
// start consumers
126126
for _, queueName := range queueNames {
127-
wg.Add(1)
128-
go r.listenForTasks(queueName, wg)
127+
for i := 1; i <= r.NoOfWorkersPerQueue; i++ {
128+
wg.Add(1)
129+
go r.listenForTasks(queueName, wg)
130+
}
129131
}
130132

131133
if !nowait {

task_queue/types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type localTaskQueue struct {
2727
queueToChannelMapping map[string]chan ArgumentType
2828
operationMode Mode
2929
maxMessagesPerQueue int
30+
NoOfWorkersPerQueue int
3031
consumersWaitGroup *sync.WaitGroup
3132
}
3233

@@ -38,6 +39,7 @@ type remoteTaskQueue struct {
3839
amqpClientName string
3940
operationMode Mode
4041
consumersWaitGroup *sync.WaitGroup
42+
NoOfWorkersPerQueue int
4143
// internal use
4244
amqpConnection *amqp.Connection
4345
amqpChannel *amqp.Channel
@@ -69,6 +71,7 @@ type Options struct {
6971
Type ServiceType
7072
Mode Mode
7173
MaxMessagesPerQueue int // only applicable for local task queue
74+
NoOfWorkersPerQueue int
7275
// Extra options for remote task queue
7376
AMQPUri string
7477
AMQPVhost string

0 commit comments

Comments
 (0)